]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/table/merging_iterator.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / table / merging_iterator.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/merging_iterator.h"
11 #include <string>
12 #include <vector>
13 #include "db/dbformat.h"
14 #include "db/pinned_iterators_manager.h"
15 #include "monitoring/perf_context_imp.h"
16 #include "rocksdb/comparator.h"
17 #include "rocksdb/iterator.h"
18 #include "rocksdb/options.h"
19 #include "table/internal_iterator.h"
20 #include "table/iter_heap.h"
21 #include "table/iterator_wrapper.h"
22 #include "util/arena.h"
23 #include "util/autovector.h"
24 #include "util/heap.h"
25 #include "util/stop_watch.h"
26 #include "util/sync_point.h"
27
28 namespace rocksdb {
29 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30 namespace {
31 typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32 typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33 } // namespace
34
// Inline capacity passed to the autovector that stores the child iterator
// wrappers of a MergingIterator.
constexpr size_t kNumIterReserve = 4;
36
37 class MergingIterator : public InternalIterator {
38 public:
39 MergingIterator(const InternalKeyComparator* comparator,
40 InternalIterator** children, int n, bool is_arena_mode,
41 bool prefix_seek_mode)
42 : is_arena_mode_(is_arena_mode),
43 comparator_(comparator),
44 current_(nullptr),
45 direction_(kForward),
46 minHeap_(comparator_),
47 prefix_seek_mode_(prefix_seek_mode),
48 pinned_iters_mgr_(nullptr) {
49 children_.resize(n);
50 for (int i = 0; i < n; i++) {
51 children_[i].Set(children[i]);
52 }
53 for (auto& child : children_) {
54 if (child.Valid()) {
55 assert(child.status().ok());
56 minHeap_.push(&child);
57 } else {
58 considerStatus(child.status());
59 }
60 }
61 current_ = CurrentForward();
62 }
63
64 void considerStatus(Status s) {
65 if (!s.ok() && status_.ok()) {
66 status_ = s;
67 }
68 }
69
70 virtual void AddIterator(InternalIterator* iter) {
71 assert(direction_ == kForward);
72 children_.emplace_back(iter);
73 if (pinned_iters_mgr_) {
74 iter->SetPinnedItersMgr(pinned_iters_mgr_);
75 }
76 auto new_wrapper = children_.back();
77 if (new_wrapper.Valid()) {
78 assert(new_wrapper.status().ok());
79 minHeap_.push(&new_wrapper);
80 current_ = CurrentForward();
81 } else {
82 considerStatus(new_wrapper.status());
83 }
84 }
85
86 ~MergingIterator() override {
87 for (auto& child : children_) {
88 child.DeleteIter(is_arena_mode_);
89 }
90 }
91
92 bool Valid() const override { return current_ != nullptr && status_.ok(); }
93
94 Status status() const override { return status_; }
95
96 void SeekToFirst() override {
97 ClearHeaps();
98 status_ = Status::OK();
99 for (auto& child : children_) {
100 child.SeekToFirst();
101 if (child.Valid()) {
102 assert(child.status().ok());
103 minHeap_.push(&child);
104 } else {
105 considerStatus(child.status());
106 }
107 }
108 direction_ = kForward;
109 current_ = CurrentForward();
110 }
111
112 void SeekToLast() override {
113 ClearHeaps();
114 InitMaxHeap();
115 status_ = Status::OK();
116 for (auto& child : children_) {
117 child.SeekToLast();
118 if (child.Valid()) {
119 assert(child.status().ok());
120 maxHeap_->push(&child);
121 } else {
122 considerStatus(child.status());
123 }
124 }
125 direction_ = kReverse;
126 current_ = CurrentReverse();
127 }
128
129 void Seek(const Slice& target) override {
130 ClearHeaps();
131 status_ = Status::OK();
132 for (auto& child : children_) {
133 {
134 PERF_TIMER_GUARD(seek_child_seek_time);
135 child.Seek(target);
136 }
137 PERF_COUNTER_ADD(seek_child_seek_count, 1);
138
139 if (child.Valid()) {
140 assert(child.status().ok());
141 PERF_TIMER_GUARD(seek_min_heap_time);
142 minHeap_.push(&child);
143 } else {
144 considerStatus(child.status());
145 }
146 }
147 direction_ = kForward;
148 {
149 PERF_TIMER_GUARD(seek_min_heap_time);
150 current_ = CurrentForward();
151 }
152 }
153
154 void SeekForPrev(const Slice& target) override {
155 ClearHeaps();
156 InitMaxHeap();
157 status_ = Status::OK();
158
159 for (auto& child : children_) {
160 {
161 PERF_TIMER_GUARD(seek_child_seek_time);
162 child.SeekForPrev(target);
163 }
164 PERF_COUNTER_ADD(seek_child_seek_count, 1);
165
166 if (child.Valid()) {
167 assert(child.status().ok());
168 PERF_TIMER_GUARD(seek_max_heap_time);
169 maxHeap_->push(&child);
170 } else {
171 considerStatus(child.status());
172 }
173 }
174 direction_ = kReverse;
175 {
176 PERF_TIMER_GUARD(seek_max_heap_time);
177 current_ = CurrentReverse();
178 }
179 }
180
181 void Next() override {
182 assert(Valid());
183
184 // Ensure that all children are positioned after key().
185 // If we are moving in the forward direction, it is already
186 // true for all of the non-current children since current_ is
187 // the smallest child and key() == current_->key().
188 if (direction_ != kForward) {
189 SwitchToForward();
190 // The loop advanced all non-current children to be > key() so current_
191 // should still be strictly the smallest key.
192 assert(current_ == CurrentForward());
193 }
194
195 // For the heap modifications below to be correct, current_ must be the
196 // current top of the heap.
197 assert(current_ == CurrentForward());
198
199 // as the current points to the current record. move the iterator forward.
200 current_->Next();
201 if (current_->Valid()) {
202 // current is still valid after the Next() call above. Call
203 // replace_top() to restore the heap property. When the same child
204 // iterator yields a sequence of keys, this is cheap.
205 assert(current_->status().ok());
206 minHeap_.replace_top(current_);
207 } else {
208 // current stopped being valid, remove it from the heap.
209 considerStatus(current_->status());
210 minHeap_.pop();
211 }
212 current_ = CurrentForward();
213 }
214
215 void Prev() override {
216 assert(Valid());
217 // Ensure that all children are positioned before key().
218 // If we are moving in the reverse direction, it is already
219 // true for all of the non-current children since current_ is
220 // the largest child and key() == current_->key().
221 if (direction_ != kReverse) {
222 // Otherwise, retreat the non-current children. We retreat current_
223 // just after the if-block.
224 ClearHeaps();
225 InitMaxHeap();
226 Slice target = key();
227 for (auto& child : children_) {
228 if (&child != current_) {
229 child.SeekForPrev(target);
230 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
231 considerStatus(child.status());
232 if (child.Valid() && comparator_->Equal(target, child.key())) {
233 child.Prev();
234 considerStatus(child.status());
235 }
236 }
237 if (child.Valid()) {
238 assert(child.status().ok());
239 maxHeap_->push(&child);
240 }
241 }
242 direction_ = kReverse;
243 if (!prefix_seek_mode_) {
244 // Note that we don't do assert(current_ == CurrentReverse()) here
245 // because it is possible to have some keys larger than the seek-key
246 // inserted between Seek() and SeekToLast(), which makes current_ not
247 // equal to CurrentReverse().
248 current_ = CurrentReverse();
249 }
250 // The loop advanced all non-current children to be < key() so current_
251 // should still be strictly the smallest key.
252 assert(current_ == CurrentReverse());
253 }
254
255 // For the heap modifications below to be correct, current_ must be the
256 // current top of the heap.
257 assert(current_ == CurrentReverse());
258
259 current_->Prev();
260 if (current_->Valid()) {
261 // current is still valid after the Prev() call above. Call
262 // replace_top() to restore the heap property. When the same child
263 // iterator yields a sequence of keys, this is cheap.
264 assert(current_->status().ok());
265 maxHeap_->replace_top(current_);
266 } else {
267 // current stopped being valid, remove it from the heap.
268 considerStatus(current_->status());
269 maxHeap_->pop();
270 }
271 current_ = CurrentReverse();
272 }
273
274 Slice key() const override {
275 assert(Valid());
276 return current_->key();
277 }
278
279 Slice value() const override {
280 assert(Valid());
281 return current_->value();
282 }
283
284 void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
285 pinned_iters_mgr_ = pinned_iters_mgr;
286 for (auto& child : children_) {
287 child.SetPinnedItersMgr(pinned_iters_mgr);
288 }
289 }
290
291 bool IsKeyPinned() const override {
292 assert(Valid());
293 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
294 current_->IsKeyPinned();
295 }
296
297 bool IsValuePinned() const override {
298 assert(Valid());
299 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
300 current_->IsValuePinned();
301 }
302
303 private:
304 // Clears heaps for both directions, used when changing direction or seeking
305 void ClearHeaps();
306 // Ensures that maxHeap_ is initialized when starting to go in the reverse
307 // direction
308 void InitMaxHeap();
309
310 bool is_arena_mode_;
311 const InternalKeyComparator* comparator_;
312 autovector<IteratorWrapper, kNumIterReserve> children_;
313
314 // Cached pointer to child iterator with the current key, or nullptr if no
315 // child iterators are valid. This is the top of minHeap_ or maxHeap_
316 // depending on the direction.
317 IteratorWrapper* current_;
318 // If any of the children have non-ok status, this is one of them.
319 Status status_;
320 // Which direction is the iterator moving?
321 enum Direction {
322 kForward,
323 kReverse
324 };
325 Direction direction_;
326 MergerMinIterHeap minHeap_;
327 bool prefix_seek_mode_;
328
329 // Max heap is used for reverse iteration, which is way less common than
330 // forward. Lazily initialize it to save memory.
331 std::unique_ptr<MergerMaxIterHeap> maxHeap_;
332 PinnedIteratorsManager* pinned_iters_mgr_;
333
334 void SwitchToForward();
335
336 IteratorWrapper* CurrentForward() const {
337 assert(direction_ == kForward);
338 return !minHeap_.empty() ? minHeap_.top() : nullptr;
339 }
340
341 IteratorWrapper* CurrentReverse() const {
342 assert(direction_ == kReverse);
343 assert(maxHeap_);
344 return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
345 }
346 };
347
348 void MergingIterator::SwitchToForward() {
349 // Otherwise, advance the non-current children. We advance current_
350 // just after the if-block.
351 ClearHeaps();
352 Slice target = key();
353 for (auto& child : children_) {
354 if (&child != current_) {
355 child.Seek(target);
356 considerStatus(child.status());
357 if (child.Valid() && comparator_->Equal(target, child.key())) {
358 child.Next();
359 considerStatus(child.status());
360 }
361 }
362 if (child.Valid()) {
363 minHeap_.push(&child);
364 }
365 }
366 direction_ = kForward;
367 }
368
369 void MergingIterator::ClearHeaps() {
370 minHeap_.clear();
371 if (maxHeap_) {
372 maxHeap_->clear();
373 }
374 }
375
376 void MergingIterator::InitMaxHeap() {
377 if (!maxHeap_) {
378 maxHeap_.reset(new MergerMaxIterHeap(comparator_));
379 }
380 }
381
382 InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
383 InternalIterator** list, int n,
384 Arena* arena, bool prefix_seek_mode) {
385 assert(n >= 0);
386 if (n == 0) {
387 return NewEmptyInternalIterator<Slice>(arena);
388 } else if (n == 1) {
389 return list[0];
390 } else {
391 if (arena == nullptr) {
392 return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
393 } else {
394 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
395 return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
396 }
397 }
398 }
399
400 MergeIteratorBuilder::MergeIteratorBuilder(
401 const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
402 : first_iter(nullptr), use_merging_iter(false), arena(a) {
403 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
404 merge_iter =
405 new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
406 }
407
408 MergeIteratorBuilder::~MergeIteratorBuilder() {
409 if (first_iter != nullptr) {
410 first_iter->~InternalIterator();
411 }
412 if (merge_iter != nullptr) {
413 merge_iter->~MergingIterator();
414 }
415 }
416
417 void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
418 if (!use_merging_iter && first_iter != nullptr) {
419 merge_iter->AddIterator(first_iter);
420 use_merging_iter = true;
421 first_iter = nullptr;
422 }
423 if (use_merging_iter) {
424 merge_iter->AddIterator(iter);
425 } else {
426 first_iter = iter;
427 }
428 }
429
430 InternalIterator* MergeIteratorBuilder::Finish() {
431 InternalIterator* ret = nullptr;
432 if (!use_merging_iter) {
433 ret = first_iter;
434 first_iter = nullptr;
435 } else {
436 ret = merge_iter;
437 merge_iter = nullptr;
438 }
439 return ret;
440 }
441
442 } // namespace rocksdb