1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "table/merging_iterator.h"
13 #include "db/pinned_iterators_manager.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "rocksdb/comparator.h"
16 #include "rocksdb/iterator.h"
17 #include "rocksdb/options.h"
18 #include "table/internal_iterator.h"
19 #include "table/iter_heap.h"
20 #include "table/iterator_wrapper.h"
21 #include "util/arena.h"
22 #include "util/autovector.h"
23 #include "util/heap.h"
24 #include "util/stop_watch.h"
25 #include "util/sync_point.h"
28 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30 typedef BinaryHeap
<IteratorWrapper
*, MaxIteratorComparator
> MergerMaxIterHeap
;
31 typedef BinaryHeap
<IteratorWrapper
*, MinIteratorComparator
> MergerMinIterHeap
;
// Size argument given to the autovector that holds the child iterator
// wrappers.
constexpr size_t kNumIterReserve = 4;
36 class MergingIterator
: public InternalIterator
{
// Constructs a merging iterator over the `n` iterators in `children`.
// Each input iterator is wrapped via children_[i].Set(), child wrappers are
// pushed onto the forward min-heap (which is built on `comparator`), and the
// heap top is cached in current_. `is_arena_mode` is remembered and later
// handed to DeleteIter() by the destructor; `prefix_seek_mode` is stored and
// consulted by Prev().
// NOTE(review): some lines of this constructor appear to be absent from this
// view (e.g. how children_ is sized before being indexed, and whether a child
// is checked before being pushed) -- confirm against the full file.
38 MergingIterator(const Comparator
* comparator
, InternalIterator
** children
,
39 int n
, bool is_arena_mode
, bool prefix_seek_mode
)
40 : is_arena_mode_(is_arena_mode
),
41 comparator_(comparator
),
44 minHeap_(comparator_
),
45 prefix_seek_mode_(prefix_seek_mode
),
46 pinned_iters_mgr_(nullptr) {
48 for (int i
= 0; i
< n
; i
++) {
49 children_
[i
].Set(children
[i
]);
51 for (auto& child
: children_
) {
53 minHeap_
.push(&child
);
56 current_
= CurrentForward();
59 virtual void AddIterator(InternalIterator
* iter
) {
60 assert(direction_
== kForward
);
61 children_
.emplace_back(iter
);
62 if (pinned_iters_mgr_
) {
63 iter
->SetPinnedItersMgr(pinned_iters_mgr_
);
65 auto new_wrapper
= children_
.back();
66 if (new_wrapper
.Valid()) {
67 minHeap_
.push(&new_wrapper
);
68 current_
= CurrentForward();
72 virtual ~MergingIterator() {
73 for (auto& child
: children_
) {
74 child
.DeleteIter(is_arena_mode_
);
78 virtual bool Valid() const override
{ return (current_
!= nullptr); }
// Rebuilds the forward-iteration state: child wrappers are pushed onto
// minHeap_, direction_ becomes kForward, and current_ is refreshed from the
// top of the min-heap.
// NOTE(review): the statements that reset the heaps, reposition each child
// and decide whether a child is pushed are not visible in this view --
// confirm against the full file.
80 virtual void SeekToFirst() override
{
82 for (auto& child
: children_
) {
85 minHeap_
.push(&child
);
88 direction_
= kForward
;
89 current_
= CurrentForward();
// Rebuilds the reverse-iteration state: child wrappers are pushed onto
// maxHeap_, direction_ becomes kReverse, and current_ is refreshed from the
// top of the max-heap.
// NOTE(review): maxHeap_ is dereferenced here, so it must already have been
// allocated; the statements that do that, reposition each child and decide
// whether a child is pushed are not visible in this view -- confirm.
92 virtual void SeekToLast() override
{
95 for (auto& child
: children_
) {
98 maxHeap_
->push(&child
);
101 direction_
= kReverse
;
102 current_
= CurrentReverse();
// Forward seek to `target`. For each child the time spent seeking is
// attributed to the seek_child_seek_time perf timer and
// seek_child_seek_count is incremented by one; pushing onto minHeap_ is
// timed under seek_min_heap_time. Afterwards direction_ is kForward and
// current_ is the top of the min-heap (also timed under seek_min_heap_time).
// NOTE(review): the per-child seek call itself and any validity check before
// the push are not visible in this view -- confirm against the full file.
105 virtual void Seek(const Slice
& target
) override
{
107 for (auto& child
: children_
) {
109 PERF_TIMER_GUARD(seek_child_seek_time
);
112 PERF_COUNTER_ADD(seek_child_seek_count
, 1);
115 PERF_TIMER_GUARD(seek_min_heap_time
);
116 minHeap_
.push(&child
);
119 direction_
= kForward
;
121 PERF_TIMER_GUARD(seek_min_heap_time
);
122 current_
= CurrentForward();
// Reverse seek to `target`. Every child has SeekForPrev(target) called on
// it (timed under seek_child_seek_time, counted in seek_child_seek_count)
// and child wrappers are pushed onto maxHeap_ (timed under
// seek_max_heap_time). Afterwards direction_ is kReverse and current_ is the
// top of the max-heap.
// NOTE(review): maxHeap_ is dereferenced here, so it must already have been
// allocated; the statements that do that and any validity check before the
// push are not visible in this view -- confirm against the full file.
126 virtual void SeekForPrev(const Slice
& target
) override
{
130 for (auto& child
: children_
) {
132 PERF_TIMER_GUARD(seek_child_seek_time
);
133 child
.SeekForPrev(target
);
135 PERF_COUNTER_ADD(seek_child_seek_count
, 1);
138 PERF_TIMER_GUARD(seek_max_heap_time
);
139 maxHeap_
->push(&child
);
142 direction_
= kReverse
;
144 PERF_TIMER_GUARD(seek_max_heap_time
);
145 current_
= CurrentReverse();
// Advances the merged iterator to the next entry in forward order.
// If the previous movement was not forward, the non-current children are
// first repositioned relative to key() and re-pushed onto minHeap_, and
// direction_ is switched to kForward. The current child is then advanced and
// the heap is repaired: replace_top() while the child is still valid,
// otherwise the child leaves the heap. current_ ends as the new heap top.
// NOTE(review): several statements (the repositioning calls on the children,
// the advance of current_, the removal from the heap) are not visible in
// this view -- confirm against the full file.
149 virtual void Next() override
{
152 // Ensure that all children are positioned after key().
153 // If we are moving in the forward direction, it is already
154 // true for all of the non-current children since current_ is
155 // the smallest child and key() == current_->key().
156 if (direction_
!= kForward
) {
157 // Otherwise, advance the non-current children. We advance current_
158 // just after the if-block.
160 for (auto& child
: children_
) {
161 if (&child
!= current_
) {
163 if (child
.Valid() && comparator_
->Equal(key(), child
.key())) {
168 minHeap_
.push(&child
);
171 direction_
= kForward
;
172 // The loop advanced all non-current children to be > key() so current_
173 // should still be strictly the smallest key.
174 assert(current_
== CurrentForward());
177 // For the heap modifications below to be correct, current_ must be the
178 // current top of the heap.
179 assert(current_
== CurrentForward());
181 // current_ points to the current record; move that iterator forward.
183 if (current_
->Valid()) {
184 // current is still valid after the Next() call above. Call
185 // replace_top() to restore the heap property. When the same child
186 // iterator yields a sequence of keys, this is cheap.
187 minHeap_
.replace_top(current_
);
189 // current stopped being valid, remove it from the heap.
192 current_
= CurrentForward();
// Moves the merged iterator to the previous entry in reverse order.
// If the previous movement was not reverse, the non-current children are
// first repositioned relative to key() (the strategy depends on
// prefix_seek_mode_) and re-pushed onto maxHeap_, and direction_ is switched
// to kReverse. The current child is then stepped back and the heap is
// repaired: replace_top() while the child is still valid, otherwise the
// child leaves the heap. current_ ends as the new heap top.
// NOTE(review): several statements (the non-prefix repositioning calls, the
// step back of current_, the removal from the heap) are not visible in this
// view -- confirm against the full file.
195 virtual void Prev() override
{
197 // Ensure that all children are positioned before key().
198 // If we are moving in the reverse direction, it is already
199 // true for all of the non-current children since current_ is
200 // the largest child and key() == current_->key().
201 if (direction_
!= kReverse
) {
202 // Otherwise, retreat the non-current children. We retreat current_
203 // just after the if-block.
206 for (auto& child
: children_
) {
207 if (&child
!= current_
) {
208 if (!prefix_seek_mode_
) {
211 // Child is at first entry >= key(). Step back one to be < key()
212 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev",
216 // Child has no entries >= key(). Position at last entry.
217 TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
221 child
.SeekForPrev(key());
222 if (child
.Valid() && comparator_
->Equal(key(), child
.key())) {
228 maxHeap_
->push(&child
);
231 direction_
= kReverse
;
232 if (!prefix_seek_mode_
) {
233 // Note that we don't do assert(current_ == CurrentReverse()) here
234 // because it is possible to have some keys larger than the seek-key
235 // inserted between Seek() and SeekToLast(), which makes current_ not
236 // equal to CurrentReverse().
237 current_
= CurrentReverse();
239 // The loop moved all non-current children to be < key() so current_
240 // should still be strictly the largest key.
241 assert(current_
== CurrentReverse());
244 // For the heap modifications below to be correct, current_ must be the
245 // current top of the heap.
246 assert(current_
== CurrentReverse());
249 if (current_
->Valid()) {
250 // current is still valid after the Prev() call above. Call
251 // replace_top() to restore the heap property. When the same child
252 // iterator yields a sequence of keys, this is cheap.
253 maxHeap_
->replace_top(current_
);
255 // current stopped being valid, remove it from the heap.
258 current_
= CurrentReverse();
// Returns the key of the child iterator cached in current_.
// current_ is dereferenced without a null check in the visible code, so this
// must only be called while Valid() is true.
261 virtual Slice
key() const override
{
263 return current_
->key();
// Returns the value of the child iterator cached in current_.
// current_ is dereferenced without a null check in the visible code, so this
// must only be called while Valid() is true.
266 virtual Slice
value() const override
{
268 return current_
->value();
// Status accessor for the merged iterator; walks over every child wrapper.
// NOTE(review): the loop body and the return statement are not visible in
// this view, so how the children's statuses are combined cannot be stated
// from here -- confirm against the full file.
271 virtual Status
status() const override
{
273 for (auto& child
: children_
) {
282 virtual void SetPinnedItersMgr(
283 PinnedIteratorsManager
* pinned_iters_mgr
) override
{
284 pinned_iters_mgr_
= pinned_iters_mgr
;
285 for (auto& child
: children_
) {
286 child
.SetPinnedItersMgr(pinned_iters_mgr
);
// True only when a pinned-iterators manager is installed, that manager
// reports PinningEnabled(), and the current child reports its key as pinned.
// The checks short-circuit, so current_ is dereferenced only when pinning is
// enabled; in that case it must be non-null (i.e. Valid() must hold).
290 virtual bool IsKeyPinned() const override
{
292 return pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled() &&
293 current_
->IsKeyPinned();
// True only when a pinned-iterators manager is installed, that manager
// reports PinningEnabled(), and the current child reports its value as
// pinned. The checks short-circuit, so current_ is dereferenced only when
// pinning is enabled; in that case it must be non-null (i.e. Valid() must
// hold).
296 virtual bool IsValuePinned() const override
{
298 return pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled() &&
299 current_
->IsValuePinned();
303 // Clears heaps for both directions, used when changing direction or seeking
305 // Ensures that maxHeap_ is initialized when starting to go in the reverse direction.
// Key comparator supplied at construction; used to build both heaps and for
// the Equal() checks in Next()/Prev().
310 const Comparator
* comparator_
;
// Wrappers around the child iterators being merged. The heaps hold raw
// pointers to these elements.
311 autovector
<IteratorWrapper
, kNumIterReserve
> children_
;
313 // Cached pointer to child iterator with the current key, or nullptr if no
314 // child iterators are valid. This is the top of minHeap_ or maxHeap_
315 // depending on the direction.
316 IteratorWrapper
* current_
;
317 // Which direction is the iterator moving?
322 Direction direction_
;
// Heap used while direction_ == kForward; its top is what CurrentForward()
// returns.
323 MergerMinIterHeap minHeap_
;
// Set at construction; selects how Prev() repositions the non-current
// children when switching direction.
324 bool prefix_seek_mode_
;
326 // Max heap is used for reverse iteration, which is way less common than
327 // forward. Lazily initialize it to save memory.
328 std::unique_ptr
<MergerMaxIterHeap
> maxHeap_
;
// nullptr until SetPinnedItersMgr() installs a manager; forwarded to the
// children and consulted by IsKeyPinned()/IsValuePinned().
329 PinnedIteratorsManager
* pinned_iters_mgr_
;
331 IteratorWrapper
* CurrentForward() const {
332 assert(direction_
== kForward
);
333 return !minHeap_
.empty() ? minHeap_
.top() : nullptr;
// Returns the child at the top of the reverse max-heap, or nullptr when the
// heap holds no children. Must only be called in the reverse direction
// (asserted). maxHeap_ is dereferenced, so it must already be allocated.
336 IteratorWrapper
* CurrentReverse() const {
337 assert(direction_
== kReverse
);
339 return !maxHeap_
->empty() ? maxHeap_
->top() : nullptr;
// Out-of-class definition of ClearHeaps(); per the comment in the class it
// clears the heaps for both directions.
// NOTE(review): the body is not visible in this view -- confirm against the
// full file.
343 void MergingIterator::ClearHeaps() {
// Allocates the reverse-iteration max-heap, built on comparator_, and stores
// it in maxHeap_.
// NOTE(review): a line between the signature and the reset() call is not
// visible in this view (possibly a guard against re-allocating) -- confirm.
350 void MergingIterator::InitMaxHeap() {
352 maxHeap_
.reset(new MergerMaxIterHeap(comparator_
));
// Factory for an iterator that merges the `n` iterators in `list` using
// `cmp`. Visible return paths:
//   - NewEmptyInternalIterator(arena);
//   - when `arena` is nullptr, a heap-allocated MergingIterator constructed
//     with is_arena_mode = false;
//   - otherwise a MergingIterator placement-constructed with
//     is_arena_mode = true in memory obtained from arena->AllocateAligned().
// NOTE(review): the conditions that select among these paths (and any other
// return paths) are not fully visible in this view -- confirm against the
// full file.
356 InternalIterator
* NewMergingIterator(const Comparator
* cmp
,
357 InternalIterator
** list
, int n
,
358 Arena
* arena
, bool prefix_seek_mode
) {
361 return NewEmptyInternalIterator(arena
);
365 if (arena
== nullptr) {
366 return new MergingIterator(cmp
, list
, n
, false, prefix_seek_mode
);
368 auto mem
= arena
->AllocateAligned(sizeof(MergingIterator
));
369 return new (mem
) MergingIterator(cmp
, list
, n
, true, prefix_seek_mode
);
// Sets up a builder with no first iterator and use_merging_iter = false, and
// placement-constructs an empty, arena-mode MergingIterator (no children) in
// memory obtained from the arena.
// `a` is dereferenced unconditionally, so it must be non-null.
// NOTE(review): the line that stores the constructed iterator is not visible
// in this view -- confirm against the full file.
374 MergeIteratorBuilder::MergeIteratorBuilder(const Comparator
* comparator
,
375 Arena
* a
, bool prefix_seek_mode
)
376 : first_iter(nullptr), use_merging_iter(false), arena(a
) {
377 auto mem
= arena
->AllocateAligned(sizeof(MergingIterator
));
379 new (mem
) MergingIterator(comparator
, nullptr, 0, true, prefix_seek_mode
);
// Registers `iter` with the builder. When a first iterator has been recorded
// and the builder is not yet in merging mode, that first iterator is handed
// to merge_iter and use_merging_iter is set; once in merging mode, `iter` is
// added to merge_iter as well.
// NOTE(review): the branch taken when the builder is not in merging mode is
// not visible in this view -- confirm against the full file.
382 void MergeIteratorBuilder::AddIterator(InternalIterator
* iter
) {
383 if (!use_merging_iter
&& first_iter
!= nullptr) {
384 merge_iter
->AddIterator(first_iter
);
385 use_merging_iter
= true;
387 if (use_merging_iter
) {
388 merge_iter
->AddIterator(iter
);
// Finishes building. In the visible merging-mode path, merge_iter is copied
// into a local and the member is reset to nullptr.
// NOTE(review): the return statements, including what is returned when
// use_merging_iter is false, are not visible in this view -- confirm against
// the full file.
394 InternalIterator
* MergeIteratorBuilder::Finish() {
395 if (!use_merging_iter
) {
398 auto ret
= merge_iter
;
399 merge_iter
= nullptr;
404 } // namespace rocksdb