#include <vector>
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
+#include "memory/arena.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "table/internal_iterator.h"
#include "table/iter_heap.h"
#include "table/iterator_wrapper.h"
-#include "util/arena.h"
+#include "test_util/sync_point.h"
#include "util/autovector.h"
#include "util/heap.h"
#include "util/stop_watch.h"
-#include "util/sync_point.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {
typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
children_[i].Set(children[i]);
}
for (auto& child : children_) {
- if (child.Valid()) {
- assert(child.status().ok());
- minHeap_.push(&child);
- } else {
- considerStatus(child.status());
- }
+ AddToMinHeapOrCheckStatus(&child);
}
current_ = CurrentForward();
}
iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
auto new_wrapper = children_.back();
+ AddToMinHeapOrCheckStatus(&new_wrapper);
if (new_wrapper.Valid()) {
- assert(new_wrapper.status().ok());
- minHeap_.push(&new_wrapper);
current_ = CurrentForward();
- } else {
- considerStatus(new_wrapper.status());
}
}
status_ = Status::OK();
for (auto& child : children_) {
child.SeekToFirst();
- if (child.Valid()) {
- assert(child.status().ok());
- minHeap_.push(&child);
- } else {
- considerStatus(child.status());
- }
+ AddToMinHeapOrCheckStatus(&child);
}
direction_ = kForward;
current_ = CurrentForward();
status_ = Status::OK();
for (auto& child : children_) {
child.SeekToLast();
- if (child.Valid()) {
- assert(child.status().ok());
- maxHeap_->push(&child);
- } else {
- considerStatus(child.status());
- }
+ AddToMaxHeapOrCheckStatus(&child);
}
direction_ = kReverse;
current_ = CurrentReverse();
PERF_TIMER_GUARD(seek_child_seek_time);
child.Seek(target);
}
- PERF_COUNTER_ADD(seek_child_seek_count, 1);
- if (child.Valid()) {
- assert(child.status().ok());
+ PERF_COUNTER_ADD(seek_child_seek_count, 1);
+ {
+ // Strictly, we timed slightly more than min heap operation,
+ // but these operations are very cheap.
PERF_TIMER_GUARD(seek_min_heap_time);
- minHeap_.push(&child);
- } else {
- considerStatus(child.status());
+ AddToMinHeapOrCheckStatus(&child);
}
}
direction_ = kForward;
}
PERF_COUNTER_ADD(seek_child_seek_count, 1);
- if (child.Valid()) {
- assert(child.status().ok());
+ {
PERF_TIMER_GUARD(seek_max_heap_time);
- maxHeap_->push(&child);
- } else {
- considerStatus(child.status());
+ AddToMaxHeapOrCheckStatus(&child);
}
}
direction_ = kReverse;
SwitchToForward();
// The loop advanced all non-current children to be > key() so current_
// should still be strictly the smallest key.
- assert(current_ == CurrentForward());
}
// For the heap modifications below to be correct, current_ must be the
current_ = CurrentForward();
}
+ bool NextAndGetResult(IterateResult* result) override {
+ Next();
+ bool is_valid = Valid();
+ if (is_valid) {
+ result->key = key();
+ result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
+ }
+ return is_valid;
+ }
+
void Prev() override {
assert(Valid());
// Ensure that all children are positioned before key().
if (direction_ != kReverse) {
// Otherwise, retreat the non-current children. We retreat current_
// just after the if-block.
- ClearHeaps();
- InitMaxHeap();
- Slice target = key();
- for (auto& child : children_) {
- if (&child != current_) {
- child.SeekForPrev(target);
- TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
- considerStatus(child.status());
- if (child.Valid() && comparator_->Equal(target, child.key())) {
- child.Prev();
- considerStatus(child.status());
- }
- }
- if (child.Valid()) {
- assert(child.status().ok());
- maxHeap_->push(&child);
- }
- }
- direction_ = kReverse;
- if (!prefix_seek_mode_) {
- // Note that we don't do assert(current_ == CurrentReverse()) here
- // because it is possible to have some keys larger than the seek-key
- // inserted between Seek() and SeekToLast(), which makes current_ not
- // equal to CurrentReverse().
- current_ = CurrentReverse();
- }
- // The loop advanced all non-current children to be < key() so current_
- // should still be strictly the smallest key.
- assert(current_ == CurrentReverse());
+ SwitchToBackward();
}
// For the heap modifications below to be correct, current_ must be the
return current_->value();
}
+ // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
+ // from current child iterator. Potentially as long as one of child iterator
+ // report out of bound is not possible, we know current key is within bound.
+
+ bool MayBeOutOfLowerBound() override {
+ assert(Valid());
+ return current_->MayBeOutOfLowerBound();
+ }
+
+ bool MayBeOutOfUpperBound() override {
+ assert(Valid());
+ return current_->MayBeOutOfUpperBound();
+ }
+
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
for (auto& child : children_) {
std::unique_ptr<MergerMaxIterHeap> maxHeap_;
PinnedIteratorsManager* pinned_iters_mgr_;
+ // In forward direction, process a child that is not in the min heap.
+ // If valid, add to the min heap. Otherwise, check status.
+ void AddToMinHeapOrCheckStatus(IteratorWrapper*);
+
+ // In backward direction, process a child that is not in the max heap.
+ // If valid, add to the min heap. Otherwise, check status.
+ void AddToMaxHeapOrCheckStatus(IteratorWrapper*);
+
void SwitchToForward();
+ // Switch the direction from forward to backward without changing the
+ // position. Iterator should still be valid.
+ void SwitchToBackward();
+
IteratorWrapper* CurrentForward() const {
assert(direction_ == kForward);
return !minHeap_.empty() ? minHeap_.top() : nullptr;
}
};
+void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
+ if (child->Valid()) {
+ assert(child->status().ok());
+ minHeap_.push(child);
+ } else {
+ considerStatus(child->status());
+ }
+}
+
+void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
+ if (child->Valid()) {
+ assert(child->status().ok());
+ maxHeap_->push(child);
+ } else {
+ considerStatus(child->status());
+ }
+}
+
void MergingIterator::SwitchToForward() {
// Otherwise, advance the non-current children. We advance current_
// just after the if-block.
for (auto& child : children_) {
if (&child != current_) {
child.Seek(target);
- considerStatus(child.status());
if (child.Valid() && comparator_->Equal(target, child.key())) {
+ assert(child.status().ok());
child.Next();
- considerStatus(child.status());
}
}
- if (child.Valid()) {
- minHeap_.push(&child);
- }
+ AddToMinHeapOrCheckStatus(&child);
}
direction_ = kForward;
}
+void MergingIterator::SwitchToBackward() {
+ ClearHeaps();
+ InitMaxHeap();
+ Slice target = key();
+ for (auto& child : children_) {
+ if (&child != current_) {
+ child.SeekForPrev(target);
+ TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
+ if (child.Valid() && comparator_->Equal(target, child.key())) {
+ assert(child.status().ok());
+ child.Prev();
+ }
+ }
+ AddToMaxHeapOrCheckStatus(&child);
+ }
+ direction_ = kReverse;
+ if (!prefix_seek_mode_) {
+ // Note that we don't do assert(current_ == CurrentReverse()) here
+ // because it is possible to have some keys larger than the seek-key
+ // inserted between Seek() and SeekToLast(), which makes current_ not
+ // equal to CurrentReverse().
+ current_ = CurrentReverse();
+ }
+ assert(current_ == CurrentReverse());
+}
+
void MergingIterator::ClearHeaps() {
minHeap_.clear();
if (maxHeap_) {
return ret;
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE