]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "table/merging_iterator.h" | |
11 | #include <string> | |
12 | #include <vector> | |
11fdf7f2 | 13 | #include "db/dbformat.h" |
7c673cae FG |
14 | #include "db/pinned_iterators_manager.h" |
15 | #include "monitoring/perf_context_imp.h" | |
16 | #include "rocksdb/comparator.h" | |
17 | #include "rocksdb/iterator.h" | |
18 | #include "rocksdb/options.h" | |
19 | #include "table/internal_iterator.h" | |
20 | #include "table/iter_heap.h" | |
21 | #include "table/iterator_wrapper.h" | |
22 | #include "util/arena.h" | |
23 | #include "util/autovector.h" | |
24 | #include "util/heap.h" | |
25 | #include "util/stop_watch.h" | |
26 | #include "util/sync_point.h" | |
27 | ||
28 | namespace rocksdb { | |
29 | // Without anonymous namespace here, we fail the warning -Wmissing-prototypes | |
30 | namespace { | |
31 | typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap; | |
32 | typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap; | |
33 | } // namespace | |
34 | ||
// Size argument passed to the autovector that stores the child iterator
// wrappers of a MergingIterator.
constexpr size_t kNumIterReserve = 4;
36 | ||
37 | class MergingIterator : public InternalIterator { | |
38 | public: | |
11fdf7f2 TL |
39 | MergingIterator(const InternalKeyComparator* comparator, |
40 | InternalIterator** children, int n, bool is_arena_mode, | |
41 | bool prefix_seek_mode) | |
7c673cae FG |
42 | : is_arena_mode_(is_arena_mode), |
43 | comparator_(comparator), | |
44 | current_(nullptr), | |
45 | direction_(kForward), | |
46 | minHeap_(comparator_), | |
47 | prefix_seek_mode_(prefix_seek_mode), | |
48 | pinned_iters_mgr_(nullptr) { | |
49 | children_.resize(n); | |
50 | for (int i = 0; i < n; i++) { | |
51 | children_[i].Set(children[i]); | |
52 | } | |
53 | for (auto& child : children_) { | |
54 | if (child.Valid()) { | |
11fdf7f2 | 55 | assert(child.status().ok()); |
7c673cae | 56 | minHeap_.push(&child); |
11fdf7f2 TL |
57 | } else { |
58 | considerStatus(child.status()); | |
7c673cae FG |
59 | } |
60 | } | |
61 | current_ = CurrentForward(); | |
62 | } | |
63 | ||
11fdf7f2 TL |
64 | void considerStatus(Status s) { |
65 | if (!s.ok() && status_.ok()) { | |
66 | status_ = s; | |
67 | } | |
68 | } | |
69 | ||
7c673cae FG |
70 | virtual void AddIterator(InternalIterator* iter) { |
71 | assert(direction_ == kForward); | |
72 | children_.emplace_back(iter); | |
73 | if (pinned_iters_mgr_) { | |
74 | iter->SetPinnedItersMgr(pinned_iters_mgr_); | |
75 | } | |
76 | auto new_wrapper = children_.back(); | |
77 | if (new_wrapper.Valid()) { | |
11fdf7f2 | 78 | assert(new_wrapper.status().ok()); |
7c673cae FG |
79 | minHeap_.push(&new_wrapper); |
80 | current_ = CurrentForward(); | |
11fdf7f2 TL |
81 | } else { |
82 | considerStatus(new_wrapper.status()); | |
7c673cae FG |
83 | } |
84 | } | |
85 | ||
494da23a | 86 | ~MergingIterator() override { |
7c673cae FG |
87 | for (auto& child : children_) { |
88 | child.DeleteIter(is_arena_mode_); | |
89 | } | |
90 | } | |
91 | ||
494da23a | 92 | bool Valid() const override { return current_ != nullptr && status_.ok(); } |
11fdf7f2 | 93 | |
494da23a | 94 | Status status() const override { return status_; } |
7c673cae | 95 | |
494da23a | 96 | void SeekToFirst() override { |
7c673cae | 97 | ClearHeaps(); |
11fdf7f2 | 98 | status_ = Status::OK(); |
7c673cae FG |
99 | for (auto& child : children_) { |
100 | child.SeekToFirst(); | |
101 | if (child.Valid()) { | |
11fdf7f2 | 102 | assert(child.status().ok()); |
7c673cae | 103 | minHeap_.push(&child); |
11fdf7f2 TL |
104 | } else { |
105 | considerStatus(child.status()); | |
7c673cae FG |
106 | } |
107 | } | |
108 | direction_ = kForward; | |
109 | current_ = CurrentForward(); | |
110 | } | |
111 | ||
494da23a | 112 | void SeekToLast() override { |
7c673cae FG |
113 | ClearHeaps(); |
114 | InitMaxHeap(); | |
11fdf7f2 | 115 | status_ = Status::OK(); |
7c673cae FG |
116 | for (auto& child : children_) { |
117 | child.SeekToLast(); | |
118 | if (child.Valid()) { | |
11fdf7f2 | 119 | assert(child.status().ok()); |
7c673cae | 120 | maxHeap_->push(&child); |
11fdf7f2 TL |
121 | } else { |
122 | considerStatus(child.status()); | |
7c673cae FG |
123 | } |
124 | } | |
125 | direction_ = kReverse; | |
126 | current_ = CurrentReverse(); | |
127 | } | |
128 | ||
494da23a | 129 | void Seek(const Slice& target) override { |
7c673cae | 130 | ClearHeaps(); |
11fdf7f2 | 131 | status_ = Status::OK(); |
7c673cae FG |
132 | for (auto& child : children_) { |
133 | { | |
134 | PERF_TIMER_GUARD(seek_child_seek_time); | |
135 | child.Seek(target); | |
136 | } | |
137 | PERF_COUNTER_ADD(seek_child_seek_count, 1); | |
138 | ||
139 | if (child.Valid()) { | |
11fdf7f2 | 140 | assert(child.status().ok()); |
7c673cae FG |
141 | PERF_TIMER_GUARD(seek_min_heap_time); |
142 | minHeap_.push(&child); | |
11fdf7f2 TL |
143 | } else { |
144 | considerStatus(child.status()); | |
7c673cae FG |
145 | } |
146 | } | |
147 | direction_ = kForward; | |
148 | { | |
149 | PERF_TIMER_GUARD(seek_min_heap_time); | |
150 | current_ = CurrentForward(); | |
151 | } | |
152 | } | |
153 | ||
494da23a | 154 | void SeekForPrev(const Slice& target) override { |
7c673cae FG |
155 | ClearHeaps(); |
156 | InitMaxHeap(); | |
11fdf7f2 | 157 | status_ = Status::OK(); |
7c673cae FG |
158 | |
159 | for (auto& child : children_) { | |
160 | { | |
161 | PERF_TIMER_GUARD(seek_child_seek_time); | |
162 | child.SeekForPrev(target); | |
163 | } | |
164 | PERF_COUNTER_ADD(seek_child_seek_count, 1); | |
165 | ||
166 | if (child.Valid()) { | |
11fdf7f2 | 167 | assert(child.status().ok()); |
7c673cae FG |
168 | PERF_TIMER_GUARD(seek_max_heap_time); |
169 | maxHeap_->push(&child); | |
11fdf7f2 TL |
170 | } else { |
171 | considerStatus(child.status()); | |
7c673cae FG |
172 | } |
173 | } | |
174 | direction_ = kReverse; | |
175 | { | |
176 | PERF_TIMER_GUARD(seek_max_heap_time); | |
177 | current_ = CurrentReverse(); | |
178 | } | |
179 | } | |
180 | ||
494da23a | 181 | void Next() override { |
7c673cae FG |
182 | assert(Valid()); |
183 | ||
184 | // Ensure that all children are positioned after key(). | |
185 | // If we are moving in the forward direction, it is already | |
186 | // true for all of the non-current children since current_ is | |
187 | // the smallest child and key() == current_->key(). | |
188 | if (direction_ != kForward) { | |
11fdf7f2 | 189 | SwitchToForward(); |
7c673cae FG |
190 | // The loop advanced all non-current children to be > key() so current_ |
191 | // should still be strictly the smallest key. | |
192 | assert(current_ == CurrentForward()); | |
193 | } | |
194 | ||
195 | // For the heap modifications below to be correct, current_ must be the | |
196 | // current top of the heap. | |
197 | assert(current_ == CurrentForward()); | |
198 | ||
199 | // as the current points to the current record. move the iterator forward. | |
200 | current_->Next(); | |
201 | if (current_->Valid()) { | |
202 | // current is still valid after the Next() call above. Call | |
203 | // replace_top() to restore the heap property. When the same child | |
204 | // iterator yields a sequence of keys, this is cheap. | |
11fdf7f2 | 205 | assert(current_->status().ok()); |
7c673cae FG |
206 | minHeap_.replace_top(current_); |
207 | } else { | |
208 | // current stopped being valid, remove it from the heap. | |
11fdf7f2 | 209 | considerStatus(current_->status()); |
7c673cae FG |
210 | minHeap_.pop(); |
211 | } | |
212 | current_ = CurrentForward(); | |
213 | } | |
214 | ||
494da23a | 215 | void Prev() override { |
7c673cae FG |
216 | assert(Valid()); |
217 | // Ensure that all children are positioned before key(). | |
218 | // If we are moving in the reverse direction, it is already | |
219 | // true for all of the non-current children since current_ is | |
220 | // the largest child and key() == current_->key(). | |
221 | if (direction_ != kReverse) { | |
222 | // Otherwise, retreat the non-current children. We retreat current_ | |
223 | // just after the if-block. | |
224 | ClearHeaps(); | |
225 | InitMaxHeap(); | |
11fdf7f2 | 226 | Slice target = key(); |
7c673cae FG |
227 | for (auto& child : children_) { |
228 | if (&child != current_) { | |
11fdf7f2 TL |
229 | child.SeekForPrev(target); |
230 | TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); | |
231 | considerStatus(child.status()); | |
232 | if (child.Valid() && comparator_->Equal(target, child.key())) { | |
233 | child.Prev(); | |
234 | considerStatus(child.status()); | |
7c673cae FG |
235 | } |
236 | } | |
237 | if (child.Valid()) { | |
11fdf7f2 | 238 | assert(child.status().ok()); |
7c673cae FG |
239 | maxHeap_->push(&child); |
240 | } | |
241 | } | |
242 | direction_ = kReverse; | |
243 | if (!prefix_seek_mode_) { | |
244 | // Note that we don't do assert(current_ == CurrentReverse()) here | |
245 | // because it is possible to have some keys larger than the seek-key | |
246 | // inserted between Seek() and SeekToLast(), which makes current_ not | |
247 | // equal to CurrentReverse(). | |
248 | current_ = CurrentReverse(); | |
249 | } | |
250 | // The loop advanced all non-current children to be < key() so current_ | |
251 | // should still be strictly the smallest key. | |
252 | assert(current_ == CurrentReverse()); | |
253 | } | |
254 | ||
255 | // For the heap modifications below to be correct, current_ must be the | |
256 | // current top of the heap. | |
257 | assert(current_ == CurrentReverse()); | |
258 | ||
259 | current_->Prev(); | |
260 | if (current_->Valid()) { | |
261 | // current is still valid after the Prev() call above. Call | |
262 | // replace_top() to restore the heap property. When the same child | |
263 | // iterator yields a sequence of keys, this is cheap. | |
11fdf7f2 | 264 | assert(current_->status().ok()); |
7c673cae FG |
265 | maxHeap_->replace_top(current_); |
266 | } else { | |
267 | // current stopped being valid, remove it from the heap. | |
11fdf7f2 | 268 | considerStatus(current_->status()); |
7c673cae FG |
269 | maxHeap_->pop(); |
270 | } | |
271 | current_ = CurrentReverse(); | |
272 | } | |
273 | ||
494da23a | 274 | Slice key() const override { |
7c673cae FG |
275 | assert(Valid()); |
276 | return current_->key(); | |
277 | } | |
278 | ||
494da23a | 279 | Slice value() const override { |
7c673cae FG |
280 | assert(Valid()); |
281 | return current_->value(); | |
282 | } | |
283 | ||
494da23a | 284 | void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { |
7c673cae FG |
285 | pinned_iters_mgr_ = pinned_iters_mgr; |
286 | for (auto& child : children_) { | |
287 | child.SetPinnedItersMgr(pinned_iters_mgr); | |
288 | } | |
289 | } | |
290 | ||
494da23a | 291 | bool IsKeyPinned() const override { |
7c673cae FG |
292 | assert(Valid()); |
293 | return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && | |
294 | current_->IsKeyPinned(); | |
295 | } | |
296 | ||
494da23a | 297 | bool IsValuePinned() const override { |
7c673cae FG |
298 | assert(Valid()); |
299 | return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && | |
300 | current_->IsValuePinned(); | |
301 | } | |
302 | ||
303 | private: | |
304 | // Clears heaps for both directions, used when changing direction or seeking | |
305 | void ClearHeaps(); | |
306 | // Ensures that maxHeap_ is initialized when starting to go in the reverse | |
307 | // direction | |
308 | void InitMaxHeap(); | |
309 | ||
310 | bool is_arena_mode_; | |
11fdf7f2 | 311 | const InternalKeyComparator* comparator_; |
7c673cae FG |
312 | autovector<IteratorWrapper, kNumIterReserve> children_; |
313 | ||
314 | // Cached pointer to child iterator with the current key, or nullptr if no | |
315 | // child iterators are valid. This is the top of minHeap_ or maxHeap_ | |
316 | // depending on the direction. | |
317 | IteratorWrapper* current_; | |
11fdf7f2 TL |
318 | // If any of the children have non-ok status, this is one of them. |
319 | Status status_; | |
7c673cae FG |
320 | // Which direction is the iterator moving? |
321 | enum Direction { | |
322 | kForward, | |
323 | kReverse | |
324 | }; | |
325 | Direction direction_; | |
326 | MergerMinIterHeap minHeap_; | |
327 | bool prefix_seek_mode_; | |
328 | ||
329 | // Max heap is used for reverse iteration, which is way less common than | |
330 | // forward. Lazily initialize it to save memory. | |
331 | std::unique_ptr<MergerMaxIterHeap> maxHeap_; | |
332 | PinnedIteratorsManager* pinned_iters_mgr_; | |
333 | ||
11fdf7f2 TL |
334 | void SwitchToForward(); |
335 | ||
7c673cae FG |
336 | IteratorWrapper* CurrentForward() const { |
337 | assert(direction_ == kForward); | |
338 | return !minHeap_.empty() ? minHeap_.top() : nullptr; | |
339 | } | |
340 | ||
341 | IteratorWrapper* CurrentReverse() const { | |
342 | assert(direction_ == kReverse); | |
343 | assert(maxHeap_); | |
344 | return !maxHeap_->empty() ? maxHeap_->top() : nullptr; | |
345 | } | |
346 | }; | |
347 | ||
11fdf7f2 TL |
348 | void MergingIterator::SwitchToForward() { |
349 | // Otherwise, advance the non-current children. We advance current_ | |
350 | // just after the if-block. | |
351 | ClearHeaps(); | |
352 | Slice target = key(); | |
353 | for (auto& child : children_) { | |
354 | if (&child != current_) { | |
355 | child.Seek(target); | |
356 | considerStatus(child.status()); | |
357 | if (child.Valid() && comparator_->Equal(target, child.key())) { | |
358 | child.Next(); | |
359 | considerStatus(child.status()); | |
360 | } | |
361 | } | |
362 | if (child.Valid()) { | |
363 | minHeap_.push(&child); | |
364 | } | |
365 | } | |
366 | direction_ = kForward; | |
367 | } | |
368 | ||
7c673cae FG |
369 | void MergingIterator::ClearHeaps() { |
370 | minHeap_.clear(); | |
371 | if (maxHeap_) { | |
372 | maxHeap_->clear(); | |
373 | } | |
374 | } | |
375 | ||
376 | void MergingIterator::InitMaxHeap() { | |
377 | if (!maxHeap_) { | |
378 | maxHeap_.reset(new MergerMaxIterHeap(comparator_)); | |
379 | } | |
380 | } | |
381 | ||
11fdf7f2 | 382 | InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp, |
7c673cae FG |
383 | InternalIterator** list, int n, |
384 | Arena* arena, bool prefix_seek_mode) { | |
385 | assert(n >= 0); | |
386 | if (n == 0) { | |
11fdf7f2 | 387 | return NewEmptyInternalIterator<Slice>(arena); |
7c673cae FG |
388 | } else if (n == 1) { |
389 | return list[0]; | |
390 | } else { | |
391 | if (arena == nullptr) { | |
392 | return new MergingIterator(cmp, list, n, false, prefix_seek_mode); | |
393 | } else { | |
394 | auto mem = arena->AllocateAligned(sizeof(MergingIterator)); | |
395 | return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode); | |
396 | } | |
397 | } | |
398 | } | |
399 | ||
11fdf7f2 TL |
400 | MergeIteratorBuilder::MergeIteratorBuilder( |
401 | const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode) | |
7c673cae FG |
402 | : first_iter(nullptr), use_merging_iter(false), arena(a) { |
403 | auto mem = arena->AllocateAligned(sizeof(MergingIterator)); | |
404 | merge_iter = | |
405 | new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode); | |
406 | } | |
407 | ||
11fdf7f2 TL |
408 | MergeIteratorBuilder::~MergeIteratorBuilder() { |
409 | if (first_iter != nullptr) { | |
410 | first_iter->~InternalIterator(); | |
411 | } | |
412 | if (merge_iter != nullptr) { | |
413 | merge_iter->~MergingIterator(); | |
414 | } | |
415 | } | |
416 | ||
7c673cae FG |
417 | void MergeIteratorBuilder::AddIterator(InternalIterator* iter) { |
418 | if (!use_merging_iter && first_iter != nullptr) { | |
419 | merge_iter->AddIterator(first_iter); | |
420 | use_merging_iter = true; | |
11fdf7f2 | 421 | first_iter = nullptr; |
7c673cae FG |
422 | } |
423 | if (use_merging_iter) { | |
424 | merge_iter->AddIterator(iter); | |
425 | } else { | |
426 | first_iter = iter; | |
427 | } | |
428 | } | |
429 | ||
430 | InternalIterator* MergeIteratorBuilder::Finish() { | |
11fdf7f2 | 431 | InternalIterator* ret = nullptr; |
7c673cae | 432 | if (!use_merging_iter) { |
11fdf7f2 TL |
433 | ret = first_iter; |
434 | first_iter = nullptr; | |
7c673cae | 435 | } else { |
11fdf7f2 | 436 | ret = merge_iter; |
7c673cae | 437 | merge_iter = nullptr; |
7c673cae | 438 | } |
11fdf7f2 | 439 | return ret; |
7c673cae FG |
440 | } |
441 | ||
442 | } // namespace rocksdb |