]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/table/merging_iterator.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / table / merging_iterator.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "table/merging_iterator.h"
11#include <string>
12#include <vector>
11fdf7f2 13#include "db/dbformat.h"
7c673cae
FG
14#include "db/pinned_iterators_manager.h"
15#include "monitoring/perf_context_imp.h"
16#include "rocksdb/comparator.h"
17#include "rocksdb/iterator.h"
18#include "rocksdb/options.h"
19#include "table/internal_iterator.h"
20#include "table/iter_heap.h"
21#include "table/iterator_wrapper.h"
22#include "util/arena.h"
23#include "util/autovector.h"
24#include "util/heap.h"
25#include "util/stop_watch.h"
26#include "util/sync_point.h"
27
28namespace rocksdb {
29// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30namespace {
31typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33} // namespace
34
35const size_t kNumIterReserve = 4;
36
37class MergingIterator : public InternalIterator {
38 public:
11fdf7f2
TL
39 MergingIterator(const InternalKeyComparator* comparator,
40 InternalIterator** children, int n, bool is_arena_mode,
41 bool prefix_seek_mode)
7c673cae
FG
42 : is_arena_mode_(is_arena_mode),
43 comparator_(comparator),
44 current_(nullptr),
45 direction_(kForward),
46 minHeap_(comparator_),
47 prefix_seek_mode_(prefix_seek_mode),
48 pinned_iters_mgr_(nullptr) {
49 children_.resize(n);
50 for (int i = 0; i < n; i++) {
51 children_[i].Set(children[i]);
52 }
53 for (auto& child : children_) {
54 if (child.Valid()) {
11fdf7f2 55 assert(child.status().ok());
7c673cae 56 minHeap_.push(&child);
11fdf7f2
TL
57 } else {
58 considerStatus(child.status());
7c673cae
FG
59 }
60 }
61 current_ = CurrentForward();
62 }
63
11fdf7f2
TL
64 void considerStatus(Status s) {
65 if (!s.ok() && status_.ok()) {
66 status_ = s;
67 }
68 }
69
7c673cae
FG
70 virtual void AddIterator(InternalIterator* iter) {
71 assert(direction_ == kForward);
72 children_.emplace_back(iter);
73 if (pinned_iters_mgr_) {
74 iter->SetPinnedItersMgr(pinned_iters_mgr_);
75 }
76 auto new_wrapper = children_.back();
77 if (new_wrapper.Valid()) {
11fdf7f2 78 assert(new_wrapper.status().ok());
7c673cae
FG
79 minHeap_.push(&new_wrapper);
80 current_ = CurrentForward();
11fdf7f2
TL
81 } else {
82 considerStatus(new_wrapper.status());
7c673cae
FG
83 }
84 }
85
494da23a 86 ~MergingIterator() override {
7c673cae
FG
87 for (auto& child : children_) {
88 child.DeleteIter(is_arena_mode_);
89 }
90 }
91
494da23a 92 bool Valid() const override { return current_ != nullptr && status_.ok(); }
11fdf7f2 93
494da23a 94 Status status() const override { return status_; }
7c673cae 95
494da23a 96 void SeekToFirst() override {
7c673cae 97 ClearHeaps();
11fdf7f2 98 status_ = Status::OK();
7c673cae
FG
99 for (auto& child : children_) {
100 child.SeekToFirst();
101 if (child.Valid()) {
11fdf7f2 102 assert(child.status().ok());
7c673cae 103 minHeap_.push(&child);
11fdf7f2
TL
104 } else {
105 considerStatus(child.status());
7c673cae
FG
106 }
107 }
108 direction_ = kForward;
109 current_ = CurrentForward();
110 }
111
494da23a 112 void SeekToLast() override {
7c673cae
FG
113 ClearHeaps();
114 InitMaxHeap();
11fdf7f2 115 status_ = Status::OK();
7c673cae
FG
116 for (auto& child : children_) {
117 child.SeekToLast();
118 if (child.Valid()) {
11fdf7f2 119 assert(child.status().ok());
7c673cae 120 maxHeap_->push(&child);
11fdf7f2
TL
121 } else {
122 considerStatus(child.status());
7c673cae
FG
123 }
124 }
125 direction_ = kReverse;
126 current_ = CurrentReverse();
127 }
128
494da23a 129 void Seek(const Slice& target) override {
7c673cae 130 ClearHeaps();
11fdf7f2 131 status_ = Status::OK();
7c673cae
FG
132 for (auto& child : children_) {
133 {
134 PERF_TIMER_GUARD(seek_child_seek_time);
135 child.Seek(target);
136 }
137 PERF_COUNTER_ADD(seek_child_seek_count, 1);
138
139 if (child.Valid()) {
11fdf7f2 140 assert(child.status().ok());
7c673cae
FG
141 PERF_TIMER_GUARD(seek_min_heap_time);
142 minHeap_.push(&child);
11fdf7f2
TL
143 } else {
144 considerStatus(child.status());
7c673cae
FG
145 }
146 }
147 direction_ = kForward;
148 {
149 PERF_TIMER_GUARD(seek_min_heap_time);
150 current_ = CurrentForward();
151 }
152 }
153
494da23a 154 void SeekForPrev(const Slice& target) override {
7c673cae
FG
155 ClearHeaps();
156 InitMaxHeap();
11fdf7f2 157 status_ = Status::OK();
7c673cae
FG
158
159 for (auto& child : children_) {
160 {
161 PERF_TIMER_GUARD(seek_child_seek_time);
162 child.SeekForPrev(target);
163 }
164 PERF_COUNTER_ADD(seek_child_seek_count, 1);
165
166 if (child.Valid()) {
11fdf7f2 167 assert(child.status().ok());
7c673cae
FG
168 PERF_TIMER_GUARD(seek_max_heap_time);
169 maxHeap_->push(&child);
11fdf7f2
TL
170 } else {
171 considerStatus(child.status());
7c673cae
FG
172 }
173 }
174 direction_ = kReverse;
175 {
176 PERF_TIMER_GUARD(seek_max_heap_time);
177 current_ = CurrentReverse();
178 }
179 }
180
494da23a 181 void Next() override {
7c673cae
FG
182 assert(Valid());
183
184 // Ensure that all children are positioned after key().
185 // If we are moving in the forward direction, it is already
186 // true for all of the non-current children since current_ is
187 // the smallest child and key() == current_->key().
188 if (direction_ != kForward) {
11fdf7f2 189 SwitchToForward();
7c673cae
FG
190 // The loop advanced all non-current children to be > key() so current_
191 // should still be strictly the smallest key.
192 assert(current_ == CurrentForward());
193 }
194
195 // For the heap modifications below to be correct, current_ must be the
196 // current top of the heap.
197 assert(current_ == CurrentForward());
198
199 // as the current points to the current record. move the iterator forward.
200 current_->Next();
201 if (current_->Valid()) {
202 // current is still valid after the Next() call above. Call
203 // replace_top() to restore the heap property. When the same child
204 // iterator yields a sequence of keys, this is cheap.
11fdf7f2 205 assert(current_->status().ok());
7c673cae
FG
206 minHeap_.replace_top(current_);
207 } else {
208 // current stopped being valid, remove it from the heap.
11fdf7f2 209 considerStatus(current_->status());
7c673cae
FG
210 minHeap_.pop();
211 }
212 current_ = CurrentForward();
213 }
214
494da23a 215 void Prev() override {
7c673cae
FG
216 assert(Valid());
217 // Ensure that all children are positioned before key().
218 // If we are moving in the reverse direction, it is already
219 // true for all of the non-current children since current_ is
220 // the largest child and key() == current_->key().
221 if (direction_ != kReverse) {
222 // Otherwise, retreat the non-current children. We retreat current_
223 // just after the if-block.
224 ClearHeaps();
225 InitMaxHeap();
11fdf7f2 226 Slice target = key();
7c673cae
FG
227 for (auto& child : children_) {
228 if (&child != current_) {
11fdf7f2
TL
229 child.SeekForPrev(target);
230 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
231 considerStatus(child.status());
232 if (child.Valid() && comparator_->Equal(target, child.key())) {
233 child.Prev();
234 considerStatus(child.status());
7c673cae
FG
235 }
236 }
237 if (child.Valid()) {
11fdf7f2 238 assert(child.status().ok());
7c673cae
FG
239 maxHeap_->push(&child);
240 }
241 }
242 direction_ = kReverse;
243 if (!prefix_seek_mode_) {
244 // Note that we don't do assert(current_ == CurrentReverse()) here
245 // because it is possible to have some keys larger than the seek-key
246 // inserted between Seek() and SeekToLast(), which makes current_ not
247 // equal to CurrentReverse().
248 current_ = CurrentReverse();
249 }
250 // The loop advanced all non-current children to be < key() so current_
251 // should still be strictly the smallest key.
252 assert(current_ == CurrentReverse());
253 }
254
255 // For the heap modifications below to be correct, current_ must be the
256 // current top of the heap.
257 assert(current_ == CurrentReverse());
258
259 current_->Prev();
260 if (current_->Valid()) {
261 // current is still valid after the Prev() call above. Call
262 // replace_top() to restore the heap property. When the same child
263 // iterator yields a sequence of keys, this is cheap.
11fdf7f2 264 assert(current_->status().ok());
7c673cae
FG
265 maxHeap_->replace_top(current_);
266 } else {
267 // current stopped being valid, remove it from the heap.
11fdf7f2 268 considerStatus(current_->status());
7c673cae
FG
269 maxHeap_->pop();
270 }
271 current_ = CurrentReverse();
272 }
273
494da23a 274 Slice key() const override {
7c673cae
FG
275 assert(Valid());
276 return current_->key();
277 }
278
494da23a 279 Slice value() const override {
7c673cae
FG
280 assert(Valid());
281 return current_->value();
282 }
283
494da23a 284 void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
7c673cae
FG
285 pinned_iters_mgr_ = pinned_iters_mgr;
286 for (auto& child : children_) {
287 child.SetPinnedItersMgr(pinned_iters_mgr);
288 }
289 }
290
494da23a 291 bool IsKeyPinned() const override {
7c673cae
FG
292 assert(Valid());
293 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
294 current_->IsKeyPinned();
295 }
296
494da23a 297 bool IsValuePinned() const override {
7c673cae
FG
298 assert(Valid());
299 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
300 current_->IsValuePinned();
301 }
302
303 private:
304 // Clears heaps for both directions, used when changing direction or seeking
305 void ClearHeaps();
306 // Ensures that maxHeap_ is initialized when starting to go in the reverse
307 // direction
308 void InitMaxHeap();
309
310 bool is_arena_mode_;
11fdf7f2 311 const InternalKeyComparator* comparator_;
7c673cae
FG
312 autovector<IteratorWrapper, kNumIterReserve> children_;
313
314 // Cached pointer to child iterator with the current key, or nullptr if no
315 // child iterators are valid. This is the top of minHeap_ or maxHeap_
316 // depending on the direction.
317 IteratorWrapper* current_;
11fdf7f2
TL
318 // If any of the children have non-ok status, this is one of them.
319 Status status_;
7c673cae
FG
320 // Which direction is the iterator moving?
321 enum Direction {
322 kForward,
323 kReverse
324 };
325 Direction direction_;
326 MergerMinIterHeap minHeap_;
327 bool prefix_seek_mode_;
328
329 // Max heap is used for reverse iteration, which is way less common than
330 // forward. Lazily initialize it to save memory.
331 std::unique_ptr<MergerMaxIterHeap> maxHeap_;
332 PinnedIteratorsManager* pinned_iters_mgr_;
333
11fdf7f2
TL
334 void SwitchToForward();
335
7c673cae
FG
336 IteratorWrapper* CurrentForward() const {
337 assert(direction_ == kForward);
338 return !minHeap_.empty() ? minHeap_.top() : nullptr;
339 }
340
341 IteratorWrapper* CurrentReverse() const {
342 assert(direction_ == kReverse);
343 assert(maxHeap_);
344 return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
345 }
346};
347
11fdf7f2
TL
348void MergingIterator::SwitchToForward() {
349 // Otherwise, advance the non-current children. We advance current_
350 // just after the if-block.
351 ClearHeaps();
352 Slice target = key();
353 for (auto& child : children_) {
354 if (&child != current_) {
355 child.Seek(target);
356 considerStatus(child.status());
357 if (child.Valid() && comparator_->Equal(target, child.key())) {
358 child.Next();
359 considerStatus(child.status());
360 }
361 }
362 if (child.Valid()) {
363 minHeap_.push(&child);
364 }
365 }
366 direction_ = kForward;
367}
368
7c673cae
FG
369void MergingIterator::ClearHeaps() {
370 minHeap_.clear();
371 if (maxHeap_) {
372 maxHeap_->clear();
373 }
374}
375
376void MergingIterator::InitMaxHeap() {
377 if (!maxHeap_) {
378 maxHeap_.reset(new MergerMaxIterHeap(comparator_));
379 }
380}
381
11fdf7f2 382InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
7c673cae
FG
383 InternalIterator** list, int n,
384 Arena* arena, bool prefix_seek_mode) {
385 assert(n >= 0);
386 if (n == 0) {
11fdf7f2 387 return NewEmptyInternalIterator<Slice>(arena);
7c673cae
FG
388 } else if (n == 1) {
389 return list[0];
390 } else {
391 if (arena == nullptr) {
392 return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
393 } else {
394 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
395 return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
396 }
397 }
398}
399
11fdf7f2
TL
400MergeIteratorBuilder::MergeIteratorBuilder(
401 const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
7c673cae
FG
402 : first_iter(nullptr), use_merging_iter(false), arena(a) {
403 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
404 merge_iter =
405 new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
406}
407
11fdf7f2
TL
408MergeIteratorBuilder::~MergeIteratorBuilder() {
409 if (first_iter != nullptr) {
410 first_iter->~InternalIterator();
411 }
412 if (merge_iter != nullptr) {
413 merge_iter->~MergingIterator();
414 }
415}
416
7c673cae
FG
417void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
418 if (!use_merging_iter && first_iter != nullptr) {
419 merge_iter->AddIterator(first_iter);
420 use_merging_iter = true;
11fdf7f2 421 first_iter = nullptr;
7c673cae
FG
422 }
423 if (use_merging_iter) {
424 merge_iter->AddIterator(iter);
425 } else {
426 first_iter = iter;
427 }
428}
429
430InternalIterator* MergeIteratorBuilder::Finish() {
11fdf7f2 431 InternalIterator* ret = nullptr;
7c673cae 432 if (!use_merging_iter) {
11fdf7f2
TL
433 ret = first_iter;
434 first_iter = nullptr;
7c673cae 435 } else {
11fdf7f2 436 ret = merge_iter;
7c673cae 437 merge_iter = nullptr;
7c673cae 438 }
11fdf7f2 439 return ret;
7c673cae
FG
440}
441
442} // namespace rocksdb