]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_iter.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_iter.cc
CommitLineData
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/db_iter.h"
7c673cae 11#include <string>
11fdf7f2 12#include <iostream>
7c673cae
FG
13#include <limits>
14
15#include "db/dbformat.h"
16#include "db/merge_context.h"
17#include "db/merge_helper.h"
18#include "db/pinned_iterators_manager.h"
19#include "monitoring/perf_context_imp.h"
7c673cae
FG
20#include "rocksdb/env.h"
21#include "rocksdb/iterator.h"
22#include "rocksdb/merge_operator.h"
23#include "rocksdb/options.h"
24#include "table/internal_iterator.h"
25#include "util/arena.h"
26#include "util/filename.h"
27#include "util/logging.h"
28#include "util/mutexlock.h"
29#include "util/string_util.h"
11fdf7f2 30#include "util/trace_replay.h"
7c673cae
FG
31
32namespace rocksdb {
33
#if 0
// Debugging aid (compiled out): walks `iter` from its first entry to its
// last and prints every internal key to stderr. Keys that fail to parse are
// printed escaped with a "Corrupt" marker instead.
static void DumpInternalIter(Iterator* iter) {
  iter->SeekToFirst();
  while (iter->Valid()) {
    ParsedInternalKey parsed;
    if (ParseInternalKey(iter->key(), &parsed)) {
      fprintf(stderr, "@ '%s'\n", parsed.DebugString().c_str());
    } else {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    }
    iter->Next();
  }
}
#endif
46
47// Memtables and sstables that make the DB representation contain
48// (userkey,seq,type) => uservalue entries. DBIter
49// combines multiple entries for the same userkey found in the DB
50// representation into a single entry while accounting for sequence
51// numbers, deletion markers, overwrites, etc.
11fdf7f2 52class DBIter final: public Iterator {
7c673cae
FG
53 public:
54 // The following is grossly complicated. TODO: clean it up
55 // Which direction is the iterator currently moving?
11fdf7f2
TL
56 // (1) When moving forward:
57 // (1a) if current_entry_is_merged_ = false, the internal iterator is
58 // positioned at the exact entry that yields this->key(), this->value()
59 // (1b) if current_entry_is_merged_ = true, the internal iterator is
60 // positioned immediately after the last entry that contributed to the
61 // current this->value(). That entry may or may not have key equal to
62 // this->key().
7c673cae
FG
63 // (2) When moving backwards, the internal iterator is positioned
64 // just before all entries whose user key == this->key().
65 enum Direction {
66 kForward,
67 kReverse
68 };
69
70 // LocalStatistics contain Statistics counters that will be aggregated per
71 // each iterator instance and then will be sent to the global statistics when
72 // the iterator is destroyed.
73 //
74 // The purpose of this approach is to avoid perf regression happening
75 // when multiple threads bump the atomic counters from a DBIter::Next().
76 struct LocalStatistics {
77 explicit LocalStatistics() { ResetCounters(); }
78
79 void ResetCounters() {
80 next_count_ = 0;
81 next_found_count_ = 0;
82 prev_count_ = 0;
83 prev_found_count_ = 0;
84 bytes_read_ = 0;
11fdf7f2 85 skip_count_ = 0;
7c673cae
FG
86 }
87
88 void BumpGlobalStatistics(Statistics* global_statistics) {
89 RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
90 RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
91 RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
92 RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
93 RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
11fdf7f2
TL
94 RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_);
95 PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
7c673cae
FG
96 ResetCounters();
97 }
98
99 // Map to Tickers::NUMBER_DB_NEXT
100 uint64_t next_count_;
101 // Map to Tickers::NUMBER_DB_NEXT_FOUND
102 uint64_t next_found_count_;
103 // Map to Tickers::NUMBER_DB_PREV
104 uint64_t prev_count_;
105 // Map to Tickers::NUMBER_DB_PREV_FOUND
106 uint64_t prev_found_count_;
107 // Map to Tickers::ITER_BYTES_READ
108 uint64_t bytes_read_;
11fdf7f2
TL
109 // Map to Tickers::NUMBER_ITER_SKIP
110 uint64_t skip_count_;
7c673cae
FG
111 };
112
11fdf7f2
TL
113 DBIter(Env* _env, const ReadOptions& read_options,
114 const ImmutableCFOptions& cf_options,
115 const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
7c673cae 116 InternalIterator* iter, SequenceNumber s, bool arena_mode,
11fdf7f2
TL
117 uint64_t max_sequential_skip_in_iterations,
118 ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
119 bool allow_blob)
7c673cae 120 : arena_mode_(arena_mode),
11fdf7f2 121 env_(_env),
7c673cae
FG
122 logger_(cf_options.info_log),
123 user_comparator_(cmp),
124 merge_operator_(cf_options.merge_operator),
125 iter_(iter),
126 sequence_(s),
127 direction_(kForward),
128 valid_(false),
129 current_entry_is_merged_(false),
130 statistics_(cf_options.statistics),
11fdf7f2
TL
131 num_internal_keys_skipped_(0),
132 iterate_lower_bound_(read_options.iterate_lower_bound),
7c673cae
FG
133 iterate_upper_bound_(read_options.iterate_upper_bound),
134 prefix_same_as_start_(read_options.prefix_same_as_start),
135 pin_thru_lifetime_(read_options.pin_data),
136 total_order_seek_(read_options.total_order_seek),
137 range_del_agg_(cf_options.internal_comparator, s,
11fdf7f2
TL
138 true /* collapse_deletions */),
139 read_callback_(read_callback),
140 db_impl_(db_impl),
141 cfd_(cfd),
142 allow_blob_(allow_blob),
143 is_blob_(false),
144 start_seqnum_(read_options.iter_start_seqnum) {
7c673cae 145 RecordTick(statistics_, NO_ITERATORS);
11fdf7f2 146 prefix_extractor_ = mutable_cf_options.prefix_extractor.get();
7c673cae
FG
147 max_skip_ = max_sequential_skip_in_iterations;
148 max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
149 if (pin_thru_lifetime_) {
150 pinned_iters_mgr_.StartPinning();
151 }
152 if (iter_) {
153 iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
154 }
155 }
156 virtual ~DBIter() {
157 // Release pinned data if any
158 if (pinned_iters_mgr_.PinningEnabled()) {
159 pinned_iters_mgr_.ReleasePinnedData();
160 }
11fdf7f2
TL
161 // Compiler warning issue filed:
162 // https://github.com/facebook/rocksdb/issues/3013
163 RecordTick(statistics_, NO_ITERATORS, uint64_t(-1));
164 ResetInternalKeysSkippedCounter();
7c673cae
FG
165 local_stats_.BumpGlobalStatistics(statistics_);
166 if (!arena_mode_) {
167 delete iter_;
168 } else {
169 iter_->~InternalIterator();
170 }
171 }
172 virtual void SetIter(InternalIterator* iter) {
173 assert(iter_ == nullptr);
174 iter_ = iter;
175 iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
176 }
177 virtual RangeDelAggregator* GetRangeDelAggregator() {
178 return &range_del_agg_;
179 }
180
181 virtual bool Valid() const override { return valid_; }
182 virtual Slice key() const override {
183 assert(valid_);
11fdf7f2
TL
184 if(start_seqnum_ > 0) {
185 return saved_key_.GetInternalKey();
186 } else {
187 return saved_key_.GetUserKey();
188 }
189
7c673cae
FG
190 }
191 virtual Slice value() const override {
192 assert(valid_);
193 if (current_entry_is_merged_) {
194 // If pinned_value_ is set then the result of merge operator is one of
195 // the merge operands and we should return it.
196 return pinned_value_.data() ? pinned_value_ : saved_value_;
197 } else if (direction_ == kReverse) {
198 return pinned_value_;
199 } else {
200 return iter_->value();
201 }
202 }
203 virtual Status status() const override {
204 if (status_.ok()) {
205 return iter_->status();
206 } else {
11fdf7f2 207 assert(!valid_);
7c673cae
FG
208 return status_;
209 }
210 }
11fdf7f2
TL
211 bool IsBlob() const {
212 assert(valid_ && (allow_blob_ || !is_blob_));
213 return is_blob_;
214 }
7c673cae
FG
215
216 virtual Status GetProperty(std::string prop_name,
217 std::string* prop) override {
218 if (prop == nullptr) {
219 return Status::InvalidArgument("prop is nullptr");
220 }
221 if (prop_name == "rocksdb.iterator.super-version-number") {
222 // First try to pass the value returned from inner iterator.
11fdf7f2 223 return iter_->GetProperty(prop_name, prop);
7c673cae
FG
224 } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
225 if (valid_) {
226 *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
227 } else {
228 *prop = "Iterator is not valid.";
229 }
230 return Status::OK();
11fdf7f2
TL
231 } else if (prop_name == "rocksdb.iterator.internal-key") {
232 *prop = saved_key_.GetUserKey().ToString();
233 return Status::OK();
7c673cae
FG
234 }
235 return Status::InvalidArgument("Undentified property.");
236 }
237
238 virtual void Next() override;
239 virtual void Prev() override;
240 virtual void Seek(const Slice& target) override;
241 virtual void SeekForPrev(const Slice& target) override;
242 virtual void SeekToFirst() override;
243 virtual void SeekToLast() override;
11fdf7f2
TL
244 Env* env() { return env_; }
245 void set_sequence(uint64_t s) { sequence_ = s; }
246 void set_valid(bool v) { valid_ = v; }
7c673cae
FG
247
248 private:
11fdf7f2
TL
249 // For all methods in this block:
250 // PRE: iter_->Valid() && status_.ok()
251 // Return false if there was an error, and status() is non-ok, valid_ = false;
252 // in this case callers would usually stop what they were doing and return.
253 bool ReverseToForward();
254 bool ReverseToBackward();
7c673cae
FG
255 bool FindValueForCurrentKey();
256 bool FindValueForCurrentKeyUsingSeek();
11fdf7f2
TL
257 bool FindUserKeyBeforeSavedKey();
258 inline bool FindNextUserEntry(bool skipping, bool prefix_check);
259 bool FindNextUserEntryInternal(bool skipping, bool prefix_check);
7c673cae 260 bool ParseKey(ParsedInternalKey* key);
11fdf7f2
TL
261 bool MergeValuesNewToOld();
262
263 void PrevInternal();
7c673cae 264 bool TooManyInternalKeysSkipped(bool increment = true);
11fdf7f2
TL
265 bool IsVisible(SequenceNumber sequence);
266
267 // CanReseekToSkip() returns whether the iterator can use the optimization
268 // where it reseek by sequence number to get the next key when there are too
269 // many versions. This is disabled for write unprepared because seeking to
270 // sequence number does not guarantee that it is visible.
271 inline bool CanReseekToSkip();
272
273 // MaxVisibleSequenceNumber() returns the maximum visible sequence number
274 // for this snapshot. This sequence number may be greater than snapshot
275 // seqno because uncommitted data written to DB for write unprepared will
276 // have a higher sequence number.
277 inline SequenceNumber MaxVisibleSequenceNumber();
7c673cae
FG
278
279 // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
280 // is called
281 void TempPinData() {
282 if (!pin_thru_lifetime_) {
283 pinned_iters_mgr_.StartPinning();
284 }
285 }
286
287 // Release blocks pinned by TempPinData()
288 void ReleaseTempPinnedData() {
289 if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
290 pinned_iters_mgr_.ReleasePinnedData();
291 }
292 }
293
294 inline void ClearSavedValue() {
295 if (saved_value_.capacity() > 1048576) {
296 std::string empty;
297 swap(empty, saved_value_);
298 } else {
299 saved_value_.clear();
300 }
301 }
302
303 inline void ResetInternalKeysSkippedCounter() {
11fdf7f2
TL
304 local_stats_.skip_count_ += num_internal_keys_skipped_;
305 if (valid_) {
306 local_stats_.skip_count_--;
307 }
7c673cae
FG
308 num_internal_keys_skipped_ = 0;
309 }
310
311 const SliceTransform* prefix_extractor_;
312 bool arena_mode_;
313 Env* const env_;
314 Logger* logger_;
315 const Comparator* const user_comparator_;
316 const MergeOperator* const merge_operator_;
317 InternalIterator* iter_;
11fdf7f2 318 SequenceNumber sequence_;
7c673cae
FG
319
320 Status status_;
321 IterKey saved_key_;
11fdf7f2
TL
322 // Reusable internal key data structure. This is only used inside one function
323 // and should not be used across functions. Reusing this object can reduce
324 // overhead of calling construction of the function if creating it each time.
325 ParsedInternalKey ikey_;
7c673cae
FG
326 std::string saved_value_;
327 Slice pinned_value_;
328 Direction direction_;
329 bool valid_;
330 bool current_entry_is_merged_;
331 // for prefix seek mode to support prev()
332 Statistics* statistics_;
333 uint64_t max_skip_;
334 uint64_t max_skippable_internal_keys_;
335 uint64_t num_internal_keys_skipped_;
11fdf7f2 336 const Slice* iterate_lower_bound_;
7c673cae
FG
337 const Slice* iterate_upper_bound_;
338 IterKey prefix_start_buf_;
339 Slice prefix_start_key_;
340 const bool prefix_same_as_start_;
341 // Means that we will pin all data blocks we read as long the Iterator
342 // is not deleted, will be true if ReadOptions::pin_data is true
343 const bool pin_thru_lifetime_;
344 const bool total_order_seek_;
345 // List of operands for merge operator.
346 MergeContext merge_context_;
347 RangeDelAggregator range_del_agg_;
348 LocalStatistics local_stats_;
349 PinnedIteratorsManager pinned_iters_mgr_;
11fdf7f2
TL
350 ReadCallback* read_callback_;
351 DBImpl* db_impl_;
352 ColumnFamilyData* cfd_;
353 bool allow_blob_;
354 bool is_blob_;
355 // for diff snapshots we want the lower bound on the seqnum;
356 // if this value > 0 iterator will return internal keys
357 SequenceNumber start_seqnum_;
7c673cae
FG
358
359 // No copying allowed
360 DBIter(const DBIter&);
361 void operator=(const DBIter&);
362};
363
364inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
365 if (!ParseInternalKey(iter_->key(), ikey)) {
366 status_ = Status::Corruption("corrupted internal key in DBIter");
11fdf7f2 367 valid_ = false;
7c673cae
FG
368 ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
369 iter_->key().ToString(true).c_str());
370 return false;
371 } else {
372 return true;
373 }
374}
375
376void DBIter::Next() {
377 assert(valid_);
11fdf7f2 378 assert(status_.ok());
7c673cae
FG
379
380 // Release temporarily pinned blocks from last operation
381 ReleaseTempPinnedData();
382 ResetInternalKeysSkippedCounter();
11fdf7f2 383 bool ok = true;
7c673cae 384 if (direction_ == kReverse) {
11fdf7f2
TL
385 if (!ReverseToForward()) {
386 ok = false;
387 }
7c673cae
FG
388 } else if (iter_->Valid() && !current_entry_is_merged_) {
389 // If the current value is not a merge, the iter position is the
390 // current key, which is already returned. We can safely issue a
391 // Next() without checking the current key.
392 // If the current key is a merge, very likely iter already points
393 // to the next internal position.
394 iter_->Next();
395 PERF_COUNTER_ADD(internal_key_skipped_count, 1);
396 }
397
398 if (statistics_ != nullptr) {
399 local_stats_.next_count_++;
400 }
11fdf7f2
TL
401 if (ok && iter_->Valid()) {
402 FindNextUserEntry(true /* skipping the current user key */,
403 prefix_same_as_start_);
404 } else {
7c673cae 405 valid_ = false;
7c673cae 406 }
7c673cae
FG
407 if (statistics_ != nullptr && valid_) {
408 local_stats_.next_found_count_++;
409 local_stats_.bytes_read_ += (key().size() + value().size());
410 }
411}
412
413// PRE: saved_key_ has the current user key if skipping
414// POST: saved_key_ should have the next user key if valid_,
415// if the current entry is a result of merge
416// current_entry_is_merged_ => true
417// saved_value_ => the merged value
418//
419// NOTE: In between, saved_key_ can point to a user key that has
420// a delete marker or a sequence number higher than sequence_
421// saved_key_ MUST have a proper user_key before calling this function
422//
423// The prefix_check parameter controls whether we check the iterated
424// keys against the prefix of the seeked key. Set to false when
425// performing a seek without a key (e.g. SeekToFirst). Set to
426// prefix_same_as_start_ for other iterations.
11fdf7f2 427inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
7c673cae 428 PERF_TIMER_GUARD(find_next_user_entry_time);
11fdf7f2 429 return FindNextUserEntryInternal(skipping, prefix_check);
7c673cae
FG
430}
431
432// Actual implementation of DBIter::FindNextUserEntry()
11fdf7f2 433bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
7c673cae
FG
434 // Loop until we hit an acceptable entry to yield
435 assert(iter_->Valid());
11fdf7f2 436 assert(status_.ok());
7c673cae
FG
437 assert(direction_ == kForward);
438 current_entry_is_merged_ = false;
439
440 // How many times in a row we have skipped an entry with user key less than
441 // or equal to saved_key_. We could skip these entries either because
442 // sequence numbers were too high or because skipping = true.
443 // What saved_key_ contains throughout this method:
444 // - if skipping : saved_key_ contains the key that we need to skip,
445 // and we haven't seen any keys greater than that,
446 // - if num_skipped > 0 : saved_key_ contains the key that we have skipped
447 // num_skipped times, and we haven't seen any keys
448 // greater than that,
449 // - none of the above : saved_key_ can contain anything, it doesn't matter.
450 uint64_t num_skipped = 0;
451
11fdf7f2 452 is_blob_ = false;
7c673cae 453
11fdf7f2
TL
454 do {
455 if (!ParseKey(&ikey_)) {
456 return false;
7c673cae
FG
457 }
458
459 if (iterate_upper_bound_ != nullptr &&
11fdf7f2 460 user_comparator_->Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
7c673cae
FG
461 break;
462 }
463
464 if (prefix_extractor_ && prefix_check &&
11fdf7f2
TL
465 prefix_extractor_->Transform(ikey_.user_key)
466 .compare(prefix_start_key_) != 0) {
7c673cae
FG
467 break;
468 }
469
470 if (TooManyInternalKeysSkipped()) {
11fdf7f2 471 return false;
7c673cae
FG
472 }
473
11fdf7f2
TL
474 if (IsVisible(ikey_.sequence)) {
475 if (skipping && user_comparator_->Compare(ikey_.user_key,
476 saved_key_.GetUserKey()) <= 0) {
7c673cae
FG
477 num_skipped++; // skip this entry
478 PERF_COUNTER_ADD(internal_key_skipped_count, 1);
479 } else {
480 num_skipped = 0;
11fdf7f2 481 switch (ikey_.type) {
7c673cae
FG
482 case kTypeDeletion:
483 case kTypeSingleDeletion:
484 // Arrange to skip all upcoming entries for this key since
485 // they are hidden by this deletion.
11fdf7f2
TL
486 // if iterartor specified start_seqnum we
487 // 1) return internal key, including the type
488 // 2) return ikey only if ikey.seqnum >= start_seqnum_
489 // note that if deletion seqnum is < start_seqnum_ we
490 // just skip it like in normal iterator.
491 if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) {
492 saved_key_.SetInternalKey(ikey_);
493 valid_ = true;
494 return true;
495 } else {
496 saved_key_.SetUserKey(
497 ikey_.user_key,
498 !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
7c673cae 499 skipping = true;
7c673cae 500 PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
11fdf7f2
TL
501 }
502 break;
503 case kTypeValue:
504 case kTypeBlobIndex:
505 if (start_seqnum_ > 0) {
506 // we are taking incremental snapshot here
507 // incremental snapshots aren't supported on DB with range deletes
508 assert(!(
509 (ikey_.type == kTypeBlobIndex) && (start_seqnum_ > 0)
510 ));
511 if (ikey_.sequence >= start_seqnum_) {
512 saved_key_.SetInternalKey(ikey_);
513 valid_ = true;
514 return true;
515 } else {
516 // this key and all previous versions shouldn't be included,
517 // skipping
518 saved_key_.SetUserKey(ikey_.user_key,
519 !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
520 skipping = true;
521 }
7c673cae 522 } else {
11fdf7f2
TL
523 saved_key_.SetUserKey(
524 ikey_.user_key,
525 !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
526 if (range_del_agg_.ShouldDelete(
527 ikey_, RangeDelPositioningMode::kForwardTraversal)) {
528 // Arrange to skip all upcoming entries for this key since
529 // they are hidden by this deletion.
530 skipping = true;
531 num_skipped = 0;
532 PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
533 } else if (ikey_.type == kTypeBlobIndex) {
534 if (!allow_blob_) {
535 ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
536 status_ = Status::NotSupported(
537 "Encounter unexpected blob index. Please open DB with "
538 "rocksdb::blob_db::BlobDB instead.");
539 valid_ = false;
540 return false;
541 }
542
543 is_blob_ = true;
544 valid_ = true;
545 return true;
546 } else {
547 valid_ = true;
548 return true;
549 }
7c673cae
FG
550 }
551 break;
552 case kTypeMerge:
553 saved_key_.SetUserKey(
11fdf7f2
TL
554 ikey_.user_key,
555 !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
7c673cae 556 if (range_del_agg_.ShouldDelete(
11fdf7f2 557 ikey_, RangeDelPositioningMode::kForwardTraversal)) {
7c673cae
FG
558 // Arrange to skip all upcoming entries for this key since
559 // they are hidden by this deletion.
560 skipping = true;
561 num_skipped = 0;
562 PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
563 } else {
564 // By now, we are sure the current ikey is going to yield a
565 // value
566 current_entry_is_merged_ = true;
567 valid_ = true;
11fdf7f2 568 return MergeValuesNewToOld(); // Go to a different state machine
7c673cae
FG
569 }
570 break;
571 default:
572 assert(false);
573 break;
574 }
575 }
576 } else {
7c673cae
FG
577 PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
578
11fdf7f2
TL
579 // This key was inserted after our snapshot was taken.
580 // If this happens too many times in a row for the same user key, we want
581 // to seek to the target sequence number.
582 int cmp =
583 user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey());
584 if (cmp == 0 || (skipping && cmp <= 0)) {
7c673cae
FG
585 num_skipped++;
586 } else {
587 saved_key_.SetUserKey(
11fdf7f2 588 ikey_.user_key,
7c673cae
FG
589 !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
590 skipping = false;
591 num_skipped = 0;
592 }
593 }
594
595 // If we have sequentially iterated via numerous equal keys, then it's
596 // better to seek so that we can avoid too many key comparisons.
11fdf7f2 597 if (num_skipped > max_skip_ && CanReseekToSkip()) {
7c673cae
FG
598 num_skipped = 0;
599 std::string last_key;
600 if (skipping) {
601 // We're looking for the next user-key but all we see are the same
602 // user-key with decreasing sequence numbers. Fast forward to
603 // sequence number 0 and type deletion (the smallest type).
604 AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
605 0, kTypeDeletion));
606 // Don't set skipping = false because we may still see more user-keys
607 // equal to saved_key_.
608 } else {
609 // We saw multiple entries with this user key and sequence numbers
610 // higher than sequence_. Fast forward to sequence_.
611 // Note that this only covers a case when a higher key was overwritten
612 // many times since our snapshot was taken, not the case when a lot of
613 // different keys were inserted after our snapshot was taken.
614 AppendInternalKey(&last_key,
615 ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
616 kValueTypeForSeek));
617 }
618 iter_->Seek(last_key);
619 RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
620 } else {
621 iter_->Next();
622 }
623 } while (iter_->Valid());
11fdf7f2 624
7c673cae 625 valid_ = false;
11fdf7f2 626 return iter_->status().ok();
7c673cae
FG
627}
628
629// Merge values of the same user key starting from the current iter_ position
630// Scan from the newer entries to older entries.
631// PRE: iter_->key() points to the first merge type entry
632// saved_key_ stores the user key
633// POST: saved_value_ has the merged value for the user key
634// iter_ points to the next entry (or invalid)
11fdf7f2 635bool DBIter::MergeValuesNewToOld() {
7c673cae
FG
636 if (!merge_operator_) {
637 ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
638 status_ = Status::InvalidArgument("merge_operator_ must be set.");
639 valid_ = false;
11fdf7f2 640 return false;
7c673cae
FG
641 }
642
643 // Temporarily pin the blocks that hold merge operands
644 TempPinData();
645 merge_context_.Clear();
646 // Start the merge process by pushing the first operand
647 merge_context_.PushOperand(iter_->value(),
648 iter_->IsValuePinned() /* operand_pinned */);
11fdf7f2 649 TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
7c673cae
FG
650
651 ParsedInternalKey ikey;
652 Status s;
653 for (iter_->Next(); iter_->Valid(); iter_->Next()) {
11fdf7f2 654 TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
7c673cae 655 if (!ParseKey(&ikey)) {
11fdf7f2 656 return false;
7c673cae
FG
657 }
658
659 if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
660 // hit the next user key, stop right here
661 break;
662 } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
663 range_del_agg_.ShouldDelete(
11fdf7f2 664 ikey, RangeDelPositioningMode::kForwardTraversal)) {
7c673cae
FG
665 // hit a delete with the same user key, stop right here
666 // iter_ is positioned after delete
667 iter_->Next();
668 break;
669 } else if (kTypeValue == ikey.type) {
670 // hit a put, merge the put value with operands and store the
671 // final result in saved_value_. We are done!
7c673cae
FG
672 const Slice val = iter_->value();
673 s = MergeHelper::TimedFullMerge(
674 merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
11fdf7f2 675 &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
7c673cae 676 if (!s.ok()) {
11fdf7f2 677 valid_ = false;
7c673cae 678 status_ = s;
11fdf7f2 679 return false;
7c673cae
FG
680 }
681 // iter_ is positioned after put
682 iter_->Next();
11fdf7f2
TL
683 if (!iter_->status().ok()) {
684 valid_ = false;
685 return false;
686 }
687 return true;
7c673cae
FG
688 } else if (kTypeMerge == ikey.type) {
689 // hit a merge, add the value as an operand and run associative merge.
690 // when complete, add result to operands and continue.
691 merge_context_.PushOperand(iter_->value(),
692 iter_->IsValuePinned() /* operand_pinned */);
693 PERF_COUNTER_ADD(internal_merge_count, 1);
11fdf7f2
TL
694 } else if (kTypeBlobIndex == ikey.type) {
695 if (!allow_blob_) {
696 ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
697 status_ = Status::NotSupported(
698 "Encounter unexpected blob index. Please open DB with "
699 "rocksdb::blob_db::BlobDB instead.");
700 } else {
701 status_ =
702 Status::NotSupported("Blob DB does not support merge operator.");
703 }
704 valid_ = false;
705 return false;
7c673cae
FG
706 } else {
707 assert(false);
708 }
709 }
710
11fdf7f2
TL
711 if (!iter_->status().ok()) {
712 valid_ = false;
713 return false;
714 }
715
7c673cae
FG
716 // we either exhausted all internal keys under this user key, or hit
717 // a deletion marker.
718 // feed null as the existing value to the merge operator, such that
719 // client can differentiate this scenario and do things accordingly.
720 s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
721 nullptr, merge_context_.GetOperands(),
722 &saved_value_, logger_, statistics_, env_,
11fdf7f2 723 &pinned_value_, true);
7c673cae 724 if (!s.ok()) {
11fdf7f2 725 valid_ = false;
7c673cae 726 status_ = s;
11fdf7f2 727 return false;
7c673cae 728 }
11fdf7f2
TL
729
730 assert(status_.ok());
731 return true;
7c673cae
FG
732}
733
734void DBIter::Prev() {
735 assert(valid_);
11fdf7f2 736 assert(status_.ok());
7c673cae
FG
737 ReleaseTempPinnedData();
738 ResetInternalKeysSkippedCounter();
11fdf7f2 739 bool ok = true;
7c673cae 740 if (direction_ == kForward) {
11fdf7f2
TL
741 if (!ReverseToBackward()) {
742 ok = false;
743 }
744 }
745 if (ok) {
746 PrevInternal();
7c673cae 747 }
7c673cae
FG
748 if (statistics_ != nullptr) {
749 local_stats_.prev_count_++;
750 if (valid_) {
751 local_stats_.prev_found_count_++;
752 local_stats_.bytes_read_ += (key().size() + value().size());
753 }
754 }
755}
756
11fdf7f2
TL
757bool DBIter::ReverseToForward() {
758 assert(iter_->status().ok());
759
760 // When moving backwards, iter_ is positioned on _previous_ key, which may
761 // not exist or may have different prefix than the current key().
762 // If that's the case, seek iter_ to current key.
763 if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
7c673cae
FG
764 IterKey last_key;
765 last_key.SetInternalKey(ParsedInternalKey(
766 saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
767 iter_->Seek(last_key.GetInternalKey());
768 }
11fdf7f2 769
7c673cae 770 direction_ = kForward;
11fdf7f2
TL
771 // Skip keys less than the current key() (a.k.a. saved_key_).
772 while (iter_->Valid()) {
773 ParsedInternalKey ikey;
774 if (!ParseKey(&ikey)) {
775 return false;
776 }
777 if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >=
778 0) {
779 return true;
780 }
781 iter_->Next();
782 }
783
784 if (!iter_->status().ok()) {
785 valid_ = false;
786 return false;
7c673cae 787 }
11fdf7f2
TL
788
789 return true;
7c673cae
FG
790}
791
11fdf7f2
TL
792// Move iter_ to the key before saved_key_.
793bool DBIter::ReverseToBackward() {
794 assert(iter_->status().ok());
795
796 // When current_entry_is_merged_ is true, iter_ may be positioned on the next
797 // key, which may not exist or may have prefix different from current.
798 // If that's the case, seek to saved_key_.
799 if (current_entry_is_merged_ &&
800 ((prefix_extractor_ != nullptr && !total_order_seek_) ||
801 !iter_->Valid())) {
7c673cae 802 IterKey last_key;
11fdf7f2
TL
803 // Using kMaxSequenceNumber and kValueTypeForSeek
804 // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
805 // than saved_key_.
806 last_key.SetInternalKey(ParsedInternalKey(
807 saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
808 if (prefix_extractor_ != nullptr && !total_order_seek_) {
809 iter_->SeekForPrev(last_key.GetInternalKey());
810 } else {
811 // Some iterators may not support SeekForPrev(), so we avoid using it
812 // when prefix seek mode is disabled. This is somewhat expensive
813 // (an extra Prev(), as well as an extra change of direction of iter_),
814 // so we may need to reconsider it later.
815 iter_->Seek(last_key.GetInternalKey());
816 if (!iter_->Valid() && iter_->status().ok()) {
817 iter_->SeekToLast();
7c673cae 818 }
7c673cae
FG
819 }
820 }
7c673cae 821
7c673cae 822 direction_ = kReverse;
11fdf7f2 823 return FindUserKeyBeforeSavedKey();
7c673cae
FG
824}
825
826void DBIter::PrevInternal() {
7c673cae
FG
827 while (iter_->Valid()) {
828 saved_key_.SetUserKey(
829 ExtractUserKey(iter_->key()),
830 !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
831
11fdf7f2
TL
832 if (prefix_extractor_ && prefix_same_as_start_ &&
833 prefix_extractor_->Transform(saved_key_.GetUserKey())
834 .compare(prefix_start_key_) != 0) {
835 // Current key does not have the same prefix as start
836 valid_ = false;
7c673cae
FG
837 return;
838 }
839
11fdf7f2
TL
840 if (iterate_lower_bound_ != nullptr &&
841 user_comparator_->Compare(saved_key_.GetUserKey(),
842 *iterate_lower_bound_) < 0) {
843 // We've iterated earlier than the user-specified lower bound.
844 valid_ = false;
7c673cae
FG
845 return;
846 }
847
11fdf7f2
TL
848 if (!FindValueForCurrentKey()) { // assigns valid_
849 return;
850 }
851
852 // Whether or not we found a value for current key, we need iter_ to end up
853 // on a smaller key.
854 if (!FindUserKeyBeforeSavedKey()) {
855 return;
7c673cae 856 }
11fdf7f2
TL
857
858 if (valid_) {
859 // Found the value.
860 return;
861 }
862
863 if (TooManyInternalKeysSkipped(false)) {
864 return;
7c673cae
FG
865 }
866 }
11fdf7f2 867
7c673cae 868 // We haven't found any key - iterator is not valid
7c673cae
FG
869 valid_ = false;
870}
871
11fdf7f2
TL
872// Used for backwards iteration.
873// Looks at the entries with user key saved_key_ and finds the most up-to-date
874// value for it, or executes a merge, or determines that the value was deleted.
875// Sets valid_ to true if the value is found and is ready to be presented to
876// the user through value().
877// Sets valid_ to false if the value was deleted, and we should try another key.
878// Returns false if an error occurred, and !status().ok() and !valid_.
879//
880// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
881// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
882// the entry just before them, or on the entry just after them.
7c673cae
FG
883bool DBIter::FindValueForCurrentKey() {
884 assert(iter_->Valid());
885 merge_context_.Clear();
886 current_entry_is_merged_ = false;
887 // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
888 // kTypeValue)
889 ValueType last_not_merge_type = kTypeDeletion;
890 ValueType last_key_entry_type = kTypeDeletion;
891
7c673cae
FG
892 // Temporarily pin blocks that hold (merge operands / the value)
893 ReleaseTempPinnedData();
894 TempPinData();
895 size_t num_skipped = 0;
11fdf7f2
TL
896 while (iter_->Valid()) {
897 ParsedInternalKey ikey;
898 if (!ParseKey(&ikey)) {
899 return false;
900 }
901
902 if (!IsVisible(ikey.sequence) ||
903 !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
904 break;
905 }
7c673cae
FG
906 if (TooManyInternalKeysSkipped()) {
907 return false;
908 }
909
11fdf7f2
TL
910 // This user key has lots of entries.
911 // We're going from old to new, and it's taking too long. Let's do a Seek()
912 // and go from new to old. This helps when a key was overwritten many times.
913 if (num_skipped >= max_skip_ && CanReseekToSkip()) {
7c673cae
FG
914 return FindValueForCurrentKeyUsingSeek();
915 }
916
917 last_key_entry_type = ikey.type;
918 switch (last_key_entry_type) {
919 case kTypeValue:
11fdf7f2 920 case kTypeBlobIndex:
7c673cae 921 if (range_del_agg_.ShouldDelete(
11fdf7f2 922 ikey, RangeDelPositioningMode::kBackwardTraversal)) {
7c673cae
FG
923 last_key_entry_type = kTypeRangeDeletion;
924 PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
925 } else {
926 assert(iter_->IsValuePinned());
927 pinned_value_ = iter_->value();
928 }
929 merge_context_.Clear();
930 last_not_merge_type = last_key_entry_type;
931 break;
932 case kTypeDeletion:
933 case kTypeSingleDeletion:
934 merge_context_.Clear();
935 last_not_merge_type = last_key_entry_type;
936 PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
937 break;
938 case kTypeMerge:
939 if (range_del_agg_.ShouldDelete(
11fdf7f2 940 ikey, RangeDelPositioningMode::kBackwardTraversal)) {
7c673cae
FG
941 merge_context_.Clear();
942 last_key_entry_type = kTypeRangeDeletion;
943 last_not_merge_type = last_key_entry_type;
944 PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
945 } else {
946 assert(merge_operator_ != nullptr);
947 merge_context_.PushOperandBack(
948 iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
949 PERF_COUNTER_ADD(internal_merge_count, 1);
950 }
951 break;
952 default:
953 assert(false);
954 }
955
956 PERF_COUNTER_ADD(internal_key_skipped_count, 1);
7c673cae
FG
957 iter_->Prev();
958 ++num_skipped;
11fdf7f2
TL
959 }
960
961 if (!iter_->status().ok()) {
962 valid_ = false;
963 return false;
7c673cae
FG
964 }
965
966 Status s;
11fdf7f2 967 is_blob_ = false;
7c673cae
FG
968 switch (last_key_entry_type) {
969 case kTypeDeletion:
970 case kTypeSingleDeletion:
971 case kTypeRangeDeletion:
972 valid_ = false;
11fdf7f2 973 return true;
7c673cae
FG
974 case kTypeMerge:
975 current_entry_is_merged_ = true;
976 if (last_not_merge_type == kTypeDeletion ||
977 last_not_merge_type == kTypeSingleDeletion ||
978 last_not_merge_type == kTypeRangeDeletion) {
979 s = MergeHelper::TimedFullMerge(
980 merge_operator_, saved_key_.GetUserKey(), nullptr,
981 merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
11fdf7f2
TL
982 env_, &pinned_value_, true);
983 } else if (last_not_merge_type == kTypeBlobIndex) {
984 if (!allow_blob_) {
985 ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
986 status_ = Status::NotSupported(
987 "Encounter unexpected blob index. Please open DB with "
988 "rocksdb::blob_db::BlobDB instead.");
989 } else {
990 status_ =
991 Status::NotSupported("Blob DB does not support merge operator.");
992 }
993 valid_ = false;
994 return false;
7c673cae
FG
995 } else {
996 assert(last_not_merge_type == kTypeValue);
997 s = MergeHelper::TimedFullMerge(
998 merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
999 merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
11fdf7f2 1000 env_, &pinned_value_, true);
7c673cae
FG
1001 }
1002 break;
1003 case kTypeValue:
11fdf7f2
TL
1004 // do nothing - we've already has value in pinned_value_
1005 break;
1006 case kTypeBlobIndex:
1007 if (!allow_blob_) {
1008 ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
1009 status_ = Status::NotSupported(
1010 "Encounter unexpected blob index. Please open DB with "
1011 "rocksdb::blob_db::BlobDB instead.");
1012 valid_ = false;
1013 return false;
1014 }
1015 is_blob_ = true;
7c673cae
FG
1016 break;
1017 default:
1018 assert(false);
1019 break;
1020 }
7c673cae 1021 if (!s.ok()) {
11fdf7f2 1022 valid_ = false;
7c673cae 1023 status_ = s;
11fdf7f2 1024 return false;
7c673cae 1025 }
11fdf7f2 1026 valid_ = true;
7c673cae
FG
1027 return true;
1028}
1029
1030// This function is used in FindValueForCurrentKey.
1031// We use Seek() function instead of Prev() to find necessary value
11fdf7f2
TL
1032// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
1033// Would be nice to reuse some code.
7c673cae
FG
1034bool DBIter::FindValueForCurrentKeyUsingSeek() {
1035 // FindValueForCurrentKey will enable pinning before calling
1036 // FindValueForCurrentKeyUsingSeek()
1037 assert(pinned_iters_mgr_.PinningEnabled());
1038 std::string last_key;
1039 AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
1040 sequence_, kValueTypeForSeek));
1041 iter_->Seek(last_key);
1042 RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
1043
11fdf7f2
TL
1044 // In case read_callback presents, the value we seek to may not be visible.
1045 // Find the next value that's visible.
7c673cae 1046 ParsedInternalKey ikey;
11fdf7f2
TL
1047 while (true) {
1048 if (!iter_->Valid()) {
1049 valid_ = false;
1050 return iter_->status().ok();
1051 }
1052
1053 if (!ParseKey(&ikey)) {
1054 return false;
1055 }
1056 if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
1057 // No visible values for this key, even though FindValueForCurrentKey()
1058 // has seen some. This is possible if we're using a tailing iterator, and
1059 // the entries were discarded in a compaction.
1060 valid_ = false;
1061 return true;
1062 }
1063
1064 if (IsVisible(ikey.sequence)) {
1065 break;
1066 }
1067
1068 iter_->Next();
1069 }
7c673cae
FG
1070
1071 if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
1072 range_del_agg_.ShouldDelete(
11fdf7f2
TL
1073 ikey, RangeDelPositioningMode::kBackwardTraversal)) {
1074 valid_ = false;
1075 return true;
1076 }
1077 if (ikey.type == kTypeBlobIndex && !allow_blob_) {
1078 ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
1079 status_ = Status::NotSupported(
1080 "Encounter unexpected blob index. Please open DB with "
1081 "rocksdb::blob_db::BlobDB instead.");
7c673cae
FG
1082 valid_ = false;
1083 return false;
1084 }
11fdf7f2 1085 if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
7c673cae
FG
1086 assert(iter_->IsValuePinned());
1087 pinned_value_ = iter_->value();
1088 valid_ = true;
1089 return true;
1090 }
1091
1092 // kTypeMerge. We need to collect all kTypeMerge values and save them
1093 // in operands
11fdf7f2 1094 assert(ikey.type == kTypeMerge);
7c673cae
FG
1095 current_entry_is_merged_ = true;
1096 merge_context_.Clear();
11fdf7f2
TL
1097 merge_context_.PushOperand(iter_->value(),
1098 iter_->IsValuePinned() /* operand_pinned */);
1099 while (true) {
7c673cae 1100 iter_->Next();
7c673cae 1101
11fdf7f2
TL
1102 if (!iter_->Valid()) {
1103 if (!iter_->status().ok()) {
1104 valid_ = false;
1105 return false;
1106 }
1107 break;
7c673cae 1108 }
11fdf7f2
TL
1109 if (!ParseKey(&ikey)) {
1110 return false;
1111 }
1112 if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
1113 break;
1114 }
1115
1116 if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
1117 range_del_agg_.ShouldDelete(
1118 ikey, RangeDelPositioningMode::kBackwardTraversal)) {
1119 break;
1120 } else if (ikey.type == kTypeValue) {
1121 const Slice val = iter_->value();
1122 Status s = MergeHelper::TimedFullMerge(
1123 merge_operator_, saved_key_.GetUserKey(), &val,
1124 merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
1125 env_, &pinned_value_, true);
1126 if (!s.ok()) {
1127 valid_ = false;
1128 status_ = s;
1129 return false;
1130 }
1131 valid_ = true;
1132 return true;
1133 } else if (ikey.type == kTypeMerge) {
1134 merge_context_.PushOperand(iter_->value(),
1135 iter_->IsValuePinned() /* operand_pinned */);
1136 PERF_COUNTER_ADD(internal_merge_count, 1);
1137 } else if (ikey.type == kTypeBlobIndex) {
1138 if (!allow_blob_) {
1139 ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
1140 status_ = Status::NotSupported(
1141 "Encounter unexpected blob index. Please open DB with "
1142 "rocksdb::blob_db::BlobDB instead.");
1143 } else {
1144 status_ =
1145 Status::NotSupported("Blob DB does not support merge operator.");
1146 }
1147 valid_ = false;
1148 return false;
1149 } else {
1150 assert(false);
7c673cae 1151 }
7c673cae
FG
1152 }
1153
11fdf7f2
TL
1154 Status s = MergeHelper::TimedFullMerge(
1155 merge_operator_, saved_key_.GetUserKey(), nullptr,
1156 merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
1157 &pinned_value_, true);
7c673cae 1158 if (!s.ok()) {
11fdf7f2 1159 valid_ = false;
7c673cae 1160 status_ = s;
11fdf7f2 1161 return false;
7c673cae 1162 }
7c673cae 1163
11fdf7f2
TL
1164 // Make sure we leave iter_ in a good state. If it's valid and we don't care
1165 // about prefixes, that's already good enough. Otherwise it needs to be
1166 // seeked to the current key.
1167 if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) {
1168 if (prefix_extractor_ != nullptr && !total_order_seek_) {
1169 iter_->SeekForPrev(last_key);
1170 } else {
1171 iter_->Seek(last_key);
1172 if (!iter_->Valid() && iter_->status().ok()) {
1173 iter_->SeekToLast();
1174 }
1175 }
1176 RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
7c673cae 1177 }
11fdf7f2
TL
1178
1179 valid_ = true;
1180 return true;
7c673cae
FG
1181}
1182
11fdf7f2
TL
1183// Move backwards until the key smaller than saved_key_.
1184// Changes valid_ only if return value is false.
1185bool DBIter::FindUserKeyBeforeSavedKey() {
1186 assert(status_.ok());
7c673cae 1187 size_t num_skipped = 0;
11fdf7f2
TL
1188 while (iter_->Valid()) {
1189 ParsedInternalKey ikey;
1190 if (!ParseKey(&ikey)) {
1191 return false;
7c673cae
FG
1192 }
1193
11fdf7f2
TL
1194 if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
1195 return true;
7c673cae 1196 }
11fdf7f2
TL
1197
1198 if (TooManyInternalKeysSkipped()) {
1199 return false;
1200 }
1201
1202 assert(ikey.sequence != kMaxSequenceNumber);
1203 if (!IsVisible(ikey.sequence)) {
7c673cae
FG
1204 PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
1205 } else {
1206 PERF_COUNTER_ADD(internal_key_skipped_count, 1);
1207 }
11fdf7f2
TL
1208
1209 if (num_skipped >= max_skip_ && CanReseekToSkip()) {
1210 num_skipped = 0;
1211 IterKey last_key;
1212 last_key.SetInternalKey(ParsedInternalKey(
1213 saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
1214 // It would be more efficient to use SeekForPrev() here, but some
1215 // iterators may not support it.
1216 iter_->Seek(last_key.GetInternalKey());
1217 RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
1218 if (!iter_->Valid()) {
1219 break;
1220 }
1221 } else {
1222 ++num_skipped;
1223 }
1224
7c673cae 1225 iter_->Prev();
7c673cae 1226 }
11fdf7f2
TL
1227
1228 if (!iter_->status().ok()) {
1229 valid_ = false;
1230 return false;
1231 }
1232
1233 return true;
7c673cae
FG
1234}
1235
1236bool DBIter::TooManyInternalKeysSkipped(bool increment) {
1237 if ((max_skippable_internal_keys_ > 0) &&
1238 (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
1239 valid_ = false;
1240 status_ = Status::Incomplete("Too many internal keys skipped.");
1241 return true;
1242 } else if (increment) {
1243 num_internal_keys_skipped_++;
1244 }
1245 return false;
1246}
1247
11fdf7f2
TL
1248bool DBIter::IsVisible(SequenceNumber sequence) {
1249 return sequence <= MaxVisibleSequenceNumber() &&
1250 (read_callback_ == nullptr || read_callback_->IsVisible(sequence));
1251}
1252
1253bool DBIter::CanReseekToSkip() {
1254 return read_callback_ == nullptr ||
1255 read_callback_->MaxUnpreparedSequenceNumber() == 0;
1256}
1257
1258SequenceNumber DBIter::MaxVisibleSequenceNumber() {
1259 if (read_callback_ == nullptr) {
1260 return sequence_;
7c673cae 1261 }
11fdf7f2
TL
1262
1263 return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber());
7c673cae
FG
1264}
1265
1266void DBIter::Seek(const Slice& target) {
1267 StopWatch sw(env_, statistics_, DB_SEEK);
11fdf7f2 1268 status_ = Status::OK();
7c673cae
FG
1269 ReleaseTempPinnedData();
1270 ResetInternalKeysSkippedCounter();
11fdf7f2
TL
1271
1272 SequenceNumber seq = MaxVisibleSequenceNumber();
7c673cae 1273 saved_key_.Clear();
11fdf7f2
TL
1274 saved_key_.SetInternalKey(target, seq);
1275
1276#ifndef ROCKSDB_LITE
1277 if (db_impl_ != nullptr && cfd_ != nullptr) {
1278 db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
1279 }
1280#endif // ROCKSDB_LITE
1281
1282 if (iterate_lower_bound_ != nullptr &&
1283 user_comparator_->Compare(saved_key_.GetUserKey(),
1284 *iterate_lower_bound_) < 0) {
1285 saved_key_.Clear();
1286 saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
1287 }
7c673cae
FG
1288
1289 {
1290 PERF_TIMER_GUARD(seek_internal_seek_time);
1291 iter_->Seek(saved_key_.GetInternalKey());
11fdf7f2 1292 range_del_agg_.InvalidateRangeDelMapPositions();
7c673cae
FG
1293 }
1294 RecordTick(statistics_, NUMBER_DB_SEEK);
1295 if (iter_->Valid()) {
1296 if (prefix_extractor_ && prefix_same_as_start_) {
1297 prefix_start_key_ = prefix_extractor_->Transform(target);
1298 }
1299 direction_ = kForward;
1300 ClearSavedValue();
1301 FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
1302 if (!valid_) {
1303 prefix_start_key_.clear();
1304 }
1305 if (statistics_ != nullptr) {
1306 if (valid_) {
11fdf7f2 1307 // Decrement since we don't want to count this key as skipped
7c673cae
FG
1308 RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1309 RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
11fdf7f2 1310 PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
7c673cae
FG
1311 }
1312 }
1313 } else {
1314 valid_ = false;
1315 }
1316
1317 if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1318 prefix_start_buf_.SetUserKey(prefix_start_key_);
1319 prefix_start_key_ = prefix_start_buf_.GetUserKey();
1320 }
1321}
1322
1323void DBIter::SeekForPrev(const Slice& target) {
1324 StopWatch sw(env_, statistics_, DB_SEEK);
11fdf7f2 1325 status_ = Status::OK();
7c673cae
FG
1326 ReleaseTempPinnedData();
1327 ResetInternalKeysSkippedCounter();
1328 saved_key_.Clear();
1329 // now saved_key is used to store internal key.
1330 saved_key_.SetInternalKey(target, 0 /* sequence_number */,
1331 kValueTypeForSeekForPrev);
1332
11fdf7f2
TL
1333 if (iterate_upper_bound_ != nullptr &&
1334 user_comparator_->Compare(saved_key_.GetUserKey(),
1335 *iterate_upper_bound_) >= 0) {
1336 saved_key_.Clear();
1337 saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
1338 }
1339
7c673cae
FG
1340 {
1341 PERF_TIMER_GUARD(seek_internal_seek_time);
1342 iter_->SeekForPrev(saved_key_.GetInternalKey());
11fdf7f2 1343 range_del_agg_.InvalidateRangeDelMapPositions();
7c673cae
FG
1344 }
1345
11fdf7f2
TL
1346#ifndef ROCKSDB_LITE
1347 if (db_impl_ != nullptr && cfd_ != nullptr) {
1348 db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
1349 }
1350#endif // ROCKSDB_LITE
1351
7c673cae
FG
1352 RecordTick(statistics_, NUMBER_DB_SEEK);
1353 if (iter_->Valid()) {
1354 if (prefix_extractor_ && prefix_same_as_start_) {
1355 prefix_start_key_ = prefix_extractor_->Transform(target);
1356 }
1357 direction_ = kReverse;
1358 ClearSavedValue();
1359 PrevInternal();
1360 if (!valid_) {
1361 prefix_start_key_.clear();
1362 }
1363 if (statistics_ != nullptr) {
1364 if (valid_) {
1365 RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1366 RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
11fdf7f2 1367 PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
7c673cae
FG
1368 }
1369 }
1370 } else {
1371 valid_ = false;
1372 }
1373 if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1374 prefix_start_buf_.SetUserKey(prefix_start_key_);
1375 prefix_start_key_ = prefix_start_buf_.GetUserKey();
1376 }
1377}
1378
1379void DBIter::SeekToFirst() {
11fdf7f2
TL
1380 if (iterate_lower_bound_ != nullptr) {
1381 Seek(*iterate_lower_bound_);
1382 return;
1383 }
7c673cae
FG
1384 // Don't use iter_::Seek() if we set a prefix extractor
1385 // because prefix seek will be used.
11fdf7f2 1386 if (prefix_extractor_ != nullptr && !total_order_seek_) {
7c673cae
FG
1387 max_skip_ = std::numeric_limits<uint64_t>::max();
1388 }
11fdf7f2 1389 status_ = Status::OK();
7c673cae
FG
1390 direction_ = kForward;
1391 ReleaseTempPinnedData();
1392 ResetInternalKeysSkippedCounter();
1393 ClearSavedValue();
1394
1395 {
1396 PERF_TIMER_GUARD(seek_internal_seek_time);
1397 iter_->SeekToFirst();
11fdf7f2 1398 range_del_agg_.InvalidateRangeDelMapPositions();
7c673cae
FG
1399 }
1400
1401 RecordTick(statistics_, NUMBER_DB_SEEK);
1402 if (iter_->Valid()) {
1403 saved_key_.SetUserKey(
1404 ExtractUserKey(iter_->key()),
1405 !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
1406 FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
1407 if (statistics_ != nullptr) {
1408 if (valid_) {
1409 RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1410 RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
11fdf7f2 1411 PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
7c673cae
FG
1412 }
1413 }
1414 } else {
1415 valid_ = false;
1416 }
1417 if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1418 prefix_start_buf_.SetUserKey(
1419 prefix_extractor_->Transform(saved_key_.GetUserKey()));
1420 prefix_start_key_ = prefix_start_buf_.GetUserKey();
1421 }
1422}
1423
1424void DBIter::SeekToLast() {
11fdf7f2
TL
1425 if (iterate_upper_bound_ != nullptr) {
1426 // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
1427 SeekForPrev(*iterate_upper_bound_);
1428 if (Valid() && user_comparator_->Equal(*iterate_upper_bound_, key())) {
1429 ReleaseTempPinnedData();
1430 PrevInternal();
1431 }
1432 return;
1433 }
1434
7c673cae
FG
1435 // Don't use iter_::Seek() if we set a prefix extractor
1436 // because prefix seek will be used.
11fdf7f2 1437 if (prefix_extractor_ != nullptr && !total_order_seek_) {
7c673cae
FG
1438 max_skip_ = std::numeric_limits<uint64_t>::max();
1439 }
11fdf7f2 1440 status_ = Status::OK();
7c673cae
FG
1441 direction_ = kReverse;
1442 ReleaseTempPinnedData();
1443 ResetInternalKeysSkippedCounter();
1444 ClearSavedValue();
1445
1446 {
1447 PERF_TIMER_GUARD(seek_internal_seek_time);
1448 iter_->SeekToLast();
11fdf7f2 1449 range_del_agg_.InvalidateRangeDelMapPositions();
7c673cae 1450 }
11fdf7f2 1451 PrevInternal();
7c673cae
FG
1452 if (statistics_ != nullptr) {
1453 RecordTick(statistics_, NUMBER_DB_SEEK);
1454 if (valid_) {
1455 RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1456 RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
11fdf7f2 1457 PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
7c673cae
FG
1458 }
1459 }
1460 if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
1461 prefix_start_buf_.SetUserKey(
1462 prefix_extractor_->Transform(saved_key_.GetUserKey()));
1463 prefix_start_key_ = prefix_start_buf_.GetUserKey();
1464 }
1465}
1466
1467Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
1468 const ImmutableCFOptions& cf_options,
11fdf7f2 1469 const MutableCFOptions& mutable_cf_options,
7c673cae
FG
1470 const Comparator* user_key_comparator,
1471 InternalIterator* internal_iter,
1472 const SequenceNumber& sequence,
1473 uint64_t max_sequential_skip_in_iterations,
11fdf7f2
TL
1474 ReadCallback* read_callback, DBImpl* db_impl,
1475 ColumnFamilyData* cfd, bool allow_blob) {
7c673cae 1476 DBIter* db_iter = new DBIter(
11fdf7f2
TL
1477 env, read_options, cf_options, mutable_cf_options, user_key_comparator,
1478 internal_iter, sequence, false, max_sequential_skip_in_iterations,
1479 read_callback, db_impl, cfd, allow_blob);
7c673cae
FG
1480 return db_iter;
1481}
1482
1483ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
1484
7c673cae
FG
1485RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
1486 return db_iter_->GetRangeDelAggregator();
1487}
1488
1489void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
1490 static_cast<DBIter*>(db_iter_)->SetIter(iter);
1491}
1492
1493inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
1494inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
1495inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
1496inline void ArenaWrappedDBIter::Seek(const Slice& target) {
1497 db_iter_->Seek(target);
1498}
1499inline void ArenaWrappedDBIter::SeekForPrev(const Slice& target) {
1500 db_iter_->SeekForPrev(target);
1501}
1502inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
1503inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
1504inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
1505inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
1506inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
11fdf7f2 1507bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); }
7c673cae
FG
1508inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
1509 std::string* prop) {
11fdf7f2
TL
1510 if (prop_name == "rocksdb.iterator.super-version-number") {
1511 // First try to pass the value returned from inner iterator.
1512 if (!db_iter_->GetProperty(prop_name, prop).ok()) {
1513 *prop = ToString(sv_number_);
1514 }
1515 return Status::OK();
1516 }
7c673cae
FG
1517 return db_iter_->GetProperty(prop_name, prop);
1518}
11fdf7f2
TL
1519
1520void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
1521 const ImmutableCFOptions& cf_options,
1522 const MutableCFOptions& mutable_cf_options,
1523 const SequenceNumber& sequence,
1524 uint64_t max_sequential_skip_in_iteration,
1525 uint64_t version_number,
1526 ReadCallback* read_callback, DBImpl* db_impl,
1527 ColumnFamilyData* cfd, bool allow_blob,
1528 bool allow_refresh) {
1529 auto mem = arena_.AllocateAligned(sizeof(DBIter));
1530 db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
1531 cf_options.user_comparator, nullptr, sequence,
1532 true, max_sequential_skip_in_iteration,
1533 read_callback, db_impl, cfd, allow_blob);
1534 sv_number_ = version_number;
1535 allow_refresh_ = allow_refresh;
1536}
1537
1538Status ArenaWrappedDBIter::Refresh() {
1539 if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
1540 return Status::NotSupported("Creating renew iterator is not allowed.");
1541 }
1542 assert(db_iter_ != nullptr);
1543 // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
1544 // correct behavior. Will be corrected automatically when we take a snapshot
1545 // here for the case of WritePreparedTxnDB.
1546 SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
1547 uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
1548 if (sv_number_ != cur_sv_number) {
1549 Env* env = db_iter_->env();
1550 db_iter_->~DBIter();
1551 arena_.~Arena();
1552 new (&arena_) Arena();
1553
1554 SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
1555 Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
1556 latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
1557 cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
1558 allow_refresh_);
1559
1560 InternalIterator* internal_iter = db_impl_->NewInternalIterator(
1561 read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
1562 SetIterUnderDBIter(internal_iter);
1563 } else {
1564 db_iter_->set_sequence(latest_seq);
1565 db_iter_->set_valid(false);
1566 }
1567 return Status::OK();
7c673cae
FG
1568}
1569
1570ArenaWrappedDBIter* NewArenaWrappedDbIterator(
1571 Env* env, const ReadOptions& read_options,
11fdf7f2
TL
1572 const ImmutableCFOptions& cf_options,
1573 const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
1574 uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
1575 ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
1576 bool allow_blob, bool allow_refresh) {
7c673cae 1577 ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
11fdf7f2
TL
1578 iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,
1579 max_sequential_skip_in_iterations, version_number, read_callback,
1580 db_impl, cfd, allow_blob, allow_refresh);
1581 if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
1582 iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,
1583 allow_blob);
1584 }
7c673cae
FG
1585
1586 return iter;
1587}
1588
1589} // namespace rocksdb