#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/trace_replay.h"
+#include "util/user_comparator_wrapper.h"
namespace rocksdb {
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob)
- : arena_mode_(arena_mode),
- env_(_env),
+ : env_(_env),
logger_(cf_options.info_log),
user_comparator_(cmp),
merge_operator_(cf_options.merge_operator),
iter_(iter),
+ read_callback_(read_callback),
sequence_(s),
- direction_(kForward),
- valid_(false),
- current_entry_is_merged_(false),
statistics_(cf_options.statistics),
num_internal_keys_skipped_(0),
iterate_lower_bound_(read_options.iterate_lower_bound),
iterate_upper_bound_(read_options.iterate_upper_bound),
+ direction_(kForward),
+ valid_(false),
+ current_entry_is_merged_(false),
prefix_same_as_start_(read_options.prefix_same_as_start),
pin_thru_lifetime_(read_options.pin_data),
total_order_seek_(read_options.total_order_seek),
- range_del_agg_(cf_options.internal_comparator, s,
- true /* collapse_deletions */),
- read_callback_(read_callback),
- db_impl_(db_impl),
- cfd_(cfd),
allow_blob_(allow_blob),
is_blob_(false),
+ arena_mode_(arena_mode),
+ range_del_agg_(&cf_options.internal_comparator, s),
+ db_impl_(db_impl),
+ cfd_(cfd),
start_seqnum_(read_options.iter_start_seqnum) {
- RecordTick(statistics_, NO_ITERATORS);
+ RecordTick(statistics_, NO_ITERATOR_CREATED);
prefix_extractor_ = mutable_cf_options.prefix_extractor.get();
max_skip_ = max_sequential_skip_in_iterations;
max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
}
}
- virtual ~DBIter() {
+ ~DBIter() override {
// Release pinned data if any
if (pinned_iters_mgr_.PinningEnabled()) {
pinned_iters_mgr_.ReleasePinnedData();
}
- // Compiler warning issue filed:
- // https://github.com/facebook/rocksdb/issues/3013
- RecordTick(statistics_, NO_ITERATORS, uint64_t(-1));
+ RecordTick(statistics_, NO_ITERATOR_DELETED);
ResetInternalKeysSkippedCounter();
local_stats_.BumpGlobalStatistics(statistics_);
if (!arena_mode_) {
iter_ = iter;
iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
}
- virtual RangeDelAggregator* GetRangeDelAggregator() {
+ virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
return &range_del_agg_;
}
- virtual bool Valid() const override { return valid_; }
- virtual Slice key() const override {
+ bool Valid() const override { return valid_; }
+ Slice key() const override {
assert(valid_);
if(start_seqnum_ > 0) {
return saved_key_.GetInternalKey();
} else {
return saved_key_.GetUserKey();
}
-
}
- virtual Slice value() const override {
+ Slice value() const override {
assert(valid_);
if (current_entry_is_merged_) {
// If pinned_value_ is set then the result of merge operator is one of
return iter_->value();
}
}
- virtual Status status() const override {
+ Status status() const override {
if (status_.ok()) {
return iter_->status();
} else {
return is_blob_;
}
- virtual Status GetProperty(std::string prop_name,
- std::string* prop) override {
+ Status GetProperty(std::string prop_name, std::string* prop) override {
if (prop == nullptr) {
return Status::InvalidArgument("prop is nullptr");
}
*prop = saved_key_.GetUserKey().ToString();
return Status::OK();
}
- return Status::InvalidArgument("Undentified property.");
+ return Status::InvalidArgument("Unidentified property.");
}
- virtual void Next() override;
- virtual void Prev() override;
- virtual void Seek(const Slice& target) override;
- virtual void SeekForPrev(const Slice& target) override;
- virtual void SeekToFirst() override;
- virtual void SeekToLast() override;
+ void Next() override;
+ void Prev() override;
+ void Seek(const Slice& target) override;
+ void SeekForPrev(const Slice& target) override;
+ void SeekToFirst() override;
+ void SeekToLast() override;
Env* env() { return env_; }
- void set_sequence(uint64_t s) { sequence_ = s; }
+ void set_sequence(uint64_t s) {
+ sequence_ = s;
+ if (read_callback_) {
+ read_callback_->Refresh(s);
+ }
+ }
void set_valid(bool v) { valid_ = v; }
private:
void PrevInternal();
bool TooManyInternalKeysSkipped(bool increment = true);
- bool IsVisible(SequenceNumber sequence);
+ inline bool IsVisible(SequenceNumber sequence);
// CanReseekToSkip() returns whether the iterator can use the optimization
// where it reseek by sequence number to get the next key when there are too
// sequence number does not guarantee that it is visible.
inline bool CanReseekToSkip();
- // MaxVisibleSequenceNumber() returns the maximum visible sequence number
- // for this snapshot. This sequence number may be greater than snapshot
- // seqno because uncommitted data written to DB for write unprepared will
- // have a higher sequence number.
- inline SequenceNumber MaxVisibleSequenceNumber();
-
// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called
void TempPinData() {
}
const SliceTransform* prefix_extractor_;
- bool arena_mode_;
Env* const env_;
Logger* logger_;
- const Comparator* const user_comparator_;
+ UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;
InternalIterator* iter_;
+ ReadCallback* read_callback_;
+ // Max visible sequence number. It is normally the snapshot seq unless we have
+ // uncommitted data in db as in WriteUnCommitted.
SequenceNumber sequence_;
- Status status_;
IterKey saved_key_;
// Reusable internal key data structure. This is only used inside one function
// and should not be used across functions. Reusing this object can reduce
ParsedInternalKey ikey_;
std::string saved_value_;
Slice pinned_value_;
- Direction direction_;
- bool valid_;
- bool current_entry_is_merged_;
// for prefix seek mode to support prev()
Statistics* statistics_;
uint64_t max_skip_;
uint64_t num_internal_keys_skipped_;
const Slice* iterate_lower_bound_;
const Slice* iterate_upper_bound_;
+
IterKey prefix_start_buf_;
+
+ Status status_;
Slice prefix_start_key_;
+ Direction direction_;
+ bool valid_;
+ bool current_entry_is_merged_;
const bool prefix_same_as_start_;
// Means that we will pin all data blocks we read as long the Iterator
// is not deleted, will be true if ReadOptions::pin_data is true
const bool pin_thru_lifetime_;
const bool total_order_seek_;
+ bool allow_blob_;
+ bool is_blob_;
+ bool arena_mode_;
// List of operands for merge operator.
MergeContext merge_context_;
- RangeDelAggregator range_del_agg_;
+ ReadRangeDelAggregator range_del_agg_;
LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_;
- ReadCallback* read_callback_;
DBImpl* db_impl_;
ColumnFamilyData* cfd_;
- bool allow_blob_;
- bool is_blob_;
// for diff snapshots we want the lower bound on the seqnum;
// if this value > 0 iterator will return internal keys
SequenceNumber start_seqnum_;
assert(valid_);
assert(status_.ok());
+ PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
}
if (iterate_upper_bound_ != nullptr &&
- user_comparator_->Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
+ user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
break;
}
}
if (IsVisible(ikey_.sequence)) {
- if (skipping && user_comparator_->Compare(ikey_.user_key,
- saved_key_.GetUserKey()) <= 0) {
+ if (skipping && user_comparator_.Compare(ikey_.user_key,
+ saved_key_.GetUserKey()) <= 0) {
num_skipped++; // skip this entry
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} else {
// If this happens too many times in a row for the same user key, we want
// to seek to the target sequence number.
int cmp =
- user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey());
+ user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey());
if (cmp == 0 || (skipping && cmp <= 0)) {
num_skipped++;
} else {
return false;
}
- if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+ if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
// hit the next user key, stop right here
break;
} else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
void DBIter::Prev() {
assert(valid_);
assert(status_.ok());
+
+ PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
bool ok = true;
if (!ParseKey(&ikey)) {
return false;
}
- if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >=
- 0) {
+ if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
return true;
}
iter_->Next();
}
if (iterate_lower_bound_ != nullptr &&
- user_comparator_->Compare(saved_key_.GetUserKey(),
- *iterate_lower_bound_) < 0) {
+ user_comparator_.Compare(saved_key_.GetUserKey(),
+ *iterate_lower_bound_) < 0) {
// We've iterated earlier than the user-specified lower bound.
valid_ = false;
return;
}
if (!IsVisible(ikey.sequence) ||
- !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+ !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
break;
}
if (TooManyInternalKeysSkipped()) {
if (!ParseKey(&ikey)) {
return false;
}
- if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+ if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
// No visible values for this key, even though FindValueForCurrentKey()
// has seen some. This is possible if we're using a tailing iterator, and
// the entries were discarded in a compaction.
if (!ParseKey(&ikey)) {
return false;
}
- if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) {
+ if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
break;
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
- ikey, RangeDelPositioningMode::kBackwardTraversal)) {
+ ikey, RangeDelPositioningMode::kForwardTraversal)) {
break;
} else if (ikey.type == kTypeValue) {
const Slice val = iter_->value();
return false;
}
- if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
+ if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
return true;
}
}
bool DBIter::IsVisible(SequenceNumber sequence) {
- return sequence <= MaxVisibleSequenceNumber() &&
- (read_callback_ == nullptr || read_callback_->IsVisible(sequence));
-}
-
-bool DBIter::CanReseekToSkip() {
- return read_callback_ == nullptr ||
- read_callback_->MaxUnpreparedSequenceNumber() == 0;
-}
-
-SequenceNumber DBIter::MaxVisibleSequenceNumber() {
if (read_callback_ == nullptr) {
- return sequence_;
+ return sequence <= sequence_;
+ } else {
+ return read_callback_->IsVisible(sequence);
}
+}
- return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber());
+bool DBIter::CanReseekToSkip() {
+ return read_callback_ == nullptr || read_callback_->CanReseekToSkip();
}
void DBIter::Seek(const Slice& target) {
+ PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
StopWatch sw(env_, statistics_, DB_SEEK);
status_ = Status::OK();
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
- SequenceNumber seq = MaxVisibleSequenceNumber();
+ SequenceNumber seq = sequence_;
saved_key_.Clear();
saved_key_.SetInternalKey(target, seq);
#endif // ROCKSDB_LITE
if (iterate_lower_bound_ != nullptr &&
- user_comparator_->Compare(saved_key_.GetUserKey(),
- *iterate_lower_bound_) < 0) {
+ user_comparator_.Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) <
+ 0) {
saved_key_.Clear();
saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
}
}
void DBIter::SeekForPrev(const Slice& target) {
+ PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
StopWatch sw(env_, statistics_, DB_SEEK);
status_ = Status::OK();
ReleaseTempPinnedData();
kValueTypeForSeekForPrev);
if (iterate_upper_bound_ != nullptr &&
- user_comparator_->Compare(saved_key_.GetUserKey(),
- *iterate_upper_bound_) >= 0) {
+ user_comparator_.Compare(saved_key_.GetUserKey(),
+ *iterate_upper_bound_) >= 0) {
saved_key_.Clear();
saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
}
Seek(*iterate_lower_bound_);
return;
}
+ PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek will be used.
if (prefix_extractor_ != nullptr && !total_order_seek_) {
if (iterate_upper_bound_ != nullptr) {
// Seek to last key strictly less than ReadOptions.iterate_upper_bound.
SeekForPrev(*iterate_upper_bound_);
- if (Valid() && user_comparator_->Equal(*iterate_upper_bound_, key())) {
+ if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
ReleaseTempPinnedData();
PrevInternal();
}
return;
}
+ PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek will be used.
if (prefix_extractor_ != nullptr && !total_order_seek_) {
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
-RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
+ReadRangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
return db_iter_->GetRangeDelAggregator();
}
new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
+ if (read_callback_) {
+ read_callback_->Refresh(latest_seq);
+ }
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
- read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
+ read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
+ latest_seq);
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(latest_seq);