]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/table/merging_iterator.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / table / merging_iterator.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/merging_iterator.h"
11 #include <string>
12 #include <vector>
13 #include "db/pinned_iterators_manager.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "rocksdb/comparator.h"
16 #include "rocksdb/iterator.h"
17 #include "rocksdb/options.h"
18 #include "table/internal_iterator.h"
19 #include "table/iter_heap.h"
20 #include "table/iterator_wrapper.h"
21 #include "util/arena.h"
22 #include "util/autovector.h"
23 #include "util/heap.h"
24 #include "util/stop_watch.h"
25 #include "util/sync_point.h"
26
27 namespace rocksdb {
28 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
29 namespace {
30 typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
31 typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
32 } // namespace
33
34 const size_t kNumIterReserve = 4;
35
36 class MergingIterator : public InternalIterator {
37 public:
38 MergingIterator(const Comparator* comparator, InternalIterator** children,
39 int n, bool is_arena_mode, bool prefix_seek_mode)
40 : is_arena_mode_(is_arena_mode),
41 comparator_(comparator),
42 current_(nullptr),
43 direction_(kForward),
44 minHeap_(comparator_),
45 prefix_seek_mode_(prefix_seek_mode),
46 pinned_iters_mgr_(nullptr) {
47 children_.resize(n);
48 for (int i = 0; i < n; i++) {
49 children_[i].Set(children[i]);
50 }
51 for (auto& child : children_) {
52 if (child.Valid()) {
53 minHeap_.push(&child);
54 }
55 }
56 current_ = CurrentForward();
57 }
58
59 virtual void AddIterator(InternalIterator* iter) {
60 assert(direction_ == kForward);
61 children_.emplace_back(iter);
62 if (pinned_iters_mgr_) {
63 iter->SetPinnedItersMgr(pinned_iters_mgr_);
64 }
65 auto new_wrapper = children_.back();
66 if (new_wrapper.Valid()) {
67 minHeap_.push(&new_wrapper);
68 current_ = CurrentForward();
69 }
70 }
71
72 virtual ~MergingIterator() {
73 for (auto& child : children_) {
74 child.DeleteIter(is_arena_mode_);
75 }
76 }
77
78 virtual bool Valid() const override { return (current_ != nullptr); }
79
80 virtual void SeekToFirst() override {
81 ClearHeaps();
82 for (auto& child : children_) {
83 child.SeekToFirst();
84 if (child.Valid()) {
85 minHeap_.push(&child);
86 }
87 }
88 direction_ = kForward;
89 current_ = CurrentForward();
90 }
91
92 virtual void SeekToLast() override {
93 ClearHeaps();
94 InitMaxHeap();
95 for (auto& child : children_) {
96 child.SeekToLast();
97 if (child.Valid()) {
98 maxHeap_->push(&child);
99 }
100 }
101 direction_ = kReverse;
102 current_ = CurrentReverse();
103 }
104
105 virtual void Seek(const Slice& target) override {
106 ClearHeaps();
107 for (auto& child : children_) {
108 {
109 PERF_TIMER_GUARD(seek_child_seek_time);
110 child.Seek(target);
111 }
112 PERF_COUNTER_ADD(seek_child_seek_count, 1);
113
114 if (child.Valid()) {
115 PERF_TIMER_GUARD(seek_min_heap_time);
116 minHeap_.push(&child);
117 }
118 }
119 direction_ = kForward;
120 {
121 PERF_TIMER_GUARD(seek_min_heap_time);
122 current_ = CurrentForward();
123 }
124 }
125
126 virtual void SeekForPrev(const Slice& target) override {
127 ClearHeaps();
128 InitMaxHeap();
129
130 for (auto& child : children_) {
131 {
132 PERF_TIMER_GUARD(seek_child_seek_time);
133 child.SeekForPrev(target);
134 }
135 PERF_COUNTER_ADD(seek_child_seek_count, 1);
136
137 if (child.Valid()) {
138 PERF_TIMER_GUARD(seek_max_heap_time);
139 maxHeap_->push(&child);
140 }
141 }
142 direction_ = kReverse;
143 {
144 PERF_TIMER_GUARD(seek_max_heap_time);
145 current_ = CurrentReverse();
146 }
147 }
148
149 virtual void Next() override {
150 assert(Valid());
151
152 // Ensure that all children are positioned after key().
153 // If we are moving in the forward direction, it is already
154 // true for all of the non-current children since current_ is
155 // the smallest child and key() == current_->key().
156 if (direction_ != kForward) {
157 // Otherwise, advance the non-current children. We advance current_
158 // just after the if-block.
159 ClearHeaps();
160 for (auto& child : children_) {
161 if (&child != current_) {
162 child.Seek(key());
163 if (child.Valid() && comparator_->Equal(key(), child.key())) {
164 child.Next();
165 }
166 }
167 if (child.Valid()) {
168 minHeap_.push(&child);
169 }
170 }
171 direction_ = kForward;
172 // The loop advanced all non-current children to be > key() so current_
173 // should still be strictly the smallest key.
174 assert(current_ == CurrentForward());
175 }
176
177 // For the heap modifications below to be correct, current_ must be the
178 // current top of the heap.
179 assert(current_ == CurrentForward());
180
181 // as the current points to the current record. move the iterator forward.
182 current_->Next();
183 if (current_->Valid()) {
184 // current is still valid after the Next() call above. Call
185 // replace_top() to restore the heap property. When the same child
186 // iterator yields a sequence of keys, this is cheap.
187 minHeap_.replace_top(current_);
188 } else {
189 // current stopped being valid, remove it from the heap.
190 minHeap_.pop();
191 }
192 current_ = CurrentForward();
193 }
194
195 virtual void Prev() override {
196 assert(Valid());
197 // Ensure that all children are positioned before key().
198 // If we are moving in the reverse direction, it is already
199 // true for all of the non-current children since current_ is
200 // the largest child and key() == current_->key().
201 if (direction_ != kReverse) {
202 // Otherwise, retreat the non-current children. We retreat current_
203 // just after the if-block.
204 ClearHeaps();
205 InitMaxHeap();
206 for (auto& child : children_) {
207 if (&child != current_) {
208 if (!prefix_seek_mode_) {
209 child.Seek(key());
210 if (child.Valid()) {
211 // Child is at first entry >= key(). Step back one to be < key()
212 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev",
213 &child);
214 child.Prev();
215 } else {
216 // Child has no entries >= key(). Position at last entry.
217 TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
218 child.SeekToLast();
219 }
220 } else {
221 child.SeekForPrev(key());
222 if (child.Valid() && comparator_->Equal(key(), child.key())) {
223 child.Prev();
224 }
225 }
226 }
227 if (child.Valid()) {
228 maxHeap_->push(&child);
229 }
230 }
231 direction_ = kReverse;
232 if (!prefix_seek_mode_) {
233 // Note that we don't do assert(current_ == CurrentReverse()) here
234 // because it is possible to have some keys larger than the seek-key
235 // inserted between Seek() and SeekToLast(), which makes current_ not
236 // equal to CurrentReverse().
237 current_ = CurrentReverse();
238 }
239 // The loop advanced all non-current children to be < key() so current_
240 // should still be strictly the smallest key.
241 assert(current_ == CurrentReverse());
242 }
243
244 // For the heap modifications below to be correct, current_ must be the
245 // current top of the heap.
246 assert(current_ == CurrentReverse());
247
248 current_->Prev();
249 if (current_->Valid()) {
250 // current is still valid after the Prev() call above. Call
251 // replace_top() to restore the heap property. When the same child
252 // iterator yields a sequence of keys, this is cheap.
253 maxHeap_->replace_top(current_);
254 } else {
255 // current stopped being valid, remove it from the heap.
256 maxHeap_->pop();
257 }
258 current_ = CurrentReverse();
259 }
260
261 virtual Slice key() const override {
262 assert(Valid());
263 return current_->key();
264 }
265
266 virtual Slice value() const override {
267 assert(Valid());
268 return current_->value();
269 }
270
271 virtual Status status() const override {
272 Status s;
273 for (auto& child : children_) {
274 s = child.status();
275 if (!s.ok()) {
276 break;
277 }
278 }
279 return s;
280 }
281
282 virtual void SetPinnedItersMgr(
283 PinnedIteratorsManager* pinned_iters_mgr) override {
284 pinned_iters_mgr_ = pinned_iters_mgr;
285 for (auto& child : children_) {
286 child.SetPinnedItersMgr(pinned_iters_mgr);
287 }
288 }
289
290 virtual bool IsKeyPinned() const override {
291 assert(Valid());
292 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
293 current_->IsKeyPinned();
294 }
295
296 virtual bool IsValuePinned() const override {
297 assert(Valid());
298 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
299 current_->IsValuePinned();
300 }
301
302 private:
303 // Clears heaps for both directions, used when changing direction or seeking
304 void ClearHeaps();
305 // Ensures that maxHeap_ is initialized when starting to go in the reverse
306 // direction
307 void InitMaxHeap();
308
309 bool is_arena_mode_;
310 const Comparator* comparator_;
311 autovector<IteratorWrapper, kNumIterReserve> children_;
312
313 // Cached pointer to child iterator with the current key, or nullptr if no
314 // child iterators are valid. This is the top of minHeap_ or maxHeap_
315 // depending on the direction.
316 IteratorWrapper* current_;
317 // Which direction is the iterator moving?
318 enum Direction {
319 kForward,
320 kReverse
321 };
322 Direction direction_;
323 MergerMinIterHeap minHeap_;
324 bool prefix_seek_mode_;
325
326 // Max heap is used for reverse iteration, which is way less common than
327 // forward. Lazily initialize it to save memory.
328 std::unique_ptr<MergerMaxIterHeap> maxHeap_;
329 PinnedIteratorsManager* pinned_iters_mgr_;
330
331 IteratorWrapper* CurrentForward() const {
332 assert(direction_ == kForward);
333 return !minHeap_.empty() ? minHeap_.top() : nullptr;
334 }
335
336 IteratorWrapper* CurrentReverse() const {
337 assert(direction_ == kReverse);
338 assert(maxHeap_);
339 return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
340 }
341 };
342
343 void MergingIterator::ClearHeaps() {
344 minHeap_.clear();
345 if (maxHeap_) {
346 maxHeap_->clear();
347 }
348 }
349
350 void MergingIterator::InitMaxHeap() {
351 if (!maxHeap_) {
352 maxHeap_.reset(new MergerMaxIterHeap(comparator_));
353 }
354 }
355
356 InternalIterator* NewMergingIterator(const Comparator* cmp,
357 InternalIterator** list, int n,
358 Arena* arena, bool prefix_seek_mode) {
359 assert(n >= 0);
360 if (n == 0) {
361 return NewEmptyInternalIterator(arena);
362 } else if (n == 1) {
363 return list[0];
364 } else {
365 if (arena == nullptr) {
366 return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
367 } else {
368 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
369 return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
370 }
371 }
372 }
373
374 MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator,
375 Arena* a, bool prefix_seek_mode)
376 : first_iter(nullptr), use_merging_iter(false), arena(a) {
377 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
378 merge_iter =
379 new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
380 }
381
382 void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
383 if (!use_merging_iter && first_iter != nullptr) {
384 merge_iter->AddIterator(first_iter);
385 use_merging_iter = true;
386 }
387 if (use_merging_iter) {
388 merge_iter->AddIterator(iter);
389 } else {
390 first_iter = iter;
391 }
392 }
393
394 InternalIterator* MergeIteratorBuilder::Finish() {
395 if (!use_merging_iter) {
396 return first_iter;
397 } else {
398 auto ret = merge_iter;
399 merge_iter = nullptr;
400 return ret;
401 }
402 }
403
404 } // namespace rocksdb