// Source: ceph/src/rocksdb/db/db_iter.cc (ceph.git, update to reef 18.1.2)
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_iter.h"
11
12 #include <iostream>
13 #include <limits>
14 #include <string>
15
16 #include "db/dbformat.h"
17 #include "db/merge_context.h"
18 #include "db/merge_helper.h"
19 #include "db/pinned_iterators_manager.h"
20 #include "db/wide/wide_column_serialization.h"
21 #include "file/filename.h"
22 #include "logging/logging.h"
23 #include "memory/arena.h"
24 #include "monitoring/perf_context_imp.h"
25 #include "rocksdb/env.h"
26 #include "rocksdb/iterator.h"
27 #include "rocksdb/merge_operator.h"
28 #include "rocksdb/options.h"
29 #include "rocksdb/system_clock.h"
30 #include "table/internal_iterator.h"
31 #include "table/iterator_wrapper.h"
32 #include "trace_replay/trace_replay.h"
33 #include "util/mutexlock.h"
34 #include "util/string_util.h"
35 #include "util/user_comparator_wrapper.h"
36
37 namespace ROCKSDB_NAMESPACE {
38
// Constructs a DBIter, which layers user-visible iteration semantics
// (snapshot visibility via `s`, merge resolution, tombstone skipping,
// iterate bounds and prefix checks) on top of an InternalIterator that
// yields raw internal keys.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableOptions& ioptions,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter,
               const Version* version, SequenceNumber s, bool arena_mode,
               uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
               ColumnFamilyData* cfd, bool expose_blob_index)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      clock_(ioptions.clock),
      logger_(ioptions.logger),
      user_comparator_(cmp),
      merge_operator_(ioptions.merge_operator.get()),
      iter_(iter),
      version_(version),
      read_callback_(read_callback),
      // Snapshot sequence number: internal keys with a higher sequence are
      // treated as invisible to this iterator.
      sequence_(s),
      statistics_(ioptions.stats),
      max_skip_(max_sequential_skip_in_iterations),
      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      // prefix_same_as_start is only meaningful with a prefix extractor.
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
      pin_thru_lifetime_(read_options.pin_data),
      // True when the inner iterator is expected to return keys in total
      // order (no prefix seek); affects how direction changes reseek below.
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
      read_tier_(read_options.read_tier),
      fill_cache_(read_options.fill_cache),
      verify_checksums_(read_options.verify_checksums),
      expose_blob_index_(expose_blob_index),
      is_blob_(false),
      arena_mode_(arena_mode),
      db_impl_(db_impl),
      cfd_(cfd),
      timestamp_ub_(read_options.timestamp),
      timestamp_lb_(read_options.iter_start_ts),
      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  if (pin_thru_lifetime_) {
    // pin_data: keep blocks pinned for the whole lifetime of the iterator.
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
  // status_ starts OK; mark it checked so a never-used iterator does not
  // trip the unchecked-Status assertion.
  status_.PermitUncheckedError();
  assert(timestamp_size_ ==
         user_comparator_.user_comparator()->timestamp_size());
}
96
97 Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
98 if (prop == nullptr) {
99 return Status::InvalidArgument("prop is nullptr");
100 }
101 if (prop_name == "rocksdb.iterator.super-version-number") {
102 // First try to pass the value returned from inner iterator.
103 return iter_.iter()->GetProperty(prop_name, prop);
104 } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
105 if (valid_) {
106 *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
107 } else {
108 *prop = "Iterator is not valid.";
109 }
110 return Status::OK();
111 } else if (prop_name == "rocksdb.iterator.internal-key") {
112 *prop = saved_key_.GetUserKey().ToString();
113 return Status::OK();
114 }
115 return Status::InvalidArgument("Unidentified property.");
116 }
117
118 bool DBIter::ParseKey(ParsedInternalKey* ikey) {
119 Status s = ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);
120 if (!s.ok()) {
121 status_ = Status::Corruption("In DBIter: ", s.getState());
122 valid_ = false;
123 ROCKS_LOG_ERROR(logger_, "In DBIter: %s", status_.getState());
124 return false;
125 } else {
126 return true;
127 }
128 }
129
// Advances the iterator to the next visible user key. May have to change
// direction first (if the last operation was a Prev/SeekForPrev).
void DBIter::Next() {
  assert(valid_);
  assert(status_.ok());

  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_);
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
  ResetBlobValue();
  ResetValueAndColumns();
  // Fold the per-call skip counter into local stats. The decrement accounts
  // for the entry just returned to the user, which is not a "skipped" key.
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
  bool ok = true;
  if (direction_ == kReverse) {
    is_key_seqnum_zero_ = false;
    if (!ReverseToForward()) {
      ok = false;
    }
  } else if (!current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    assert(iter_.Valid());
    iter_.Next();
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  }

  local_stats_.next_count_++;
  if (ok && iter_.Valid()) {
    ClearSavedValue();

    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      // Constrain the search to the prefix captured at Seek time.
      const Slice prefix = prefix_.GetUserKey();
      FindNextUserEntry(true /* skipping the current user key */, &prefix);
    } else {
      FindNextUserEntry(true /* skipping the current user key */, nullptr);
    }
  } else {
    is_key_seqnum_zero_ = false;
    valid_ = false;
  }
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
}
179
180 bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
181 const Slice& blob_index) {
182 assert(!is_blob_);
183 assert(blob_value_.empty());
184
185 if (expose_blob_index_) { // Stacked BlobDB implementation
186 is_blob_ = true;
187 return true;
188 }
189
190 if (!version_) {
191 status_ = Status::Corruption("Encountered unexpected blob index.");
192 valid_ = false;
193 return false;
194 }
195
196 // TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
197 // avoid having to copy options back and forth.
198 ReadOptions read_options;
199 read_options.read_tier = read_tier_;
200 read_options.fill_cache = fill_cache_;
201 read_options.verify_checksums = verify_checksums_;
202
203 constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
204 constexpr uint64_t* bytes_read = nullptr;
205
206 const Status s = version_->GetBlob(read_options, user_key, blob_index,
207 prefetch_buffer, &blob_value_, bytes_read);
208
209 if (!s.ok()) {
210 status_ = s;
211 valid_ = false;
212 return false;
213 }
214
215 is_blob_ = true;
216 return true;
217 }
218
219 bool DBIter::SetValueAndColumnsFromEntity(Slice slice) {
220 assert(value_.empty());
221 assert(wide_columns_.empty());
222
223 const Status s = WideColumnSerialization::Deserialize(slice, wide_columns_);
224
225 if (!s.ok()) {
226 status_ = s;
227 valid_ = false;
228 return false;
229 }
230
231 if (!wide_columns_.empty() &&
232 wide_columns_[0].name() == kDefaultWideColumnName) {
233 value_ = wide_columns_[0].value();
234 }
235
236 return true;
237 }
238
// PRE: saved_key_ has the current user key if skipping_saved_key
// POST: saved_key_ should have the next user key if valid_,
//       if the current entry is a result of merge
//           current_entry_is_merged_ => true
//           saved_value_            => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker or a sequence number higher than sequence_
//       saved_key_ MUST have a proper user_key before calling this function
//
// The prefix parameter, if not null, indicates that we need to iterate
// within the prefix, and the iterator needs to be made invalid, if no
// more entry for the prefix can be found.
//
// Thin wrapper that only adds perf timing around the real implementation.
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
  PERF_TIMER_GUARD(find_next_user_entry_time);
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
}
256
// Actual implementation of DBIter::FindNextUserEntry().
// Scans forward over internal keys until it finds an entry that should be
// yielded to the user (visible under the snapshot, not shadowed by a newer
// version or deletion, within bounds/prefix), resolving merges as needed.
// Returns false on error (status_ set) or when iteration must stop.
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
  // Loop until we hit an acceptable entry to yield
  assert(iter_.Valid());
  assert(status_.ok());
  assert(direction_ == kForward);
  current_entry_is_merged_ = false;

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
  // sequence numbers were too high or because skipping_saved_key = true.
  // What saved_key_ contains throughout this method:
  //  - if skipping_saved_key : saved_key_ contains the key that we need
  //                            to skip, and we haven't seen any keys greater
  //                            than that,
  //  - if num_skipped > 0    : saved_key_ contains the key that we have
  //                            skipped num_skipped times, and we haven't seen
  //                            any keys greater than that,
  //  - none of the above     : saved_key_ can contain anything, it doesn't
  //                            matter.
  uint64_t num_skipped = 0;
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result
  // in an infinite loop of reseeks. To avoid that, we limit the number of
  // reseeks to one.
  bool reseek_done = false;

  do {
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
    if (!ParseKey(&ikey_)) {
      is_key_seqnum_zero_ = false;
      return false;
    }
    Slice user_key_without_ts =
        StripTimestampFromUserKey(ikey_.user_key, timestamp_size_);

    is_key_seqnum_zero_ = (ikey_.sequence == 0);

    // The inner iterator may have already checked the upper bound; only
    // re-check here if it did not confirm the key is in bounds.
    assert(iterate_upper_bound_ == nullptr ||
           iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound ||
           user_comparator_.CompareWithoutTimestamp(
               user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
               /*b_has_ts=*/false) < 0);
    if (iterate_upper_bound_ != nullptr &&
        iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound &&
        user_comparator_.CompareWithoutTimestamp(
            user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
            /*b_has_ts=*/false) >= 0) {
      break;
    }

    // Stop once the key leaves the requested prefix (prefix_same_as_start).
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(user_key_without_ts).compare(*prefix) !=
            0) {
      assert(prefix_same_as_start_);
      break;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    assert(ikey_.user_key.size() >= timestamp_size_);
    Slice ts = timestamp_size_ > 0 ? ExtractTimestampFromUserKey(
                                         ikey_.user_key, timestamp_size_)
                                   : Slice();
    bool more_recent = false;
    if (IsVisible(ikey_.sequence, ts, &more_recent)) {
      // If the previous entry is of seqnum 0, the current entry will not
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence. We are cautious because it will be
      // more prone to bugs causing the same user key with the same sequence
      // number.
      // Note that with current timestamp implementation, the same user key
      // can have different timestamps and zero sequence number on the
      // bottommost level. This may change in the future.
      if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
          skipping_saved_key &&
          CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
        assert(!skipping_saved_key ||
               CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
        if (!iter_.PrepareValue()) {
          assert(!iter_.status().ok());
          valid_ = false;
          return false;
        }
        num_skipped = 0;
        reseek_done = false;
        switch (ikey_.type) {
          case kTypeDeletion:
          case kTypeDeletionWithTimestamp:
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
            if (timestamp_lb_) {
              // With a timestamp lower bound, deletions themselves are
              // surfaced to the caller rather than silently skipped.
              saved_key_.SetInternalKey(ikey_);
              valid_ = true;
              return true;
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              skipping_saved_key = true;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            }
            break;
          case kTypeValue:
          case kTypeBlobIndex:
          case kTypeWideColumnEntity:
            if (timestamp_lb_) {
              saved_key_.SetInternalKey(ikey_);
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
            }

            if (ikey_.type == kTypeBlobIndex) {
              if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
                return false;
              }

              SetValueAndColumnsFromPlain(expose_blob_index_ ? iter_.value()
                                                             : blob_value_);
            } else if (ikey_.type == kTypeWideColumnEntity) {
              if (!SetValueAndColumnsFromEntity(iter_.value())) {
                return false;
              }
            } else {
              assert(ikey_.type == kTypeValue);
              SetValueAndColumnsFromPlain(iter_.value());
            }

            valid_ = true;
            return true;
            break;
          case kTypeMerge:
            saved_key_.SetUserKey(
                ikey_.user_key,
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
            // By now, we are sure the current ikey is going to yield a value
            current_entry_is_merged_ = true;
            valid_ = true;
            return MergeValuesNewToOld();  // Go to a different state machine
            break;
          default:
            valid_ = false;
            status_ = Status::Corruption(
                "Unknown value type: " +
                std::to_string(static_cast<unsigned int>(ikey_.type)));
            return false;
        }
      }
    } else {
      if (more_recent) {
        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
      }

      // This key was inserted after our snapshot was taken or skipped by
      // timestamp range. If this happens too many times in a row for the same
      // user key, we want to seek to the target sequence number.
      int cmp = user_comparator_.CompareWithoutTimestamp(
          ikey_.user_key, saved_key_.GetUserKey());
      if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
        num_skipped++;
      } else {
        saved_key_.SetUserKey(
            ikey_.user_key,
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
        skipping_saved_key = false;
        num_skipped = 0;
        reseek_done = false;
      }
    }

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
    // then it does not make sense to reseek as we would actually land further
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
      is_key_seqnum_zero_ = false;
      num_skipped = 0;
      reseek_done = true;
      std::string last_key;
      if (skipping_saved_key) {
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
        } else {
          const std::string kTsMin(timestamp_size_, '\0');
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
              kTsMin);
        }
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                           kValueTypeForSeek));
        } else {
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                kValueTypeForSeek),
              *timestamp_ub_);
        }
      }
      iter_.Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_.Next();
    }
  } while (iter_.Valid());

  // Ran off the end of the inner iterator (or broke out of bounds/prefix).
  valid_ = false;
  return iter_.status().ok();
}
497
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_.key() points to the first merge type entry
//      saved_key_ stores the user key
//      iter_.PrepareValue() has been called
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
bool DBIter::MergeValuesNewToOld() {
  if (!merge_operator_) {
    // Merge entries exist but no merge operator was configured: user error.
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
    valid_ = false;
    return false;
  }

  // Temporarily pin the blocks that hold merge operands
  TempPinData();
  merge_context_.Clear();
  // Start the merge process by pushing the first operand
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");

  ParsedInternalKey ikey;
  // Walk older entries of the same user key, collecting merge operands until
  // we hit a base value, a deletion, or a different user key.
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
                                                saved_key_.GetUserKey())) {
      // hit the next user key, stop right here
      break;
    }
    if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
        kTypeDeletionWithTimestamp == ikey.type) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_.Next();
      break;
    }
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

    if (kTypeValue == ikey.type) {
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      const Slice val = iter_.value();
      if (!Merge(&val, ikey.user_key)) {
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeMerge == ikey.type) {
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (kTypeBlobIndex == ikey.type) {
      if (expose_blob_index_) {
        status_ =
            Status::NotSupported("BlobDB does not support merge operator.");
        valid_ = false;
        return false;
      }
      // hit a blob index: resolve the blob value, merge it with the
      // collected operands, and store the final result in saved_value_.
      // We are done!
      if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
        return false;
      }
      valid_ = true;
      if (!Merge(&blob_value_, ikey.user_key)) {
        return false;
      }

      ResetBlobValue();

      // iter_ is positioned after the blob index
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeWideColumnEntity == ikey.type) {
      // hit a wide-column base entity: merge and finish.
      if (!MergeEntity(iter_.value(), ikey.user_key)) {
        return false;
      }

      // iter_ is positioned after the entity
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }

      return true;
    } else {
      valid_ = false;
      status_ = Status::Corruption(
          "Unrecognized value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
    }
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  if (!Merge(nullptr, saved_key_.GetUserKey())) {
    return false;
  }
  assert(status_.ok());
  return true;
}
628
// Moves the iterator to the previous visible user key. If the last operation
// was a forward one, the direction is reversed first (which may reseek).
void DBIter::Prev() {
  assert(valid_);
  assert(status_.ok());

  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
  // Drop transient state from the previous operation.
  ReleaseTempPinnedData();
  ResetBlobValue();
  ResetValueAndColumns();
  ResetInternalKeysSkippedCounter();
  bool ok = true;
  if (direction_ == kForward) {
    if (!ReverseToBackward()) {
      ok = false;
    }
  }
  if (ok) {
    ClearSavedValue();

    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      // Constrain the backward scan to the prefix captured at Seek time.
      prefix = prefix_.GetUserKey();
    }
    PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
  }

  if (statistics_ != nullptr) {
    local_stats_.prev_count_++;
    if (valid_) {
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
    }
  }
}
663
// Switches the iteration direction from backward to forward, positioning the
// inner iterator at or after saved_key_. Returns false on error.
bool DBIter::ReverseToForward() {
  assert(iter_.status().ok());

  // When moving backwards, iter_ is positioned on _previous_ key, which may
  // not exist or may have different prefix than the current key().
  // If that's the case, seek iter_ to current key.
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    IterKey last_key;
    // kMaxSequenceNumber + kValueTypeForSeek sorts before all entries of
    // saved_key_'s user key, so the Seek lands on its newest entry.
    ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber,
                            kValueTypeForSeek);
    if (timestamp_size_ > 0) {
      // TODO: pre-create kTsMax.
      const std::string kTsMax(timestamp_size_, '\xff');
      pikey.SetTimestamp(kTsMax);
    }
    last_key.SetInternalKey(pikey);
    iter_.Seek(last_key.GetInternalKey());
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }

  direction_ = kForward;
  // Skip keys less than the current key() (a.k.a. saved_key_).
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
      return true;
    }
    iter_.Next();
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  return true;
}
704
// Move iter_ to the key before saved_key_ (switching direction from forward
// to backward). Returns false on error.
bool DBIter::ReverseToBackward() {
  assert(iter_.status().ok());

  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have prefix different from current.
  // If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
      (!expect_total_order_inner_iter() || !iter_.Valid())) {
    IterKey last_key;
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
    if (!expect_total_order_inner_iter()) {
      iter_.SeekForPrev(last_key.GetInternalKey());
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        // Seek ran past the end: the largest key is still before the target.
        iter_.SeekToLast();
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }

  direction_ = kReverse;
  return FindUserKeyBeforeSavedKey();
}
738
// Backward-scan driver: repeatedly picks the user key under iter_, tries to
// resolve its visible value, and moves on to the preceding user key until a
// valid entry is found, a bound/prefix is crossed, or input is exhausted.
// Sets valid_ accordingly.
void DBIter::PrevInternal(const Slice* prefix) {
  while (iter_.Valid()) {
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);

    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_
                ->Transform(StripTimestampFromUserKey(saved_key_.GetUserKey(),
                                                      timestamp_size_))
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

    // Only re-check the lower bound when the inner iterator could not
    // already guarantee we are within it.
    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.CompareWithoutTimestamp(
               saved_key_.GetUserKey(), /*a_has_ts=*/true,
               *iterate_lower_bound_, /*b_has_ts=*/false) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
        user_comparator_.CompareWithoutTimestamp(
            saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
            /*b_has_ts=*/false) < 0) {
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

    if (!FindValueForCurrentKey()) {  // assigns valid_
      return;
    }

    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
      return;
    }

    if (valid_) {
      // Found the value.
      return;
    }

    if (TooManyInternalKeysSkipped(false)) {
      return;
    }
  }

  // We haven't found any key - iterator is not valid
  valid_ = false;
}
793
794 // Used for backwards iteration.
795 // Looks at the entries with user key saved_key_ and finds the most up-to-date
796 // value for it, or executes a merge, or determines that the value was deleted.
797 // Sets valid_ to true if the value is found and is ready to be presented to
798 // the user through value().
799 // Sets valid_ to false if the value was deleted, and we should try another key.
800 // Returns false if an error occurred, and !status().ok() and !valid_.
801 //
802 // PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
803 // POST: iter_ is positioned on one of the entries equal to saved_key_, or on
804 // the entry just before them, or on the entry just after them.
805 bool DBIter::FindValueForCurrentKey() {
806 assert(iter_.Valid());
807 merge_context_.Clear();
808 current_entry_is_merged_ = false;
809 // last entry before merge (could be kTypeDeletion,
810 // kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue,
811 // kTypeBlobIndex, or kTypeWideColumnEntity)
812 ValueType last_not_merge_type = kTypeDeletion;
813 ValueType last_key_entry_type = kTypeDeletion;
814
815 // If false, it indicates that we have not seen any valid entry, even though
816 // last_key_entry_type is initialized to kTypeDeletion.
817 bool valid_entry_seen = false;
818
819 // Temporarily pin blocks that hold (merge operands / the value)
820 ReleaseTempPinnedData();
821 TempPinData();
822 size_t num_skipped = 0;
823 while (iter_.Valid()) {
824 ParsedInternalKey ikey;
825 if (!ParseKey(&ikey)) {
826 return false;
827 }
828
829 if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
830 saved_key_.GetUserKey())) {
831 // Found a smaller user key, thus we are done with current user key.
832 break;
833 }
834
835 assert(ikey.user_key.size() >= timestamp_size_);
836 Slice ts;
837 if (timestamp_size_ > 0) {
838 ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
839 timestamp_size_);
840 }
841
842 bool visible = IsVisible(ikey.sequence, ts);
843 if (!visible &&
844 (timestamp_lb_ == nullptr ||
845 user_comparator_.CompareTimestamp(ts, *timestamp_ub_) > 0)) {
846 // Found an invisible version of the current user key, and it must have
847 // a higher sequence number or timestamp. Therefore, we are done with the
848 // current user key.
849 break;
850 }
851
852 if (!ts.empty()) {
853 saved_timestamp_.assign(ts.data(), ts.size());
854 }
855
856 if (TooManyInternalKeysSkipped()) {
857 return false;
858 }
859
860 // This user key has lots of entries.
861 // We're going from old to new, and it's taking too long. Let's do a Seek()
862 // and go from new to old. This helps when a key was overwritten many times.
863 if (num_skipped >= max_skip_) {
864 return FindValueForCurrentKeyUsingSeek();
865 }
866
867 if (!iter_.PrepareValue()) {
868 valid_ = false;
869 return false;
870 }
871
872 if (timestamp_lb_ != nullptr) {
873 // Only needed when timestamp_lb_ is not null
874 [[maybe_unused]] const bool ret = ParseKey(&ikey_);
875 saved_ikey_.assign(iter_.key().data(), iter_.key().size());
876 // Since the preceding ParseKey(&ikey) succeeds, so must this.
877 assert(ret);
878 }
879
880 valid_entry_seen = true;
881 last_key_entry_type = ikey.type;
882 switch (last_key_entry_type) {
883 case kTypeValue:
884 case kTypeBlobIndex:
885 case kTypeWideColumnEntity:
886 if (iter_.iter()->IsValuePinned()) {
887 pinned_value_ = iter_.value();
888 } else {
889 valid_ = false;
890 status_ = Status::NotSupported(
891 "Backward iteration not supported if underlying iterator's value "
892 "cannot be pinned.");
893 }
894 merge_context_.Clear();
895 last_not_merge_type = last_key_entry_type;
896 if (!status_.ok()) {
897 return false;
898 }
899 break;
900 case kTypeDeletion:
901 case kTypeDeletionWithTimestamp:
902 case kTypeSingleDeletion:
903 merge_context_.Clear();
904 last_not_merge_type = last_key_entry_type;
905 PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
906 break;
907 case kTypeMerge: {
908 assert(merge_operator_ != nullptr);
909 merge_context_.PushOperandBack(
910 iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
911 PERF_COUNTER_ADD(internal_merge_count, 1);
912 } break;
913 default:
914 valid_ = false;
915 status_ = Status::Corruption(
916 "Unknown value type: " +
917 std::to_string(static_cast<unsigned int>(last_key_entry_type)));
918 return false;
919 }
920
921 PERF_COUNTER_ADD(internal_key_skipped_count, 1);
922 iter_.Prev();
923 ++num_skipped;
924
925 if (visible && timestamp_lb_ != nullptr) {
926 // If timestamp_lb_ is not nullptr, we do not have to look further for
927 // another internal key. We can return this current internal key. Yet we
928 // still keep the invariant that iter_ is positioned before the returned
929 // key.
930 break;
931 }
932 }
933
934 if (!iter_.status().ok()) {
935 valid_ = false;
936 return false;
937 }
938
939 if (!valid_entry_seen) {
940 // Since we haven't seen any valid entry, last_key_entry_type remains
941 // unchanged and the same as its initial value.
942 assert(last_key_entry_type == kTypeDeletion);
943 assert(last_not_merge_type == kTypeDeletion);
944 valid_ = false;
945 return true;
946 }
947
948 if (timestamp_lb_ != nullptr) {
949 assert(last_key_entry_type == ikey_.type);
950 }
951
952 Status s;
953 s.PermitUncheckedError();
954
955 switch (last_key_entry_type) {
956 case kTypeDeletion:
957 case kTypeDeletionWithTimestamp:
958 case kTypeSingleDeletion:
959 if (timestamp_lb_ == nullptr) {
960 valid_ = false;
961 } else {
962 saved_key_.SetInternalKey(saved_ikey_);
963 valid_ = true;
964 }
965 return true;
966 case kTypeMerge:
967 current_entry_is_merged_ = true;
968 if (last_not_merge_type == kTypeDeletion ||
969 last_not_merge_type == kTypeSingleDeletion ||
970 last_not_merge_type == kTypeDeletionWithTimestamp) {
971 if (!Merge(nullptr, saved_key_.GetUserKey())) {
972 return false;
973 }
974 return true;
975 } else if (last_not_merge_type == kTypeBlobIndex) {
976 if (expose_blob_index_) {
977 status_ =
978 Status::NotSupported("BlobDB does not support merge operator.");
979 valid_ = false;
980 return false;
981 }
982 if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
983 return false;
984 }
985 valid_ = true;
986 if (!Merge(&blob_value_, saved_key_.GetUserKey())) {
987 return false;
988 }
989
990 ResetBlobValue();
991
992 return true;
993 } else if (last_not_merge_type == kTypeWideColumnEntity) {
994 if (!MergeEntity(pinned_value_, saved_key_.GetUserKey())) {
995 return false;
996 }
997
998 return true;
999 } else {
1000 assert(last_not_merge_type == kTypeValue);
1001 if (!Merge(&pinned_value_, saved_key_.GetUserKey())) {
1002 return false;
1003 }
1004 return true;
1005 }
1006 break;
1007 case kTypeValue:
1008 if (timestamp_lb_ != nullptr) {
1009 saved_key_.SetInternalKey(saved_ikey_);
1010 }
1011
1012 SetValueAndColumnsFromPlain(pinned_value_);
1013
1014 break;
1015 case kTypeBlobIndex:
1016 if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
1017 return false;
1018 }
1019
1020 SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
1021 : blob_value_);
1022
1023 break;
1024 case kTypeWideColumnEntity:
1025 if (!SetValueAndColumnsFromEntity(pinned_value_)) {
1026 return false;
1027 }
1028 break;
1029 default:
1030 valid_ = false;
1031 status_ = Status::Corruption(
1032 "Unknown value type: " +
1033 std::to_string(static_cast<unsigned int>(last_key_entry_type)));
1034 return false;
1035 }
1036 if (!s.ok()) {
1037 valid_ = false;
1038 status_ = s;
1039 return false;
1040 }
1041 valid_ = true;
1042 return true;
1043 }
1044
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
// Would be nice to reuse some code.
bool DBIter::FindValueForCurrentKeyUsingSeek() {
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
  // Build the internal key for the newest version of saved_key_'s user key
  // that could be visible at our sequence number, and reseek directly to it.
  std::string last_key;
  if (0 == timestamp_size_) {
    AppendInternalKey(&last_key,
                      ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                        kValueTypeForSeek));
  } else {
    // With user-defined timestamps, seek at the upper timestamp bound, or at
    // the lower bound when one is set (lower-bound reads surface the entry
    // at that timestamp).
    AppendInternalKeyWithDifferentTimestamp(
        &last_key,
        ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                          kValueTypeForSeek),
        timestamp_lb_ == nullptr ? *timestamp_ub_ : *timestamp_lb_);
  }
  iter_.Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // In case read_callback presents, the value we seek to may not be visible.
  // Find the next value that's visible.
  ParsedInternalKey ikey;

  while (true) {
    if (!iter_.Valid()) {
      // Ran off the end: not an error if the underlying iterator is OK,
      // there is simply no visible value for this key.
      valid_ = false;
      return iter_.status().ok();
    }

    if (!ParseKey(&ikey)) {
      return false;
    }
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }

    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
                                                saved_key_.GetUserKey())) {
      // No visible values for this key, even though FindValueForCurrentKey()
      // has seen some. This is possible if we're using a tailing iterator, and
      // the entries were discarded in a compaction.
      valid_ = false;
      return true;
    }

    if (IsVisible(ikey.sequence, ts)) {
      break;
    }

    iter_.Next();
  }

  // A deletion ends the search. With a timestamp lower bound the deletion
  // entry itself is surfaced to the caller; otherwise the key is hidden.
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
      kTypeDeletionWithTimestamp == ikey.type) {
    if (timestamp_lb_ == nullptr) {
      valid_ = false;
    } else {
      valid_ = true;
      saved_key_.SetInternalKey(ikey);
    }
    return true;
  }
  if (!iter_.PrepareValue()) {
    valid_ = false;
    return false;
  }
  if (timestamp_size_ > 0) {
    Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_);
    saved_timestamp_.assign(ts.data(), ts.size());
  }
  // Plain value, blob reference, or wide-column entity: expose it directly.
  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex ||
      ikey.type == kTypeWideColumnEntity) {
    assert(iter_.iter()->IsValuePinned());
    pinned_value_ = iter_.value();
    if (ikey.type == kTypeBlobIndex) {
      if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
        return false;
      }

      SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
                                                     : blob_value_);
    } else if (ikey.type == kTypeWideColumnEntity) {
      if (!SetValueAndColumnsFromEntity(pinned_value_)) {
        return false;
      }
    } else {
      assert(ikey.type == kTypeValue);
      SetValueAndColumnsFromPlain(pinned_value_);
    }

    if (timestamp_lb_ != nullptr) {
      saved_key_.SetInternalKey(ikey);
    }

    valid_ = true;
    return true;
  }

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
  assert(ikey.type == kTypeMerge);
  current_entry_is_merged_ = true;
  merge_context_.Clear();
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  // We seeked to the newest visible entry, so scanning forward walks older
  // versions of the same user key: collect merge operands until we hit a
  // base value, a deletion, or a different user key.
  while (true) {
    iter_.Next();

    if (!iter_.Valid()) {
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      break;
    }
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
                                                saved_key_.GetUserKey())) {
      break;
    }
    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
        ikey.type == kTypeDeletionWithTimestamp) {
      break;
    }
    if (!iter_.PrepareValue()) {
      valid_ = false;
      return false;
    }

    if (ikey.type == kTypeValue) {
      // Found the base value: fold the collected operands on top of it.
      const Slice val = iter_.value();
      if (!Merge(&val, saved_key_.GetUserKey())) {
        return false;
      }
      return true;
    } else if (ikey.type == kTypeMerge) {
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (ikey.type == kTypeBlobIndex) {
      if (expose_blob_index_) {
        status_ =
            Status::NotSupported("BlobDB does not support merge operator.");
        valid_ = false;
        return false;
      }
      if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
        return false;
      }
      valid_ = true;
      if (!Merge(&blob_value_, saved_key_.GetUserKey())) {
        return false;
      }

      ResetBlobValue();

      return true;
    } else if (ikey.type == kTypeWideColumnEntity) {
      if (!MergeEntity(iter_.value(), saved_key_.GetUserKey())) {
        return false;
      }

      return true;
    } else {
      valid_ = false;
      status_ = Status::Corruption(
          "Unknown value type: " +
          std::to_string(static_cast<unsigned int>(ikey.type)));
      return false;
    }
  }

  // No base value found below the operands: merge with a nullptr base.
  if (!Merge(nullptr, saved_key_.GetUserKey())) {
    return false;
  }

  // Make sure we leave iter_ in a good state. If it's valid and we don't care
  // about prefixes, that's already good enough. Otherwise it needs to be
  // seeked to the current key.
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    if (!expect_total_order_inner_iter()) {
      iter_.SeekForPrev(last_key);
    } else {
      iter_.Seek(last_key);
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }

  valid_ = true;
  return true;
}
1248
1249 bool DBIter::Merge(const Slice* val, const Slice& user_key) {
1250 Status s = MergeHelper::TimedFullMerge(
1251 merge_operator_, user_key, val, merge_context_.GetOperands(),
1252 &saved_value_, logger_, statistics_, clock_, &pinned_value_,
1253 /* update_num_ops_stats */ true);
1254 if (!s.ok()) {
1255 valid_ = false;
1256 status_ = s;
1257 return false;
1258 }
1259
1260 SetValueAndColumnsFromPlain(pinned_value_.data() ? pinned_value_
1261 : saved_value_);
1262
1263 valid_ = true;
1264 return true;
1265 }
1266
1267 bool DBIter::MergeEntity(const Slice& entity, const Slice& user_key) {
1268 Status s = MergeHelper::TimedFullMergeWithEntity(
1269 merge_operator_, user_key, entity, merge_context_.GetOperands(),
1270 &saved_value_, logger_, statistics_, clock_,
1271 /* update_num_ops_stats */ true);
1272 if (!s.ok()) {
1273 valid_ = false;
1274 status_ = s;
1275 return false;
1276 }
1277
1278 if (!SetValueAndColumnsFromEntity(saved_value_)) {
1279 return false;
1280 }
1281
1282 valid_ = true;
1283 return true;
1284 }
1285
// Move backwards until the key smaller than saved_key_.
// Changes valid_ only if return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
  size_t num_skipped = 0;
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

    // Done: we are now strictly before saved_key_'s user key.
    if (CompareKeyForSkip(ikey.user_key, saved_key_.GetUserKey()) < 0) {
      return true;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    assert(ikey.sequence != kMaxSequenceNumber);
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    // Visibility only affects which perf counter is bumped here; either way
    // the entry is skipped.
    if (!IsVisible(ikey.sequence, ts)) {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }

    if (num_skipped >= max_skip_) {
      // Too many single-step Prev() calls over the same user key: reseek to
      // just before saved_key_ (max seqno sorts first among its versions)
      // instead of stepping one entry at a time.
      num_skipped = 0;
      IterKey last_key;
      ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber,
                              kValueTypeForSeek);
      if (timestamp_size_ > 0) {
        // TODO: pre-create kTsMax.
        const std::string kTsMax(timestamp_size_, '\xff');
        pikey.SetTimestamp(kTsMax);
      }
      last_key.SetInternalKey(pikey);
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
      iter_.Seek(last_key.GetInternalKey());
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      if (!iter_.Valid()) {
        break;
      }
    } else {
      ++num_skipped;
    }

    iter_.Prev();
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  return true;
}
1350
1351 bool DBIter::TooManyInternalKeysSkipped(bool increment) {
1352 if ((max_skippable_internal_keys_ > 0) &&
1353 (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
1354 valid_ = false;
1355 status_ = Status::Incomplete("Too many internal keys skipped.");
1356 return true;
1357 } else if (increment) {
1358 num_internal_keys_skipped_++;
1359 }
1360 return false;
1361 }
1362
1363 bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts,
1364 bool* more_recent) {
1365 // Remember that comparator orders preceding timestamp as larger.
1366 // TODO(yanqin): support timestamp in read_callback_.
1367 bool visible_by_seq = (read_callback_ == nullptr)
1368 ? sequence <= sequence_
1369 : read_callback_->IsVisible(sequence);
1370
1371 bool visible_by_ts =
1372 (timestamp_ub_ == nullptr ||
1373 user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0) &&
1374 (timestamp_lb_ == nullptr ||
1375 user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0);
1376
1377 if (more_recent) {
1378 *more_recent = !visible_by_seq;
1379 }
1380 return visible_by_seq && visible_by_ts;
1381 }
1382
1383 void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
1384 is_key_seqnum_zero_ = false;
1385 SequenceNumber seq = sequence_;
1386 saved_key_.Clear();
1387 saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);
1388
1389 if (iterate_lower_bound_ != nullptr &&
1390 user_comparator_.CompareWithoutTimestamp(
1391 saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
1392 /*b_has_ts=*/false) < 0) {
1393 // Seek key is smaller than the lower bound.
1394 saved_key_.Clear();
1395 saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
1396 timestamp_ub_);
1397 }
1398 }
1399
// Builds the internal key stored in saved_key_ for SeekForPrev(target),
// clamping the target down toward iterate_upper_bound_ when the caller's
// target is at or beyond that (exclusive) bound.
void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
  is_key_seqnum_zero_ = false;
  saved_key_.Clear();
  // now saved_key is used to store internal key.
  // Sequence 0 + kValueTypeForSeekForPrev sorts after every real version of
  // `target`, so SeekForPrev lands on the newest version at or before it.
  saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                            kValueTypeForSeekForPrev, timestamp_ub_);

  if (timestamp_size_ > 0) {
    // Use the minimum timestamp (or the read's lower bound, if set); the
    // comparator orders smaller timestamps as larger, matching the
    // seq-0/for-prev choice above.
    const std::string kTsMin(timestamp_size_, '\0');
    Slice ts = kTsMin;
    saved_key_.UpdateInternalKey(
        /*seq=*/0, kValueTypeForSeekForPrev,
        timestamp_lb_ == nullptr ? &ts : timestamp_lb_);
  }

  if (iterate_upper_bound_ != nullptr &&
      user_comparator_.CompareWithoutTimestamp(
          saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_upper_bound_,
          /*b_has_ts=*/false) >= 0) {
    // Target is at or past the exclusive upper bound: seek from the bound
    // with kMaxSequenceNumber (and max timestamp) so every version of the
    // bound key itself is also excluded.
    saved_key_.Clear();
    saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber,
                              kValueTypeForSeekForPrev, timestamp_ub_);
    if (timestamp_size_ > 0) {
      const std::string kTsMax(timestamp_size_, '\xff');
      Slice ts = kTsMax;
      saved_key_.UpdateInternalKey(
          kMaxSequenceNumber, kValueTypeForSeekForPrev,
          timestamp_lb_ != nullptr ? timestamp_lb_ : &ts);
    }
  }
}
1431
// Positions the iterator at the first visible entry whose user key is >=
// `target` (clamped to iterate_lower_bound_ if set). On exit, valid_ and
// status_ reflect whether such an entry was found.
void DBIter::Seek(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  StopWatch sw(clock_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  // Record this seek (with its effective bounds) in the trace, if tracing.
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    // TODO: What do we do if this returns an error?
    Slice lower_bound, upper_bound;
    if (iterate_lower_bound_ != nullptr) {
      lower_bound = *iterate_lower_bound_;
    } else {
      lower_bound = Slice("");
    }
    if (iterate_upper_bound_ != nullptr) {
      upper_bound = *iterate_upper_bound_;
    } else {
      upper_bound = Slice("");
    }
    db_impl_->TraceIteratorSeek(cfd_->GetID(), target, lower_bound, upper_bound)
        .PermitUncheckedError();
  }
#endif  // ROCKSDB_LITE

  // Drop all state carried over from the previous position.
  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetBlobValue();
  ResetValueAndColumns();
  ResetInternalKeysSkippedCounter();

  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);

    SetSavedKeyToSeekTarget(target);
    iter_.Seek(saved_key_.GetInternalKey());

    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exhausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
      // Remember the prefix of the seek key for the future Next() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
  }

  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    // Decrement since we don't want to count this key as skipped
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
  }
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
}
1506
// Positions the iterator at the last visible entry whose user key is <=
// `target` (clamped below iterate_upper_bound_ if set). On exit, valid_ and
// status_ reflect whether such an entry was found.
void DBIter::SeekForPrev(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  StopWatch sw(clock_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  // Record this seek (with its effective bounds) in the trace, if tracing.
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    // TODO: What do we do if this returns an error?
    Slice lower_bound, upper_bound;
    if (iterate_lower_bound_ != nullptr) {
      lower_bound = *iterate_lower_bound_;
    } else {
      lower_bound = Slice("");
    }
    if (iterate_upper_bound_ != nullptr) {
      upper_bound = *iterate_upper_bound_;
    } else {
      upper_bound = Slice("");
    }
    db_impl_
        ->TraceIteratorSeekForPrev(cfd_->GetID(), target, lower_bound,
                                   upper_bound)
        .PermitUncheckedError();
  }
#endif  // ROCKSDB_LITE

  // Drop all state carried over from the previous position.
  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetBlobValue();
  ResetValueAndColumns();
  ResetInternalKeysSkippedCounter();

  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    SetSavedKeyToSeekForPrevTarget(target);
    iter_.SeekForPrev(saved_key_.GetInternalKey());
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kReverse;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exhausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    PrevInternal(nullptr);
  }

  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
  }
}
1577
// Positions the iterator at the first visible key. With a lower bound set,
// this is exactly Seek(lower_bound).
void DBIter::SeekToFirst() {
  if (iterate_lower_bound_ != nullptr) {
    Seek(*iterate_lower_bound_);
    return;
  }
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  // if iterator is empty, this status_ could be unchecked.
  status_.PermitUncheckedError();
  direction_ = kForward;
  // Drop all state carried over from the previous position.
  ReleaseTempPinnedData();
  ResetBlobValue();
  ResetValueAndColumns();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToFirst();
  }

  RecordTick(statistics_, NUMBER_DB_SEEK);
  if (iter_.Valid()) {
    // Copy the user key unless the inner iterator keeps it pinned for the
    // whole iterator lifetime.
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
      }
    }
  } else {
    valid_ = false;
  }
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(
        StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
  }
}
1628
1629 void DBIter::SeekToLast() {
1630 if (iterate_upper_bound_ != nullptr) {
1631 // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
1632 SeekForPrev(*iterate_upper_bound_);
1633 const bool is_ikey = (timestamp_size_ > 0 && timestamp_lb_ != nullptr);
1634 Slice k = Valid() ? key() : Slice();
1635 if (is_ikey && Valid()) {
1636 k.remove_suffix(kNumInternalBytes + timestamp_size_);
1637 }
1638 while (Valid() && 0 == user_comparator_.CompareWithoutTimestamp(
1639 *iterate_upper_bound_, /*a_has_ts=*/false, k,
1640 /*b_has_ts=*/false)) {
1641 ReleaseTempPinnedData();
1642 ResetBlobValue();
1643 ResetValueAndColumns();
1644 PrevInternal(nullptr);
1645
1646 k = key();
1647 if (is_ikey) {
1648 k.remove_suffix(kNumInternalBytes + timestamp_size_);
1649 }
1650 }
1651 return;
1652 }
1653
1654 PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
1655 // Don't use iter_::Seek() if we set a prefix extractor
1656 // because prefix seek will be used.
1657 if (!expect_total_order_inner_iter()) {
1658 max_skip_ = std::numeric_limits<uint64_t>::max();
1659 }
1660 status_ = Status::OK();
1661 // if iterator is empty, this status_ could be unchecked.
1662 status_.PermitUncheckedError();
1663 direction_ = kReverse;
1664 ReleaseTempPinnedData();
1665 ResetBlobValue();
1666 ResetValueAndColumns();
1667 ResetInternalKeysSkippedCounter();
1668 ClearSavedValue();
1669 is_key_seqnum_zero_ = false;
1670
1671 {
1672 PERF_TIMER_GUARD(seek_internal_seek_time);
1673 iter_.SeekToLast();
1674 }
1675 PrevInternal(nullptr);
1676 if (statistics_ != nullptr) {
1677 RecordTick(statistics_, NUMBER_DB_SEEK);
1678 if (valid_) {
1679 RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1680 RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1681 PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
1682 }
1683 }
1684 if (valid_ && prefix_same_as_start_) {
1685 assert(prefix_extractor_ != nullptr);
1686 prefix_.SetUserKey(prefix_extractor_->Transform(
1687 StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
1688 }
1689 }
1690
1691 Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
1692 const ImmutableOptions& ioptions,
1693 const MutableCFOptions& mutable_cf_options,
1694 const Comparator* user_key_comparator,
1695 InternalIterator* internal_iter, const Version* version,
1696 const SequenceNumber& sequence,
1697 uint64_t max_sequential_skip_in_iterations,
1698 ReadCallback* read_callback, DBImpl* db_impl,
1699 ColumnFamilyData* cfd, bool expose_blob_index) {
1700 DBIter* db_iter =
1701 new DBIter(env, read_options, ioptions, mutable_cf_options,
1702 user_key_comparator, internal_iter, version, sequence, false,
1703 max_sequential_skip_in_iterations, read_callback, db_impl, cfd,
1704 expose_blob_index);
1705 return db_iter;
1706 }
1707
1708 } // namespace ROCKSDB_NAMESPACE