1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "table/merging_iterator.h"
13 #include "db/dbformat.h"
14 #include "db/pinned_iterators_manager.h"
15 #include "monitoring/perf_context_imp.h"
16 #include "rocksdb/comparator.h"
17 #include "rocksdb/iterator.h"
18 #include "rocksdb/options.h"
19 #include "table/internal_iterator.h"
20 #include "table/iter_heap.h"
21 #include "table/iterator_wrapper.h"
22 #include "util/arena.h"
23 #include "util/autovector.h"
24 #include "util/heap.h"
25 #include "util/stop_watch.h"
26 #include "util/sync_point.h"
29 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
31 typedef BinaryHeap
<IteratorWrapper
*, MaxIteratorComparator
> MergerMaxIterHeap
;
32 typedef BinaryHeap
<IteratorWrapper
*, MinIteratorComparator
> MergerMinIterHeap
;
// Size template argument of the autovector that stores the child iterator
// wrappers (children_).
constexpr size_t kNumIterReserve = 4;
37 class MergingIterator
: public InternalIterator
{
39 MergingIterator(const InternalKeyComparator
* comparator
,
40 InternalIterator
** children
, int n
, bool is_arena_mode
,
41 bool prefix_seek_mode
)
42 : is_arena_mode_(is_arena_mode
),
43 comparator_(comparator
),
46 minHeap_(comparator_
),
47 prefix_seek_mode_(prefix_seek_mode
),
48 pinned_iters_mgr_(nullptr) {
50 for (int i
= 0; i
< n
; i
++) {
51 children_
[i
].Set(children
[i
]);
53 for (auto& child
: children_
) {
55 assert(child
.status().ok());
56 minHeap_
.push(&child
);
58 considerStatus(child
.status());
61 current_
= CurrentForward();
64 void considerStatus(Status s
) {
65 if (!s
.ok() && status_
.ok()) {
70 virtual void AddIterator(InternalIterator
* iter
) {
71 assert(direction_
== kForward
);
72 children_
.emplace_back(iter
);
73 if (pinned_iters_mgr_
) {
74 iter
->SetPinnedItersMgr(pinned_iters_mgr_
);
76 auto new_wrapper
= children_
.back();
77 if (new_wrapper
.Valid()) {
78 assert(new_wrapper
.status().ok());
79 minHeap_
.push(&new_wrapper
);
80 current_
= CurrentForward();
82 considerStatus(new_wrapper
.status());
86 ~MergingIterator() override
{
87 for (auto& child
: children_
) {
88 child
.DeleteIter(is_arena_mode_
);
92 bool Valid() const override
{ return current_
!= nullptr && status_
.ok(); }
94 Status
status() const override
{ return status_
; }
96 void SeekToFirst() override
{
98 status_
= Status::OK();
99 for (auto& child
: children_
) {
102 assert(child
.status().ok());
103 minHeap_
.push(&child
);
105 considerStatus(child
.status());
108 direction_
= kForward
;
109 current_
= CurrentForward();
112 void SeekToLast() override
{
115 status_
= Status::OK();
116 for (auto& child
: children_
) {
119 assert(child
.status().ok());
120 maxHeap_
->push(&child
);
122 considerStatus(child
.status());
125 direction_
= kReverse
;
126 current_
= CurrentReverse();
129 void Seek(const Slice
& target
) override
{
131 status_
= Status::OK();
132 for (auto& child
: children_
) {
134 PERF_TIMER_GUARD(seek_child_seek_time
);
137 PERF_COUNTER_ADD(seek_child_seek_count
, 1);
140 assert(child
.status().ok());
141 PERF_TIMER_GUARD(seek_min_heap_time
);
142 minHeap_
.push(&child
);
144 considerStatus(child
.status());
147 direction_
= kForward
;
149 PERF_TIMER_GUARD(seek_min_heap_time
);
150 current_
= CurrentForward();
154 void SeekForPrev(const Slice
& target
) override
{
157 status_
= Status::OK();
159 for (auto& child
: children_
) {
161 PERF_TIMER_GUARD(seek_child_seek_time
);
162 child
.SeekForPrev(target
);
164 PERF_COUNTER_ADD(seek_child_seek_count
, 1);
167 assert(child
.status().ok());
168 PERF_TIMER_GUARD(seek_max_heap_time
);
169 maxHeap_
->push(&child
);
171 considerStatus(child
.status());
174 direction_
= kReverse
;
176 PERF_TIMER_GUARD(seek_max_heap_time
);
177 current_
= CurrentReverse();
181 void Next() override
{
184 // Ensure that all children are positioned after key().
185 // If we are moving in the forward direction, it is already
186 // true for all of the non-current children since current_ is
187 // the smallest child and key() == current_->key().
188 if (direction_
!= kForward
) {
190 // The loop advanced all non-current children to be > key() so current_
191 // should still be strictly the smallest key.
192 assert(current_
== CurrentForward());
195 // For the heap modifications below to be correct, current_ must be the
196 // current top of the heap.
197 assert(current_
== CurrentForward());
// current_ points to the current record, so move that iterator forward.
201 if (current_
->Valid()) {
202 // current is still valid after the Next() call above. Call
203 // replace_top() to restore the heap property. When the same child
204 // iterator yields a sequence of keys, this is cheap.
205 assert(current_
->status().ok());
206 minHeap_
.replace_top(current_
);
208 // current stopped being valid, remove it from the heap.
209 considerStatus(current_
->status());
212 current_
= CurrentForward();
215 void Prev() override
{
217 // Ensure that all children are positioned before key().
218 // If we are moving in the reverse direction, it is already
219 // true for all of the non-current children since current_ is
220 // the largest child and key() == current_->key().
221 if (direction_
!= kReverse
) {
222 // Otherwise, retreat the non-current children. We retreat current_
223 // just after the if-block.
226 Slice target
= key();
227 for (auto& child
: children_
) {
228 if (&child
!= current_
) {
229 child
.SeekForPrev(target
);
230 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child
);
231 considerStatus(child
.status());
232 if (child
.Valid() && comparator_
->Equal(target
, child
.key())) {
234 considerStatus(child
.status());
238 assert(child
.status().ok());
239 maxHeap_
->push(&child
);
242 direction_
= kReverse
;
243 if (!prefix_seek_mode_
) {
244 // Note that we don't do assert(current_ == CurrentReverse()) here
245 // because it is possible to have some keys larger than the seek-key
246 // inserted between Seek() and SeekToLast(), which makes current_ not
247 // equal to CurrentReverse().
248 current_
= CurrentReverse();
// The loop retreated all non-current children to be < key() so current_
// should still be strictly the largest key.
252 assert(current_
== CurrentReverse());
255 // For the heap modifications below to be correct, current_ must be the
256 // current top of the heap.
257 assert(current_
== CurrentReverse());
260 if (current_
->Valid()) {
261 // current is still valid after the Prev() call above. Call
262 // replace_top() to restore the heap property. When the same child
263 // iterator yields a sequence of keys, this is cheap.
264 assert(current_
->status().ok());
265 maxHeap_
->replace_top(current_
);
267 // current stopped being valid, remove it from the heap.
268 considerStatus(current_
->status());
271 current_
= CurrentReverse();
274 Slice
key() const override
{
276 return current_
->key();
279 Slice
value() const override
{
281 return current_
->value();
284 void SetPinnedItersMgr(PinnedIteratorsManager
* pinned_iters_mgr
) override
{
285 pinned_iters_mgr_
= pinned_iters_mgr
;
286 for (auto& child
: children_
) {
287 child
.SetPinnedItersMgr(pinned_iters_mgr
);
291 bool IsKeyPinned() const override
{
293 return pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled() &&
294 current_
->IsKeyPinned();
297 bool IsValuePinned() const override
{
299 return pinned_iters_mgr_
&& pinned_iters_mgr_
->PinningEnabled() &&
300 current_
->IsValuePinned();
304 // Clears heaps for both directions, used when changing direction or seeking
306 // Ensures that maxHeap_ is initialized when starting to go in the reverse
311 const InternalKeyComparator
* comparator_
;
312 autovector
<IteratorWrapper
, kNumIterReserve
> children_
;
314 // Cached pointer to child iterator with the current key, or nullptr if no
315 // child iterators are valid. This is the top of minHeap_ or maxHeap_
316 // depending on the direction.
317 IteratorWrapper
* current_
;
318 // If any of the children have non-ok status, this is one of them.
320 // Which direction is the iterator moving?
325 Direction direction_
;
326 MergerMinIterHeap minHeap_
;
327 bool prefix_seek_mode_
;
329 // Max heap is used for reverse iteration, which is way less common than
330 // forward. Lazily initialize it to save memory.
331 std::unique_ptr
<MergerMaxIterHeap
> maxHeap_
;
332 PinnedIteratorsManager
* pinned_iters_mgr_
;
334 void SwitchToForward();
336 IteratorWrapper
* CurrentForward() const {
337 assert(direction_
== kForward
);
338 return !minHeap_
.empty() ? minHeap_
.top() : nullptr;
341 IteratorWrapper
* CurrentReverse() const {
342 assert(direction_
== kReverse
);
344 return !maxHeap_
->empty() ? maxHeap_
->top() : nullptr;
348 void MergingIterator::SwitchToForward() {
349 // Otherwise, advance the non-current children. We advance current_
350 // just after the if-block.
352 Slice target
= key();
353 for (auto& child
: children_
) {
354 if (&child
!= current_
) {
356 considerStatus(child
.status());
357 if (child
.Valid() && comparator_
->Equal(target
, child
.key())) {
359 considerStatus(child
.status());
363 minHeap_
.push(&child
);
366 direction_
= kForward
;
369 void MergingIterator::ClearHeaps() {
376 void MergingIterator::InitMaxHeap() {
378 maxHeap_
.reset(new MergerMaxIterHeap(comparator_
));
382 InternalIterator
* NewMergingIterator(const InternalKeyComparator
* cmp
,
383 InternalIterator
** list
, int n
,
384 Arena
* arena
, bool prefix_seek_mode
) {
387 return NewEmptyInternalIterator
<Slice
>(arena
);
391 if (arena
== nullptr) {
392 return new MergingIterator(cmp
, list
, n
, false, prefix_seek_mode
);
394 auto mem
= arena
->AllocateAligned(sizeof(MergingIterator
));
395 return new (mem
) MergingIterator(cmp
, list
, n
, true, prefix_seek_mode
);
400 MergeIteratorBuilder::MergeIteratorBuilder(
401 const InternalKeyComparator
* comparator
, Arena
* a
, bool prefix_seek_mode
)
402 : first_iter(nullptr), use_merging_iter(false), arena(a
) {
403 auto mem
= arena
->AllocateAligned(sizeof(MergingIterator
));
405 new (mem
) MergingIterator(comparator
, nullptr, 0, true, prefix_seek_mode
);
408 MergeIteratorBuilder::~MergeIteratorBuilder() {
409 if (first_iter
!= nullptr) {
410 first_iter
->~InternalIterator();
412 if (merge_iter
!= nullptr) {
413 merge_iter
->~MergingIterator();
417 void MergeIteratorBuilder::AddIterator(InternalIterator
* iter
) {
418 if (!use_merging_iter
&& first_iter
!= nullptr) {
419 merge_iter
->AddIterator(first_iter
);
420 use_merging_iter
= true;
421 first_iter
= nullptr;
423 if (use_merging_iter
) {
424 merge_iter
->AddIterator(iter
);
430 InternalIterator
* MergeIteratorBuilder::Finish() {
431 InternalIterator
* ret
= nullptr;
432 if (!use_merging_iter
) {
434 first_iter
= nullptr;
437 merge_iter
= nullptr;
442 } // namespace rocksdb