// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE
#include "db/forward_iterator.h"

#include <algorithm>  // for std::lower_bound (used by FindFileInRange)
#include <limits>
#include <string>
#include <utility>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/range_tombstone_fragmenter.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/merging_iterator.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// Usage:
// ForwardLevelIterator iter;
// iter.SetFileIndex(file_index);
// iter.Seek(target); // or iter.SeekToFirst();
// iter.Next()
class ForwardLevelIterator : public InternalIterator {
 public:
  ForwardLevelIterator(
      const ColumnFamilyData* const cfd, const ReadOptions& read_options,
      const std::vector<FileMetaData*>& files,
      const std::shared_ptr<const SliceTransform>& prefix_extractor,
      bool allow_unprepared_value)
      : cfd_(cfd),
        read_options_(read_options),
        files_(files),
        valid_(false),
        file_index_(std::numeric_limits<uint32_t>::max()),
        file_iter_(nullptr),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor),
        allow_unprepared_value_(allow_unprepared_value) {
    status_.PermitUncheckedError();  // Allow uninitialized status through
  }

  ~ForwardLevelIterator() override {
    // Release the current file iterator, or hand it to the pinned iterators
    // manager, which delays deletion until pinned data is released.
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }
  }

  void SetFileIndex(uint32_t file_index) {
    assert(file_index < files_.size());
    status_ = Status::OK();
    if (file_index != file_index_) {
      file_index_ = file_index;
      Reset();
    }
  }
  void Reset() {
    assert(file_index_ < files_.size());

    // Release the previous file iterator before opening a new one
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }

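    // A local aggregator used only to detect range tombstones in this file;
    // ForwardIterator does not support them (see the NotSupported status
    // below).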
    ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                         kMaxSequenceNumber /* upper_bound */);
    file_iter_ = cfd_->table_cache()->NewIterator(
        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
        *files_[file_index_],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        prefix_extractor_, /*table_reader_ptr=*/nullptr,
        /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
        /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
        /*max_file_size_for_l0_meta_pin=*/0,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    valid_ = false;
    if (!range_del_agg.IsEmpty()) {
      status_ = Status::NotSupported(
          "Range tombstones unsupported with ForwardIterator");
    }
  }
  void SeekToLast() override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
    valid_ = false;
  }
  void Prev() override {
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
    valid_ = false;
  }
  bool Valid() const override { return valid_; }
  void SeekToFirst() override {
    assert(file_iter_ != nullptr);
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }
    file_iter_->SeekToFirst();
    valid_ = file_iter_->Valid();
  }
  void Seek(const Slice& internal_key) override {
    assert(file_iter_ != nullptr);

    // This deviates from the usual convention for InternalIterator::Seek() in
    // that it doesn't discard pre-existing error status. That's because this
    // Seek() is only supposed to be called immediately after SetFileIndex()
    // (which discards pre-existing error status), and SetFileIndex() may set
    // an error status, which we shouldn't discard.
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }

    file_iter_->Seek(internal_key);
    valid_ = file_iter_->Valid();
  }
  void SeekForPrev(const Slice& /*internal_key*/) override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
    valid_ = false;
  }
  void Next() override {
    assert(valid_);
    file_iter_->Next();
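    // If the current file is exhausted, advance through the subsequent files
    // in this level until we find a valid entry, hit an error, or run out of
    // files.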
    for (;;) {
      valid_ = file_iter_->Valid();
      if (!file_iter_->status().ok()) {
        assert(!valid_);
        return;
      }
      if (valid_) {
        return;
      }
      if (file_index_ + 1 >= files_.size()) {
        valid_ = false;
        return;
      }
      SetFileIndex(file_index_ + 1);
      if (!status_.ok()) {
        assert(!valid_);
        return;
      }
      file_iter_->SeekToFirst();
    }
  }
  Slice key() const override {
    assert(valid_);
    return file_iter_->key();
  }
  Slice value() const override {
    assert(valid_);
    return file_iter_->value();
  }
  Status status() const override {
    if (!status_.ok()) {
      return status_;
    } else if (file_iter_) {
      return file_iter_->status();
    }
    return Status::OK();
  }
  bool PrepareValue() override {
    assert(valid_);
    if (file_iter_->PrepareValue()) {
      return true;
    }

    assert(!file_iter_->Valid());
    valid_ = false;
    return false;
  }
  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsKeyPinned();
  }
  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsValuePinned();
  }
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_) {
      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

 private:
  const ColumnFamilyData* const cfd_;
  const ReadOptions& read_options_;
  const std::vector<FileMetaData*>& files_;

  bool valid_;
  uint32_t file_index_;
  Status status_;
  InternalIterator* file_iter_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  // Kept alive by ForwardIterator::sv_->mutable_cf_options
  const std::shared_ptr<const SliceTransform>& prefix_extractor_;
  const bool allow_unprepared_value_;
};

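// ForwardIterator is what backs tailing iterators. A minimal usage sketch
// from the application side (the `db` handle is hypothetical; exact options
// depend on the application):
//
//   ReadOptions ro;
//   ro.tailing = true;  // requests a ForwardIterator-based iterator
//   std::unique_ptr<Iterator> it(db->NewIterator(ro));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // consume it->key() / it->value(); re-Seek() later to pick up new data
//   }
//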
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd,
                                 SuperVersion* current_sv,
                                 bool allow_unprepared_value)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
      user_comparator_(cfd->user_comparator()),
      allow_unprepared_value_(allow_unprepared_value),
      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
      sv_(current_sv),
      mutable_iter_(nullptr),
      current_(nullptr),
      valid_(false),
      status_(Status::OK()),
      immutable_status_(Status::OK()),
      has_iter_trimmed_for_upper_bound_(false),
      current_over_upper_bound_(false),
      is_prev_set_(false),
      is_prev_inclusive_(false),
      pinned_iters_mgr_(nullptr) {
  if (sv_) {
    RebuildIterators(false);
  }

  // immutable_status_ is a local aggregation of the statuses of the immutable
  // iterators. We have to PermitUncheckedError in case it is never read,
  // otherwise it would fail ASSERT_STATUS_CHECKED.
  immutable_status_.PermitUncheckedError();
}

ForwardIterator::~ForwardIterator() { Cleanup(true); }

void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                bool background_purge_on_iterator_cleanup) {
  if (sv->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    JobContext job_context(0);
    db->mutex_.Lock();
    sv->Cleanup();
    db->FindObsoleteFiles(&job_context, false, true);
    if (background_purge_on_iterator_cleanup) {
      db->ScheduleBgLogWriterClose(&job_context);
      db->AddSuperVersionsToFreeQueue(sv);
      db->SchedulePurge();
    }
    db->mutex_.Unlock();
    if (!background_purge_on_iterator_cleanup) {
      delete sv;
    }
    if (job_context.HaveSomethingToDelete()) {
      db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
    }
    job_context.Clean();
  }
}

namespace {
struct SVCleanupParams {
  DBImpl* db;
  SuperVersion* sv;
  bool background_purge_on_iterator_cleanup;
};
}  // anonymous namespace

// Used in PinnedIteratorsManager to release pinned SuperVersion
void ForwardIterator::DeferredSVCleanup(void* arg) {
  auto d = reinterpret_cast<SVCleanupParams*>(arg);
  ForwardIterator::SVCleanup(d->db, d->sv,
                             d->background_purge_on_iterator_cleanup);
  delete d;
}

void ForwardIterator::SVCleanup() {
  if (sv_ == nullptr) {
    return;
  }
  bool background_purge =
      read_options_.background_purge_on_iterator_cleanup ||
      db_->immutable_db_options().avoid_unnecessary_blocking_io;
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    // pinned_iters_mgr_ tells us to make sure that all visited key-value
    // slices are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
    // The slices may point into some memtables owned by sv_, so we need to
    // keep sv_ referenced until pinned_iters_mgr_ unpins everything.
    auto p = new SVCleanupParams{db_, sv_, background_purge};
    pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
  } else {
    SVCleanup(db_, sv_, background_purge);
  }
}

void ForwardIterator::Cleanup(bool release_sv) {
  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }

  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();

  if (release_sv) {
    SVCleanup();
  }
}

bool ForwardIterator::Valid() const {
  // See UpdateCurrent().
  return valid_ && !current_over_upper_bound_;
}

void ForwardIterator::SeekToFirst() {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }
  SeekInternal(Slice(), true, false);
}

bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
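  // Note that iterate_upper_bound is exclusive: a key equal to the bound
  // already counts as over it.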
  return !(read_options_.iterate_upper_bound == nullptr ||
           cfd_->internal_comparator().user_comparator()->Compare(
               ExtractUserKey(internal_key),
               *read_options_.iterate_upper_bound) < 0);
}

void ForwardIterator::Seek(const Slice& internal_key) {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }

  SeekInternal(internal_key, false, false);
  if (read_options_.async_io) {
    SeekInternal(internal_key, false, true);
  }
}

// With async_io, SeekInternal is called twice: the second call, with
// seek_after_async_io enabled, only performs the seek to retrieve the blocks.
void ForwardIterator::SeekInternal(const Slice& internal_key,
                                   bool seek_to_first,
                                   bool seek_after_async_io) {
  assert(mutable_iter_);
  // mutable
  if (!seek_after_async_io) {
    seek_to_first ? mutable_iter_->SeekToFirst()
                  : mutable_iter_->Seek(internal_key);
  }

  // immutable
  // TODO(ljin): NeedToSeekImmutable has a negative impact on performance
  // if it turns out we need to seek the immutable iterators often. We
  // probably want an option to turn it off.
  if (seek_to_first || seek_after_async_io ||
      NeedToSeekImmutable(internal_key)) {
    if (!seek_after_async_io) {
      immutable_status_ = Status::OK();
      if (has_iter_trimmed_for_upper_bound_ &&
          (
              // prev_ is not set yet
              is_prev_set_ == false ||
              // We are doing SeekToFirst() and internal_key.size() == 0
              seek_to_first ||
              // prev_key_ > internal_key
              cfd_->internal_comparator().InternalKeyComparator::Compare(
                  prev_key_.GetInternalKey(), internal_key) > 0)) {
        // Some iterators are trimmed. Need to rebuild.
        RebuildIterators(true);
        // Already seeked mutable iter, so seek again
        seek_to_first ? mutable_iter_->SeekToFirst()
                      : mutable_iter_->Seek(internal_key);
      }
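      // std::priority_queue has no clear(); swap in a fresh heap to empty it.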
      {
        auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
        immutable_min_heap_.swap(tmp);
      }
      for (size_t i = 0; i < imm_iters_.size(); i++) {
        auto* m = imm_iters_[i];
        seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
        if (!m->status().ok()) {
          immutable_status_ = m->status();
        } else if (m->Valid()) {
          immutable_min_heap_.push(m);
        }
      }
    }

    Slice target_user_key;
    if (!seek_to_first) {
      target_user_key = ExtractUserKey(internal_key);
    }
    const VersionStorageInfo* vstorage = sv_->current->storage_info();
    const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
    for (size_t i = 0; i < l0.size(); ++i) {
      if (!l0_iters_[i]) {
        continue;
      }
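      // In the second (seek_after_async_io) pass, only re-seek iterators that
      // returned TryAgain in the first pass.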
      if (seek_after_async_io) {
        if (!l0_iters_[i]->status().IsTryAgain()) {
          continue;
        }
      }

      if (seek_to_first) {
        l0_iters_[i]->SeekToFirst();
      } else {
        // If the target key is past this file's largest key, Next() will
        // never reach this file.
        if (seek_after_async_io == false &&
            user_comparator_->Compare(target_user_key,
                                      l0[i]->largest.user_key()) > 0) {
          if (read_options_.iterate_upper_bound != nullptr) {
            has_iter_trimmed_for_upper_bound_ = true;
            DeleteIterator(l0_iters_[i]);
            l0_iters_[i] = nullptr;
          }
          continue;
        }
        l0_iters_[i]->Seek(internal_key);
      }

      if (l0_iters_[i]->status().IsTryAgain()) {
        assert(!seek_after_async_io);
        continue;
      } else if (!l0_iters_[i]->status().ok()) {
        immutable_status_ = l0_iters_[i]->status();
      } else if (l0_iters_[i]->Valid() &&
                 !IsOverUpperBound(l0_iters_[i]->key())) {
        immutable_min_heap_.push(l0_iters_[i]);
      } else {
        has_iter_trimmed_for_upper_bound_ = true;
        DeleteIterator(l0_iters_[i]);
        l0_iters_[i] = nullptr;
      }
    }

    for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
      const std::vector<FileMetaData*>& level_files =
          vstorage->LevelFiles(level);
      if (level_files.empty()) {
        continue;
      }
      if (level_iters_[level - 1] == nullptr) {
        continue;
      }

      if (seek_after_async_io) {
        if (!level_iters_[level - 1]->status().IsTryAgain()) {
          continue;
        }
      }
      uint32_t f_idx = 0;
      if (!seek_to_first && !seek_after_async_io) {
        f_idx = FindFileInRange(level_files, internal_key, 0,
                                static_cast<uint32_t>(level_files.size()));
      }

      // Seek
      if (seek_after_async_io || f_idx < level_files.size()) {
        if (!seek_after_async_io) {
          level_iters_[level - 1]->SetFileIndex(f_idx);
        }
        seek_to_first ? level_iters_[level - 1]->SeekToFirst()
                      : level_iters_[level - 1]->Seek(internal_key);

        if (level_iters_[level - 1]->status().IsTryAgain()) {
          assert(!seek_after_async_io);
          continue;
        } else if (!level_iters_[level - 1]->status().ok()) {
          immutable_status_ = level_iters_[level - 1]->status();
        } else if (level_iters_[level - 1]->Valid() &&
                   !IsOverUpperBound(level_iters_[level - 1]->key())) {
          immutable_min_heap_.push(level_iters_[level - 1]);
        } else {
          // Nothing in this level is interesting. Remove.
          has_iter_trimmed_for_upper_bound_ = true;
          DeleteIterator(level_iters_[level - 1]);
          level_iters_[level - 1] = nullptr;
        }
      }
    }

    if (seek_to_first) {
      is_prev_set_ = false;
    } else {
      prev_key_.SetInternalKey(internal_key);
      is_prev_set_ = true;
      is_prev_inclusive_ = true;
    }

    TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
  } else if (current_ && current_ != mutable_iter_) {
    // current_ is one of the immutable iterators; push it back onto the heap
    immutable_min_heap_.push(current_);
  }

  // For async_io, current_ should only be updated when seek_after_async_io is
  // true (i.e. in the second call).
  if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
    UpdateCurrent();
  }
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
}

void ForwardIterator::Next() {
  assert(valid_);
  bool update_prev_key = false;

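  // If the super version changed underneath us, remember the current key,
  // rebuild (or renew) all iterators against the new version, and reposition
  // at that key before advancing.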
  if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
    std::string current_key = key().ToString();
    Slice old_key(current_key.data(), current_key.size());

    if (sv_ == nullptr) {
      RebuildIterators(true);
    } else {
      RenewIterators();
    }

    SeekInternal(old_key, false, false);
    if (read_options_.async_io) {
      SeekInternal(old_key, false, true);
    }

    if (!valid_ || key().compare(old_key) != 0) {
      return;
    }
  } else if (current_ != mutable_iter_) {
    // We are about to advance an immutable iterator

    if (is_prev_set_ && prefix_extractor_) {
      // advance prev_key_ to current_ only if they share the same prefix
      update_prev_key =
          prefix_extractor_->Transform(prev_key_.GetUserKey())
              .compare(prefix_extractor_->Transform(current_->key())) == 0;
    } else {
      update_prev_key = true;
    }

    if (update_prev_key) {
      prev_key_.SetInternalKey(current_->key());
      is_prev_set_ = true;
      is_prev_inclusive_ = false;
    }
  }

  current_->Next();
  if (current_ != mutable_iter_) {
    if (!current_->status().ok()) {
      immutable_status_ = current_->status();
    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
      immutable_min_heap_.push(current_);
    } else {
      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
        // remove the current iterator
        DeleteCurrentIter();
        current_ = nullptr;
      }
      if (update_prev_key) {
        mutable_iter_->Seek(prev_key_.GetInternalKey());
      }
    }
  }
  UpdateCurrent();
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
}

Slice ForwardIterator::key() const {
  assert(valid_);
  return current_->key();
}

Slice ForwardIterator::value() const {
  assert(valid_);
  return current_->value();
}

Status ForwardIterator::status() const {
  if (!status_.ok()) {
    return status_;
  } else if (!mutable_iter_->status().ok()) {
    return mutable_iter_->status();
  }

  return immutable_status_;
}

bool ForwardIterator::PrepareValue() {
  assert(valid_);
  if (current_->PrepareValue()) {
    return true;
  }

  assert(!current_->Valid());
  assert(!current_->status().ok());
  assert(current_ != mutable_iter_);  // memtable iterator can't fail
  assert(immutable_status_.ok());

  valid_ = false;
  immutable_status_ = current_->status();
  return false;
}

Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
  assert(prop != nullptr);
  if (prop_name == "rocksdb.iterator.super-version-number") {
    *prop = std::to_string(sv_->version_number);
    return Status::OK();
  }
  return Status::InvalidArgument();
}

void ForwardIterator::SetPinnedItersMgr(
    PinnedIteratorsManager* pinned_iters_mgr) {
  pinned_iters_mgr_ = pinned_iters_mgr;
  UpdateChildrenPinnedItersMgr();
}

void ForwardIterator::UpdateChildrenPinnedItersMgr() {
  // Set PinnedIteratorsManager for mutable memtable iterator.
  if (mutable_iter_) {
    mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  // Set PinnedIteratorsManager for immutable memtable iterators.
  for (InternalIterator* child_iter : imm_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

  // Set PinnedIteratorsManager for L0 files iterators.
  for (InternalIterator* child_iter : l0_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

  // Set PinnedIteratorsManager for L1+ levels iterators.
  for (ForwardLevelIterator* child_iter : level_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }
}

bool ForwardIterator::IsKeyPinned() const {
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
         current_->IsKeyPinned();
}

bool ForwardIterator::IsValuePinned() const {
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
         current_->IsValuePinned();
}

void ForwardIterator::RebuildIterators(bool refresh_sv) {
  // Clean up
  Cleanup(refresh_sv);
  if (refresh_sv) {
    // New
    sv_ = cfd_->GetReferencedSuperVersion(db_);
  }
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
  sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        sv_->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence(),
            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // AddRangeTombstoneIterators always returns Status::OK().
    Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
                                                         &range_del_agg);
    assert(temp_s.ok());
  }
  has_iter_trimmed_for_upper_bound_ = false;

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  l0_iters_.reserve(l0_files.size());
  for (const auto* l0 : l0_files) {
    if ((read_options_.iterate_upper_bound != nullptr) &&
        cfd_->internal_comparator().user_comparator()->Compare(
            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
      // will never be interested in files with smallest key above
      // iterate_upper_bound, since iterate_upper_bound can't be changed.
      l0_iters_.push_back(nullptr);
      continue;
    }
    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        sv_->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }
  BuildLevelIterators(vstorage, sv_);
  current_ = nullptr;
  is_prev_set_ = false;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}

void ForwardIterator::RenewIterators() {
  SuperVersion* svnew;
  assert(sv_);
  svnew = cfd_->GetReferencedSuperVersion(db_);

  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }
  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
  svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        svnew->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence(),
            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // AddRangeTombstoneIterators always returns Status::OK().
    Status temp_s = svnew->imm->AddRangeTombstoneIterators(
        read_options_, &arena_, &range_del_agg);
    assert(temp_s.ok());
  }

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  const auto* vstorage_new = svnew->current->storage_info();
  const auto& l0_files_new = vstorage_new->LevelFiles(0);
  size_t iold, inew;
  bool found;
  std::vector<InternalIterator*> l0_iters_new;
  l0_iters_new.reserve(l0_files_new.size());

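  // Reuse existing L0 iterators for files that survive into the new super
  // version; open new iterators only for files added since the old version.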
  for (inew = 0; inew < l0_files_new.size(); inew++) {
    found = false;
    for (iold = 0; iold < l0_files.size(); iold++) {
      if (l0_files[iold] == l0_files_new[inew]) {
        found = true;
        break;
      }
    }
    if (found) {
      if (l0_iters_[iold] == nullptr) {
        l0_iters_new.push_back(nullptr);
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
      } else {
        l0_iters_new.push_back(l0_iters_[iold]);
        l0_iters_[iold] = nullptr;
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
      }
      continue;
    }
    l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files_new[inew],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        svnew->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }

  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();
  l0_iters_ = l0_iters_new;

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();
  BuildLevelIterators(vstorage_new, svnew);
  current_ = nullptr;
  is_prev_set_ = false;
  SVCleanup();
  sv_ = svnew;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}

void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
                                          SuperVersion* sv) {
  level_iters_.reserve(vstorage->num_levels() - 1);
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    const auto& level_files = vstorage->LevelFiles(level);
    if ((level_files.empty()) ||
        ((read_options_.iterate_upper_bound != nullptr) &&
         (user_comparator_->Compare(*read_options_.iterate_upper_bound,
                                    level_files[0]->smallest.user_key()) <
          0))) {
      level_iters_.push_back(nullptr);
      if (!level_files.empty()) {
        has_iter_trimmed_for_upper_bound_ = true;
      }
    } else {
      level_iters_.push_back(new ForwardLevelIterator(
          cfd_, read_options_, level_files,
          sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_));
    }
  }
}

void ForwardIterator::ResetIncompleteIterators() {
  const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
  for (size_t i = 0; i < l0_iters_.size(); ++i) {
    assert(i < l0_files.size());
    if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
      continue;
    }
    DeleteIterator(l0_iters_[i]);
    l0_iters_[i] = cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files[i], /*range_del_agg=*/nullptr,
        sv_->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
    l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  for (auto* level_iter : level_iters_) {
    if (level_iter && level_iter->status().IsIncomplete()) {
      level_iter->Reset();
    }
  }

  current_ = nullptr;
  is_prev_set_ = false;
}

void ForwardIterator::UpdateCurrent() {
  if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
    current_ = nullptr;
  } else if (immutable_min_heap_.empty()) {
    current_ = mutable_iter_;
  } else if (!mutable_iter_->Valid()) {
    current_ = immutable_min_heap_.top();
    immutable_min_heap_.pop();
  } else {
    current_ = immutable_min_heap_.top();
    assert(current_ != nullptr);
    assert(current_->Valid());
    int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
        mutable_iter_->key(), current_->key());
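    // Internal keys embed unique sequence numbers, so the memtable iterator
    // and an immutable iterator can never be positioned at equal keys.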
    assert(cmp != 0);
    if (cmp > 0) {
      immutable_min_heap_.pop();
    } else {
      current_ = mutable_iter_;
    }
  }
  valid_ = current_ != nullptr && immutable_status_.ok();
  if (!status_.ok()) {
    status_ = Status::OK();
  }

  // Upper bound doesn't apply to the memtable iterator. We want Valid() to
  // return false when all iterators are over iterate_upper_bound, but can't
  // just set valid_ to false, as that would effectively disable the tailing
  // optimization (Seek() would be called on all immutable iterators regardless
  // of whether the target key is greater than prev_key_).
  current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
}

bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
  // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
  // such that there are no records with keys within that range in
  // immutable_min_heap_. Since immutable structures (SST files and immutable
  // memtables) can't change in this version, we don't need to do a seek if
  // 'target' belongs to that interval (immutable_min_heap_.top() is already
  // at the correct position).

  if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
    return true;
  }
  Slice prev_key = prev_key_.GetInternalKey();
  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
                               prefix_extractor_->Transform(prev_key)) != 0) {
    return true;
  }
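  // Compare() returns <0, 0, or >0. If the previous seek was inclusive, a
  // target equal to prev_key is already covered, so only prev_key > target
  // (Compare >= 1) forces a reseek; otherwise prev_key >= target does.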
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
    return true;
  }

  if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
    // Nothing to seek on.
    return false;
  }
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
                                            : current_->key()) > 0) {
    return true;
  }
  return false;
}

void ForwardIterator::DeleteCurrentIter() {
  const VersionStorageInfo* vstorage = sv_->current->storage_info();
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  for (size_t i = 0; i < l0.size(); ++i) {
    if (!l0_iters_[i]) {
      continue;
    }
    if (l0_iters_[i] == current_) {
      has_iter_trimmed_for_upper_bound_ = true;
      DeleteIterator(l0_iters_[i]);
      l0_iters_[i] = nullptr;
      return;
    }
  }

  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    if (level_iters_[level - 1] == nullptr) {
      continue;
    }
    if (level_iters_[level - 1] == current_) {
      has_iter_trimmed_for_upper_bound_ = true;
      DeleteIterator(level_iters_[level - 1]);
      level_iters_[level - 1] = nullptr;
    }
  }
}

bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
                                             int* pnum_iters) {
  bool retval = false;
  int deleted_iters = 0;
  int num_iters = 0;

  const VersionStorageInfo* vstorage = sv_->current->storage_info();
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  for (size_t i = 0; i < l0.size(); ++i) {
    if (!l0_iters_[i]) {
      retval = true;
      deleted_iters++;
    } else {
      num_iters++;
    }
  }

  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    if ((level_iters_[level - 1] == nullptr) &&
        (!vstorage->LevelFiles(level).empty())) {
      retval = true;
      deleted_iters++;
    } else if (!vstorage->LevelFiles(level).empty()) {
      num_iters++;
    }
  }
  if ((!retval) && num_iters <= 1) {
    retval = true;
  }
  if (pdeleted_iters) {
    *pdeleted_iters = deleted_iters;
  }
  if (pnum_iters) {
    *pnum_iters = num_iters;
  }
  return retval;
}

uint32_t ForwardIterator::FindFileInRange(
    const std::vector<FileMetaData*>& files, const Slice& internal_key,
    uint32_t left, uint32_t right) {
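  // Binary search over the level's files (sorted by key range): find the
  // first file whose largest key is >= internal_key; it may contain the key.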
  auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool {
    return cfd_->internal_comparator().InternalKeyComparator::Compare(
               f->largest.Encode(), k) < 0;
  };
  const auto& b = files.begin();
  return static_cast<uint32_t>(
      std::lower_bound(b + left, b + right, internal_key, cmp) - b);
}

void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
  if (iter == nullptr) {
    return;
  }

  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(iter, is_arena);
  } else {
    if (is_arena) {
      iter->~InternalIterator();
    } else {
      delete iter;
    }
  }
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE