// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "db/forward_iterator.h"

#include <limits>
#include <string>
#include <utility>

#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/range_tombstone_fragmenter.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/merging_iterator.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// Usage:
//     ForwardLevelIterator iter;
//     iter.SetFileIndex(file_index);
//     iter.Seek(target);  // or iter.SeekToFirst();
//     iter.Next()
class ForwardLevelIterator : public InternalIterator {
 public:
  ForwardLevelIterator(
      const ColumnFamilyData* const cfd, const ReadOptions& read_options,
      const std::vector<FileMetaData*>& files,
      const std::shared_ptr<const SliceTransform>& prefix_extractor,
      bool allow_unprepared_value)
      : cfd_(cfd),
        read_options_(read_options),
        files_(files),
        valid_(false),
        file_index_(std::numeric_limits<uint32_t>::max()),
        file_iter_(nullptr),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor),
        allow_unprepared_value_(allow_unprepared_value) {
    status_.PermitUncheckedError();  // Allow uninitialized status through
  }

  ~ForwardLevelIterator() override {
    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }
  }

  void SetFileIndex(uint32_t file_index) {
    assert(file_index < files_.size());
    status_ = Status::OK();
    if (file_index != file_index_) {
      file_index_ = file_index;
      Reset();
    }
  }

  void Reset() {
    assert(file_index_ < files_.size());

    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }

    ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                         kMaxSequenceNumber /* upper_bound */);
    file_iter_ = cfd_->table_cache()->NewIterator(
        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
        *files_[file_index_],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        prefix_extractor_, /*table_reader_ptr=*/nullptr,
        /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
        /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
        /*max_file_size_for_l0_meta_pin=*/0,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    valid_ = false;
    if (!range_del_agg.IsEmpty()) {
      status_ = Status::NotSupported(
          "Range tombstones unsupported with ForwardIterator");
    }
  }

  void SeekToLast() override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
    valid_ = false;
  }

  void Prev() override {
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
    valid_ = false;
  }

  bool Valid() const override { return valid_; }

  void SeekToFirst() override {
    assert(file_iter_ != nullptr);
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }
    file_iter_->SeekToFirst();
    valid_ = file_iter_->Valid();
  }

  void Seek(const Slice& internal_key) override {
    assert(file_iter_ != nullptr);

    // This deviates from the usual convention for InternalIterator::Seek() in
    // that it doesn't discard pre-existing error status. That's because this
    // Seek() is only supposed to be called immediately after SetFileIndex()
    // (which discards pre-existing error status), and SetFileIndex() may set
    // an error status, which we shouldn't discard.
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }

    file_iter_->Seek(internal_key);
    valid_ = file_iter_->Valid();
  }

  void SeekForPrev(const Slice& /*internal_key*/) override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
    valid_ = false;
  }

  void Next() override {
    assert(valid_);
    file_iter_->Next();
    for (;;) {
      valid_ = file_iter_->Valid();
      if (!file_iter_->status().ok()) {
        assert(!valid_);
        return;
      }
      if (valid_) {
        return;
      }
      if (file_index_ + 1 >= files_.size()) {
        valid_ = false;
        return;
      }
      SetFileIndex(file_index_ + 1);
      if (!status_.ok()) {
        assert(!valid_);
        return;
      }
      file_iter_->SeekToFirst();
    }
  }

  Slice key() const override {
    assert(valid_);
    return file_iter_->key();
  }

  Slice value() const override {
    assert(valid_);
    return file_iter_->value();
  }

  Status status() const override {
    if (!status_.ok()) {
      return status_;
    } else if (file_iter_) {
      return file_iter_->status();
    }
    return Status::OK();
  }

  bool PrepareValue() override {
    assert(valid_);
    if (file_iter_->PrepareValue()) {
      return true;
    }

    assert(!file_iter_->Valid());
    valid_ = false;
    return false;
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsValuePinned();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_) {
      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

 private:
  const ColumnFamilyData* const cfd_;
  const ReadOptions& read_options_;
  const std::vector<FileMetaData*>& files_;

  bool valid_;
  uint32_t file_index_;
  Status status_;
  InternalIterator* file_iter_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  // Kept alive by ForwardIterator::sv_->mutable_cf_options
  const std::shared_ptr<const SliceTransform>& prefix_extractor_;
  const bool allow_unprepared_value_;
};

ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd,
                                 SuperVersion* current_sv,
                                 bool allow_unprepared_value)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
      user_comparator_(cfd->user_comparator()),
      allow_unprepared_value_(allow_unprepared_value),
      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
      sv_(current_sv),
      mutable_iter_(nullptr),
      current_(nullptr),
      valid_(false),
      status_(Status::OK()),
      immutable_status_(Status::OK()),
      has_iter_trimmed_for_upper_bound_(false),
      current_over_upper_bound_(false),
      is_prev_set_(false),
      is_prev_inclusive_(false),
      pinned_iters_mgr_(nullptr) {
  if (sv_) {
    RebuildIterators(false);
  }
  // immutable_status_ is a local aggregation of the
  // status of the immutable Iterators.
  // We have to PermitUncheckedError in case it is never
  // used, otherwise it will fail ASSERT_STATUS_CHECKED.
  immutable_status_.PermitUncheckedError();
}

ForwardIterator::~ForwardIterator() { Cleanup(true); }

void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                bool background_purge_on_iterator_cleanup) {
  if (sv->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    db->mutex_.Lock();
    sv->Cleanup();
    db->FindObsoleteFiles(&job_context, false, true);
    if (background_purge_on_iterator_cleanup) {
      db->ScheduleBgLogWriterClose(&job_context);
      db->AddSuperVersionsToFreeQueue(sv);
      db->SchedulePurge();
    }
    db->mutex_.Unlock();
    if (!background_purge_on_iterator_cleanup) {
      delete sv;
    }
    if (job_context.HaveSomethingToDelete()) {
      db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
    }
    job_context.Clean();
  }
}

namespace {
struct SVCleanupParams {
  DBImpl* db;
  SuperVersion* sv;
  bool background_purge_on_iterator_cleanup;
};
}  // anonymous namespace

// Used in PinnedIteratorsManager to release pinned SuperVersion
void ForwardIterator::DeferredSVCleanup(void* arg) {
  auto d = reinterpret_cast<SVCleanupParams*>(arg);
  ForwardIterator::SVCleanup(d->db, d->sv,
                             d->background_purge_on_iterator_cleanup);
  delete d;
}

void ForwardIterator::SVCleanup() {
  if (sv_ == nullptr) {
    return;
  }
  bool background_purge =
      read_options_.background_purge_on_iterator_cleanup ||
      db_->immutable_db_options().avoid_unnecessary_blocking_io;
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    // pinned_iters_mgr_ tells us to make sure that all visited key-value
    // slices are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
    // The slices may point into some memtables owned by sv_, so we need to
    // keep sv_ referenced until pinned_iters_mgr_ unpins everything.
    auto p = new SVCleanupParams{db_, sv_, background_purge};
    pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
  } else {
    SVCleanup(db_, sv_, background_purge);
  }
  sv_ = nullptr;
}

void ForwardIterator::Cleanup(bool release_sv) {
  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }

  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();

  if (release_sv) {
    SVCleanup();
  }
}

bool ForwardIterator::Valid() const {
  // See UpdateCurrent().
  return valid_ ? !current_over_upper_bound_ : false;
}

void ForwardIterator::SeekToFirst() {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }
  SeekInternal(Slice(), true, false);
}

bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
  return !(read_options_.iterate_upper_bound == nullptr ||
           cfd_->internal_comparator().user_comparator()->Compare(
               ExtractUserKey(internal_key),
               *read_options_.iterate_upper_bound) < 0);
}

void ForwardIterator::Seek(const Slice& internal_key) {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }

  SeekInternal(internal_key, false, false);
  if (read_options_.async_io) {
    SeekInternal(internal_key, false, true);
  }
}

// In case of async_io, SeekInternal is called twice: the second call has
// seek_after_async_io enabled and only performs the seeking part, once the
// requested blocks have been retrieved.
void ForwardIterator::SeekInternal(const Slice& internal_key,
                                   bool seek_to_first,
                                   bool seek_after_async_io) {
  assert(mutable_iter_);

  if (!seek_after_async_io) {
    seek_to_first ? mutable_iter_->SeekToFirst()
                  : mutable_iter_->Seek(internal_key);
  }

  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
  // if it turns out that we need to seek the immutable iterators often.
  // We probably want to have an option to turn it off.
  if (seek_to_first || seek_after_async_io ||
      NeedToSeekImmutable(internal_key)) {
    if (!seek_after_async_io) {
      immutable_status_ = Status::OK();
      if (has_iter_trimmed_for_upper_bound_ &&
          (
              // prev_ is not set yet
              is_prev_set_ == false ||
              // We are doing SeekToFirst() and internal_key.size() = 0
              seek_to_first ||
              // prev_key_ > internal_key
              cfd_->internal_comparator().InternalKeyComparator::Compare(
                  prev_key_.GetInternalKey(), internal_key) > 0)) {
        // Some iterators are trimmed. Need to rebuild.
        RebuildIterators(true);
        // Already seeked mutable iter, so seek again
        seek_to_first ? mutable_iter_->SeekToFirst()
                      : mutable_iter_->Seek(internal_key);
      }
      {
        auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
        immutable_min_heap_.swap(tmp);
      }

      for (size_t i = 0; i < imm_iters_.size(); i++) {
        auto* m = imm_iters_[i];
        seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
        if (!m->status().ok()) {
          immutable_status_ = m->status();
        } else if (m->Valid()) {
          immutable_min_heap_.push(m);
        }
      }
    }

    Slice target_user_key;
    if (!seek_to_first) {
      target_user_key = ExtractUserKey(internal_key);
    }
    const VersionStorageInfo* vstorage = sv_->current->storage_info();
    const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
    for (size_t i = 0; i < l0.size(); ++i) {
      if (!l0_iters_[i]) {
        continue;
      }
      if (seek_after_async_io) {
        if (!l0_iters_[i]->status().IsTryAgain()) {
          continue;
        }
      }

      if (seek_to_first) {
        l0_iters_[i]->SeekToFirst();
      } else {
        // If the target key passes over the largest key, we are sure Next()
        // won't go over this file.
        if (seek_after_async_io == false &&
            user_comparator_->Compare(target_user_key,
                                      l0[i]->largest.user_key()) > 0) {
          if (read_options_.iterate_upper_bound != nullptr) {
            has_iter_trimmed_for_upper_bound_ = true;
            DeleteIterator(l0_iters_[i]);
            l0_iters_[i] = nullptr;
          }
          continue;
        }
        l0_iters_[i]->Seek(internal_key);
      }

      if (l0_iters_[i]->status().IsTryAgain()) {
        assert(!seek_after_async_io);
        continue;
      } else if (!l0_iters_[i]->status().ok()) {
        immutable_status_ = l0_iters_[i]->status();
      } else if (l0_iters_[i]->Valid() &&
                 !IsOverUpperBound(l0_iters_[i]->key())) {
        immutable_min_heap_.push(l0_iters_[i]);
      } else {
        has_iter_trimmed_for_upper_bound_ = true;
        DeleteIterator(l0_iters_[i]);
        l0_iters_[i] = nullptr;
      }
    }

    for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
      const std::vector<FileMetaData*>& level_files =
          vstorage->LevelFiles(level);
      if (level_files.empty()) {
        continue;
      }
      if (level_iters_[level - 1] == nullptr) {
        continue;
      }

      if (seek_after_async_io) {
        if (!level_iters_[level - 1]->status().IsTryAgain()) {
          continue;
        }
      }
      uint32_t f_idx = 0;
      if (!seek_to_first && !seek_after_async_io) {
        f_idx = FindFileInRange(level_files, internal_key, 0,
                                static_cast<uint32_t>(level_files.size()));
      }

      if (seek_after_async_io || f_idx < level_files.size()) {
        if (!seek_after_async_io) {
          level_iters_[level - 1]->SetFileIndex(f_idx);
        }
        seek_to_first ? level_iters_[level - 1]->SeekToFirst()
                      : level_iters_[level - 1]->Seek(internal_key);

        if (level_iters_[level - 1]->status().IsTryAgain()) {
          assert(!seek_after_async_io);
          continue;
        } else if (!level_iters_[level - 1]->status().ok()) {
          immutable_status_ = level_iters_[level - 1]->status();
        } else if (level_iters_[level - 1]->Valid() &&
                   !IsOverUpperBound(level_iters_[level - 1]->key())) {
          immutable_min_heap_.push(level_iters_[level - 1]);
        } else {
          // Nothing in this level is interesting. Remove.
          has_iter_trimmed_for_upper_bound_ = true;
          DeleteIterator(level_iters_[level - 1]);
          level_iters_[level - 1] = nullptr;
        }
      }
    }

    if (seek_to_first) {
      is_prev_set_ = false;
    } else {
      prev_key_.SetInternalKey(internal_key);
      is_prev_set_ = true;
      is_prev_inclusive_ = true;
    }

    TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
  } else if (current_ && current_ != mutable_iter_) {
    // current_ is one of immutable iterators, push it back to the heap
    immutable_min_heap_.push(current_);
  }

  // For async_io, current_ should only be updated when seek_after_async_io is
  // true (i.e. in the second SeekInternal() call).
  if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
    UpdateCurrent();
  }
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
}

void ForwardIterator::Next() {
  assert(valid_);
  bool update_prev_key = false;

  if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
    std::string current_key = key().ToString();
    Slice old_key(current_key.data(), current_key.size());

    if (sv_ == nullptr) {
      RebuildIterators(true);
    } else {
      RenewIterators();
    }

    SeekInternal(old_key, false, false);
    if (read_options_.async_io) {
      SeekInternal(old_key, false, true);
    }

    if (!valid_ || key().compare(old_key) != 0) {
      return;
    }
  } else if (current_ != mutable_iter_) {
    // It is going to advance immutable iterator

    if (is_prev_set_ && prefix_extractor_) {
      // advance prev_key_ to current_ only if they share the same prefix
      update_prev_key =
          prefix_extractor_->Transform(prev_key_.GetUserKey())
              .compare(prefix_extractor_->Transform(current_->key())) == 0;
    } else {
      update_prev_key = true;
    }

    if (update_prev_key) {
      prev_key_.SetInternalKey(current_->key());
      is_prev_set_ = true;
      is_prev_inclusive_ = false;
    }
  }

  current_->Next();
  if (current_ != mutable_iter_) {
    if (!current_->status().ok()) {
      immutable_status_ = current_->status();
    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
      immutable_min_heap_.push(current_);
    } else {
      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
        // remove the current iterator
        DeleteCurrentIter();
        current_ = nullptr;
      }
      if (update_prev_key) {
        mutable_iter_->Seek(prev_key_.GetInternalKey());
      }
    }
  }
  UpdateCurrent();
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
}

Slice ForwardIterator::key() const {
  assert(valid_);
  return current_->key();
}

Slice ForwardIterator::value() const {
  assert(valid_);
  return current_->value();
}

Status ForwardIterator::status() const {
  if (!status_.ok()) {
    return status_;
  } else if (!mutable_iter_->status().ok()) {
    return mutable_iter_->status();
  }

  return immutable_status_;
}

bool ForwardIterator::PrepareValue() {
  assert(valid_);
  if (current_->PrepareValue()) {
    return true;
  }

  assert(!current_->Valid());
  assert(!current_->status().ok());
  assert(current_ != mutable_iter_);  // memtable iterator can't fail
  assert(immutable_status_.ok());

  valid_ = false;
  immutable_status_ = current_->status();
  return false;
}

Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
  assert(prop != nullptr);
  if (prop_name == "rocksdb.iterator.super-version-number") {
    *prop = std::to_string(sv_->version_number);
    return Status::OK();
  }
  return Status::InvalidArgument();
}
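
// Example (sketch): callers reach the property above through the generic
// Iterator::GetProperty() interface, e.g.
//   std::string svnum;
//   Status s = it->GetProperty("rocksdb.iterator.super-version-number",
//                              &svnum);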

void ForwardIterator::SetPinnedItersMgr(
    PinnedIteratorsManager* pinned_iters_mgr) {
  pinned_iters_mgr_ = pinned_iters_mgr;
  UpdateChildrenPinnedItersMgr();
}

void ForwardIterator::UpdateChildrenPinnedItersMgr() {
  // Set PinnedIteratorsManager for mutable memtable iterator.
  if (mutable_iter_) {
    mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  // Set PinnedIteratorsManager for immutable memtable iterators.
  for (InternalIterator* child_iter : imm_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

  // Set PinnedIteratorsManager for L0 files iterators.
  for (InternalIterator* child_iter : l0_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

  // Set PinnedIteratorsManager for L1+ levels iterators.
  for (ForwardLevelIterator* child_iter : level_iters_) {
    if (child_iter) {
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }
}

bool ForwardIterator::IsKeyPinned() const {
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
         current_->IsKeyPinned();
}

bool ForwardIterator::IsValuePinned() const {
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
         current_->IsValuePinned();
}

void ForwardIterator::RebuildIterators(bool refresh_sv) {
  // Clean up
  Cleanup(refresh_sv);
  if (refresh_sv) {
    // New
    sv_ = cfd_->GetReferencedSuperVersion(db_);
  }
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
  sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        sv_->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence(),
            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // Always returns Status::OK().
    Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
                                                         &range_del_agg);
    assert(temp_s.ok());
  }
  has_iter_trimmed_for_upper_bound_ = false;

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  l0_iters_.reserve(l0_files.size());
  for (const auto* l0 : l0_files) {
    if ((read_options_.iterate_upper_bound != nullptr) &&
        cfd_->internal_comparator().user_comparator()->Compare(
            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
      // will never be interested in files with smallest key above
      // iterate_upper_bound, since iterate_upper_bound can't be changed.
      l0_iters_.push_back(nullptr);
      continue;
    }
    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        sv_->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }

  BuildLevelIterators(vstorage, sv_);
  current_ = nullptr;
  is_prev_set_ = false;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}

void ForwardIterator::RenewIterators() {
  SuperVersion* svnew;
  assert(sv_);
  svnew = cfd_->GetReferencedSuperVersion(db_);

  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }
  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
  svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        svnew->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence(),
            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // Always returns Status::OK().
    Status temp_s = svnew->imm->AddRangeTombstoneIterators(
        read_options_, &arena_, &range_del_agg);
    assert(temp_s.ok());
  }

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  const auto* vstorage_new = svnew->current->storage_info();
  const auto& l0_files_new = vstorage_new->LevelFiles(0);

  size_t iold, inew;
  bool found;
  std::vector<InternalIterator*> l0_iters_new;
  l0_iters_new.reserve(l0_files_new.size());

  for (inew = 0; inew < l0_files_new.size(); inew++) {
    found = false;
    for (iold = 0; iold < l0_files.size(); iold++) {
      if (l0_files[iold] == l0_files_new[inew]) {
        found = true;
        break;
      }
    }
    if (found) {
      if (l0_iters_[iold] == nullptr) {
        l0_iters_new.push_back(nullptr);
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
      } else {
        l0_iters_new.push_back(l0_iters_[iold]);
        l0_iters_[iold] = nullptr;
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
      }
      continue;
    }
    l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files_new[inew],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        svnew->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }

  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();
  l0_iters_ = l0_iters_new;

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();
  BuildLevelIterators(vstorage_new, svnew);
  current_ = nullptr;
  is_prev_set_ = false;
  SVCleanup();
  sv_ = svnew;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}

void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
                                          SuperVersion* sv) {
  level_iters_.reserve(vstorage->num_levels() - 1);
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    const auto& level_files = vstorage->LevelFiles(level);
    if ((level_files.empty()) ||
        ((read_options_.iterate_upper_bound != nullptr) &&
         (user_comparator_->Compare(*read_options_.iterate_upper_bound,
                                    level_files[0]->smallest.user_key()) <
          0))) {
      level_iters_.push_back(nullptr);
      if (!level_files.empty()) {
        has_iter_trimmed_for_upper_bound_ = true;
      }
    } else {
      level_iters_.push_back(new ForwardLevelIterator(
          cfd_, read_options_, level_files,
          sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_));
    }
  }
}

void ForwardIterator::ResetIncompleteIterators() {
  const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
  for (size_t i = 0; i < l0_iters_.size(); ++i) {
    assert(i < l0_files.size());
    if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
      continue;
    }
    DeleteIterator(l0_iters_[i]);
    l0_iters_[i] = cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files[i], /*range_del_agg=*/nullptr,
        sv_->mutable_cf_options.prefix_extractor,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
    l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  for (auto* level_iter : level_iters_) {
    if (level_iter && level_iter->status().IsIncomplete()) {
      level_iter->Reset();
    }
  }

  current_ = nullptr;
  is_prev_set_ = false;
}

void ForwardIterator::UpdateCurrent() {
  if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
    current_ = nullptr;
  } else if (immutable_min_heap_.empty()) {
    current_ = mutable_iter_;
  } else if (!mutable_iter_->Valid()) {
    current_ = immutable_min_heap_.top();
    immutable_min_heap_.pop();
  } else {
    current_ = immutable_min_heap_.top();
    assert(current_ != nullptr);
    assert(current_->Valid());
    int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
        mutable_iter_->key(), current_->key());
    assert(cmp != 0);
    if (cmp > 0) {
      immutable_min_heap_.pop();
    } else {
      current_ = mutable_iter_;
    }
  }
  valid_ = current_ != nullptr && immutable_status_.ok();
  if (!status_.ok()) {
    status_ = Status::OK();
  }

  // Upper bound doesn't apply to the memtable iterator. We want Valid() to
  // return false when all iterators are over iterate_upper_bound, but can't
  // just set valid_ to false, as that would effectively disable the tailing
  // optimization (Seek() would be called on all immutable iterators regardless
  // of whether the target key is greater than prev_key_).
  current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
}

bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
  // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
  // such that there are no records with keys within that range in
  // immutable_min_heap_. Since immutable structures (SST files and immutable
  // memtables) can't change in this version, we don't need to do a seek if
  // 'target' belongs to that interval (immutable_min_heap_.top() is already
  // at the correct position).
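  // For example, if prev_key_ is "b" and immutable_min_heap_.top()->key() is
  // "f" (user keys shown for brevity), a subsequent Seek("d") does not need to
  // touch the immutable sources: they are known to have no keys in ("b", "f"),
  // so immutable_min_heap_ is already positioned correctly and only
  // mutable_iter_ has to be re-seeked.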
  if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
    return true;
  }
  Slice prev_key = prev_key_.GetInternalKey();
  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
                               prefix_extractor_->Transform(prev_key)) != 0) {
    return true;
  }
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
    return true;
  }

  if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
    // Nothing to seek on.
    return false;
  }
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
                                            : current_->key()) > 0) {
    return true;
  }
  return false;
}

void ForwardIterator::DeleteCurrentIter() {
  const VersionStorageInfo* vstorage = sv_->current->storage_info();
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  for (size_t i = 0; i < l0.size(); ++i) {
    if (!l0_iters_[i]) {
      continue;
    }
    if (l0_iters_[i] == current_) {
      has_iter_trimmed_for_upper_bound_ = true;
      DeleteIterator(l0_iters_[i]);
      l0_iters_[i] = nullptr;
      return;
    }
  }

  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    if (level_iters_[level - 1] == nullptr) {
      continue;
    }
    if (level_iters_[level - 1] == current_) {
      has_iter_trimmed_for_upper_bound_ = true;
      DeleteIterator(level_iters_[level - 1]);
      level_iters_[level - 1] = nullptr;
    }
  }
}

bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
                                             int* pnum_iters) {
  bool retval = false;
  int deleted_iters = 0;
  int num_iters = 0;

  const VersionStorageInfo* vstorage = sv_->current->storage_info();
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  for (size_t i = 0; i < l0.size(); ++i) {
    if (!l0_iters_[i]) {
      retval = true;
      deleted_iters++;
    } else {
      num_iters++;
    }
  }

  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    if ((level_iters_[level - 1] == nullptr) &&
        (!vstorage->LevelFiles(level).empty())) {
      retval = true;
      deleted_iters++;
    } else if (!vstorage->LevelFiles(level).empty()) {
      num_iters++;
    }
  }
  if ((!retval) && num_iters <= 1) {
    retval = true;
  }
  if (pdeleted_iters) {
    *pdeleted_iters = deleted_iters;
  }
  if (pnum_iters) {
    *pnum_iters = num_iters;
  }
  return retval;
}

uint32_t ForwardIterator::FindFileInRange(
    const std::vector<FileMetaData*>& files, const Slice& internal_key,
    uint32_t left, uint32_t right) {
  auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool {
    return cfd_->internal_comparator().InternalKeyComparator::Compare(
               f->largest.Encode(), k) < 0;
  };
  const auto& b = files.begin();
  return static_cast<uint32_t>(
      std::lower_bound(b + left, b + right, internal_key, cmp) - b);
}

void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
  if (iter == nullptr) {
    return;
  }

  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(iter, is_arena);
  } else {
    if (is_arena) {
      iter->~InternalIterator();
    } else {
      delete iter;
    }
  }
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE