// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_iter.h"
-#include <string>
+
#include <iostream>
#include <limits>
+#include <string>
#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
+#include "db/wide/wide_column_serialization.h"
#include "file/filename.h"
#include "logging/logging.h"
#include "memory/arena.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
+#include "rocksdb/system_clock.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "trace_replay/trace_replay.h"
namespace ROCKSDB_NAMESPACE {
+// Constructs a DBIter. Options-derived state (clock, logger, merge operator,
+// stats) now comes from ImmutableOptions; the ReadOptions fields needed for
+// later blob reads (read_tier, fill_cache, verify_checksums) are cached, and
+// `version` is recorded so blob index entries can be resolved via
+// Version::GetBlob. NOTE(review): assumes `cmp`, `iter`, `version` and
+// mutable_cf_options outlive this iterator — confirm with ArenaWrappedDBIter.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
-               const ImmutableCFOptions& cf_options,
+               const ImmutableOptions& ioptions,
               const MutableCFOptions& mutable_cf_options,
-               const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
-               bool arena_mode, uint64_t max_sequential_skip_in_iterations,
+               const Comparator* cmp, InternalIterator* iter,
+               const Version* version, SequenceNumber s, bool arena_mode,
+               uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
-               ColumnFamilyData* cfd, bool allow_blob)
+               ColumnFamilyData* cfd, bool expose_blob_index)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
-      logger_(cf_options.info_log),
+      clock_(ioptions.clock),
+      logger_(ioptions.logger),
      user_comparator_(cmp),
-      merge_operator_(cf_options.merge_operator),
+      merge_operator_(ioptions.merge_operator.get()),
      iter_(iter),
+      version_(version),
      read_callback_(read_callback),
      sequence_(s),
-      statistics_(cf_options.statistics),
+      statistics_(ioptions.stats),
      max_skip_(max_sequential_skip_in_iterations),
      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
      num_internal_keys_skipped_(0),
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
-      allow_blob_(allow_blob),
+      read_tier_(read_options.read_tier),
+      fill_cache_(read_options.fill_cache),
+      verify_checksums_(read_options.verify_checksums),
+      expose_blob_index_(expose_blob_index),
      is_blob_(false),
      arena_mode_(arena_mode),
-      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
-      start_seqnum_(read_options.iter_start_seqnum),
      timestamp_ub_(read_options.timestamp),
      timestamp_lb_(read_options.iter_start_ts),
      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
+  // status_ starts out OK; mark it as checked so the unused OK status does
+  // not trip the Status-must-be-checked assertion in debug builds.
+  status_.PermitUncheckedError();
+  assert(timestamp_size_ ==
+         user_comparator_.user_comparator()->timestamp_size());
}
Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
}
bool DBIter::ParseKey(ParsedInternalKey* ikey) {
- Status s =
- ParseInternalKey(iter_.key(), ikey, false /* log_err_key */); // TODO
+ Status s = ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);
if (!s.ok()) {
status_ = Status::Corruption("In DBIter: ", s.getState());
valid_ = false;
assert(valid_);
assert(status_.ok());
- PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
+ PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_);
// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
+ ResetBlobValue();
+ ResetValueAndColumns();
local_stats_.skip_count_ += num_internal_keys_skipped_;
local_stats_.skip_count_--;
num_internal_keys_skipped_ = 0;
local_stats_.next_count_++;
if (ok && iter_.Valid()) {
+ ClearSavedValue();
+
if (prefix_same_as_start_) {
assert(prefix_extractor_ != nullptr);
const Slice prefix = prefix_.GetUserKey();
}
}
+// Resolves the value of a blob index entry at the current iterator position.
+// In the stacked (legacy) BlobDB mode (expose_blob_index_ == true) the raw
+// blob index itself is surfaced to the caller, so no blob read happens here;
+// otherwise the blob is read through version_ into blob_value_. On failure,
+// sets status_ and valid_ and returns false.
+bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
+                                  const Slice& blob_index) {
+  assert(!is_blob_);
+  assert(blob_value_.empty());
+
+  if (expose_blob_index_) {  // Stacked BlobDB implementation
+    is_blob_ = true;
+    return true;
+  }
+
+  // Without a Version there is no way to resolve the blob reference, so a
+  // blob index here indicates corruption.
+  if (!version_) {
+    status_ = Status::Corruption("Encountered unexpected blob index.");
+    valid_ = false;
+    return false;
+  }
+
+  // TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
+  // avoid having to copy options back and forth.
+  ReadOptions read_options;
+  read_options.read_tier = read_tier_;
+  read_options.fill_cache = fill_cache_;
+  read_options.verify_checksums = verify_checksums_;
+
+  constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+  constexpr uint64_t* bytes_read = nullptr;
+
+  const Status s = version_->GetBlob(read_options, user_key, blob_index,
+                                     prefetch_buffer, &blob_value_, bytes_read);
+
+  if (!s.ok()) {
+    status_ = s;
+    valid_ = false;
+    return false;
+  }
+
+  is_blob_ = true;
+  return true;
+}
+
+// Deserializes a wide-column entity into wide_columns_. If the first column
+// is the default column (kDefaultWideColumnName — only the first column is
+// checked, so the default column is expected to come first when present),
+// its value is also exposed through value_. On a deserialization error,
+// sets status_/valid_ and returns false.
+bool DBIter::SetValueAndColumnsFromEntity(Slice slice) {
+  assert(value_.empty());
+  assert(wide_columns_.empty());
+
+  const Status s = WideColumnSerialization::Deserialize(slice, wide_columns_);
+
+  if (!s.ok()) {
+    status_ = s;
+    valid_ = false;
+    return false;
+  }
+
+  if (!wide_columns_.empty() &&
+      wide_columns_[0].name() == kDefaultWideColumnName) {
+    value_ = wide_columns_[0].value();
+  }
+
+  return true;
+}
+
// PRE: saved_key_ has the current user key if skipping_saved_key
// POST: saved_key_ should have the next user key if valid_,
// if the current entry is a result of merge
// to one.
bool reseek_done = false;
- is_blob_ = false;
-
do {
// Will update is_key_seqnum_zero_ as soon as we parsed the current key
// but we need to save the previous value to be used in the loop.
is_key_seqnum_zero_ = false;
return false;
}
+ Slice user_key_without_ts =
+ StripTimestampFromUserKey(ikey_.user_key, timestamp_size_);
is_key_seqnum_zero_ = (ikey_.sequence == 0);
assert(iterate_upper_bound_ == nullptr ||
iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound ||
user_comparator_.CompareWithoutTimestamp(
- ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
+ user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
/*b_has_ts=*/false) < 0);
if (iterate_upper_bound_ != nullptr &&
iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound &&
user_comparator_.CompareWithoutTimestamp(
- ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
+ user_key_without_ts, /*a_has_ts=*/false, *iterate_upper_bound_,
/*b_has_ts=*/false) >= 0) {
break;
}
assert(prefix == nullptr || prefix_extractor_ != nullptr);
if (prefix != nullptr &&
- prefix_extractor_->Transform(ikey_.user_key).compare(*prefix) != 0) {
+ prefix_extractor_->Transform(user_key_without_ts).compare(*prefix) !=
+ 0) {
assert(prefix_same_as_start_);
break;
}
case kTypeSingleDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
- // if iterartor specified start_seqnum we
- // 1) return internal key, including the type
- // 2) return ikey only if ikey.seqnum >= start_seqnum_
- // note that if deletion seqnum is < start_seqnum_ we
- // just skip it like in normal iterator.
- if (start_seqnum_ > 0) {
- if (ikey_.sequence >= start_seqnum_) {
- saved_key_.SetInternalKey(ikey_);
- valid_ = true;
- return true;
- } else {
- saved_key_.SetUserKey(
- ikey_.user_key,
- !pin_thru_lifetime_ ||
- !iter_.iter()->IsKeyPinned() /* copy */);
- skipping_saved_key = true;
- PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
- }
- } else if (timestamp_lb_) {
+ if (timestamp_lb_) {
saved_key_.SetInternalKey(ikey_);
valid_ = true;
return true;
break;
case kTypeValue:
case kTypeBlobIndex:
- if (start_seqnum_ > 0) {
- if (ikey_.sequence >= start_seqnum_) {
- assert(ikey_.type != kTypeBlobIndex);
- saved_key_.SetInternalKey(ikey_);
- valid_ = true;
- return true;
- } else {
- // this key and all previous versions shouldn't be included,
- // skipping_saved_key
- saved_key_.SetUserKey(
- ikey_.user_key,
- !pin_thru_lifetime_ ||
- !iter_.iter()->IsKeyPinned() /* copy */);
- skipping_saved_key = true;
- }
- } else if (timestamp_lb_) {
+ case kTypeWideColumnEntity:
+ if (timestamp_lb_) {
saved_key_.SetInternalKey(ikey_);
- valid_ = true;
- return true;
} else {
saved_key_.SetUserKey(
ikey_.user_key, !pin_thru_lifetime_ ||
!iter_.iter()->IsKeyPinned() /* copy */);
- if (range_del_agg_.ShouldDelete(
- ikey_, RangeDelPositioningMode::kForwardTraversal)) {
- // Arrange to skip all upcoming entries for this key since
- // they are hidden by this deletion.
- skipping_saved_key = true;
- num_skipped = 0;
- reseek_done = false;
- PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
- } else if (ikey_.type == kTypeBlobIndex) {
- if (!allow_blob_) {
- ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
- status_ = Status::NotSupported(
- "Encounter unexpected blob index. Please open DB with "
- "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
- valid_ = false;
- return false;
- }
-
- is_blob_ = true;
- valid_ = true;
- return true;
- } else {
- valid_ = true;
- return true;
+ }
+
+ if (ikey_.type == kTypeBlobIndex) {
+ if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
+ return false;
+ }
+
+ SetValueAndColumnsFromPlain(expose_blob_index_ ? iter_.value()
+ : blob_value_);
+ } else if (ikey_.type == kTypeWideColumnEntity) {
+ if (!SetValueAndColumnsFromEntity(iter_.value())) {
+ return false;
}
+ } else {
+ assert(ikey_.type == kTypeValue);
+ SetValueAndColumnsFromPlain(iter_.value());
}
+
+ valid_ = true;
+ return true;
break;
case kTypeMerge:
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
- if (range_del_agg_.ShouldDelete(
- ikey_, RangeDelPositioningMode::kForwardTraversal)) {
- // Arrange to skip all upcoming entries for this key since
- // they are hidden by this deletion.
- skipping_saved_key = true;
- num_skipped = 0;
- reseek_done = false;
- PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
- } else {
- // By now, we are sure the current ikey is going to yield a
- // value
- current_entry_is_merged_ = true;
- valid_ = true;
- return MergeValuesNewToOld(); // Go to a different state machine
- }
+ // By now, we are sure the current ikey is going to yield a value
+ current_entry_is_merged_ = true;
+ valid_ = true;
+ return MergeValuesNewToOld(); // Go to a different state machine
break;
default:
valid_ = false;
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
} else {
- const std::string kTsMin(timestamp_size_, static_cast<char>(0));
+ const std::string kTsMin(timestamp_size_, '\0');
AppendInternalKeyWithDifferentTimestamp(
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
ParsedInternalKey ikey;
- Status s;
for (iter_.Next(); iter_.Valid(); iter_.Next()) {
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
if (!ParseKey(&ikey)) {
return false;
}
- if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
+ if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
+ saved_key_.GetUserKey())) {
// hit the next user key, stop right here
break;
}
if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
- range_del_agg_.ShouldDelete(
- ikey, RangeDelPositioningMode::kForwardTraversal)) {
+ kTypeDeletionWithTimestamp == ikey.type) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_.Next();
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
const Slice val = iter_.value();
- s = MergeHelper::TimedFullMerge(
- merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
- &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
- if (!s.ok()) {
- valid_ = false;
- status_ = s;
+ if (!Merge(&val, ikey.user_key)) {
return false;
}
// iter_ is positioned after put
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (kTypeBlobIndex == ikey.type) {
- if (!allow_blob_) {
- ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
- status_ = Status::NotSupported(
- "Encounter unexpected blob index. Please open DB with "
- "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
- } else {
+ if (expose_blob_index_) {
status_ =
- Status::NotSupported("Blob DB does not support merge operator.");
+ Status::NotSupported("BlobDB does not support merge operator.");
+ valid_ = false;
+ return false;
}
- valid_ = false;
- return false;
+ // hit a put, merge the put value with operands and store the
+ // final result in saved_value_. We are done!
+ if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
+ return false;
+ }
+ valid_ = true;
+ if (!Merge(&blob_value_, ikey.user_key)) {
+ return false;
+ }
+
+ ResetBlobValue();
+
+ // iter_ is positioned after put
+ iter_.Next();
+ if (!iter_.status().ok()) {
+ valid_ = false;
+ return false;
+ }
+ return true;
+ } else if (kTypeWideColumnEntity == ikey.type) {
+ if (!MergeEntity(iter_.value(), ikey.user_key)) {
+ return false;
+ }
+
+ // iter_ is positioned after put
+ iter_.Next();
+ if (!iter_.status().ok()) {
+ valid_ = false;
+ return false;
+ }
+
+ return true;
} else {
valid_ = false;
status_ = Status::Corruption(
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
- s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
- nullptr, merge_context_.GetOperands(),
- &saved_value_, logger_, statistics_, env_,
- &pinned_value_, true);
- if (!s.ok()) {
- valid_ = false;
- status_ = s;
+ if (!Merge(nullptr, saved_key_.GetUserKey())) {
return false;
}
-
assert(status_.ok());
return true;
}
void DBIter::Prev() {
- if (timestamp_size_ > 0) {
- valid_ = false;
- status_ = Status::NotSupported(
- "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
- return;
- }
-
assert(valid_);
assert(status_.ok());
- PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
+ PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
ReleaseTempPinnedData();
+ ResetBlobValue();
+ ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
bool ok = true;
if (direction_ == kForward) {
}
}
if (ok) {
+ ClearSavedValue();
+
Slice prefix;
if (prefix_same_as_start_) {
assert(prefix_extractor_ != nullptr);
// If that's the case, seek iter_ to current key.
if (!expect_total_order_inner_iter() || !iter_.Valid()) {
IterKey last_key;
- last_key.SetInternalKey(ParsedInternalKey(
- saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
+ ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber,
+ kValueTypeForSeek);
+ if (timestamp_size_ > 0) {
+ // TODO: pre-create kTsMax.
+ const std::string kTsMax(timestamp_size_, '\xff');
+ pikey.SetTimestamp(kTsMax);
+ }
+ last_key.SetInternalKey(pikey);
iter_.Seek(last_key.GetInternalKey());
+ RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
direction_ = kForward;
iter_.SeekToLast();
}
}
+ RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
direction_ = kReverse;
assert(prefix == nullptr || prefix_extractor_ != nullptr);
if (prefix != nullptr &&
- prefix_extractor_->Transform(saved_key_.GetUserKey())
+ prefix_extractor_
+ ->Transform(StripTimestampFromUserKey(saved_key_.GetUserKey(),
+ timestamp_size_))
.compare(*prefix) != 0) {
assert(prefix_same_as_start_);
// Current key does not have the same prefix as start
}
assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
- user_comparator_.Compare(saved_key_.GetUserKey(),
- *iterate_lower_bound_) >= 0);
+ user_comparator_.CompareWithoutTimestamp(
+ saved_key_.GetUserKey(), /*a_has_ts=*/true,
+ *iterate_lower_bound_, /*b_has_ts=*/false) >= 0);
if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
- user_comparator_.Compare(saved_key_.GetUserKey(),
- *iterate_lower_bound_) < 0) {
+ user_comparator_.CompareWithoutTimestamp(
+ saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
+ /*b_has_ts=*/false) < 0) {
// We've iterated earlier than the user-specified lower bound.
valid_ = false;
return;
assert(iter_.Valid());
merge_context_.Clear();
current_entry_is_merged_ = false;
- // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
- // kTypeValue)
+ // last entry before merge (could be kTypeDeletion,
+ // kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue,
+ // kTypeBlobIndex, or kTypeWideColumnEntity)
ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion;
+ // If false, it indicates that we have not seen any valid entry, even though
+ // last_key_entry_type is initialized to kTypeDeletion.
+ bool valid_entry_seen = false;
+
// Temporarily pin blocks that hold (merge operands / the value)
ReleaseTempPinnedData();
TempPinData();
return false;
}
+ if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
+ saved_key_.GetUserKey())) {
+ // Found a smaller user key, thus we are done with current user key.
+ break;
+ }
+
assert(ikey.user_key.size() >= timestamp_size_);
Slice ts;
if (timestamp_size_ > 0) {
ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
timestamp_size_);
}
- if (!IsVisible(ikey.sequence, ts) ||
- !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
+
+ bool visible = IsVisible(ikey.sequence, ts);
+ if (!visible &&
+ (timestamp_lb_ == nullptr ||
+ user_comparator_.CompareTimestamp(ts, *timestamp_ub_) > 0)) {
+ // Found an invisible version of the current user key, and it must have
+ // a higher sequence number or timestamp. Therefore, we are done with the
+ // current user key.
break;
}
+
+ if (!ts.empty()) {
+ saved_timestamp_.assign(ts.data(), ts.size());
+ }
+
if (TooManyInternalKeysSkipped()) {
return false;
}
return false;
}
+ if (timestamp_lb_ != nullptr) {
+ // Only needed when timestamp_lb_ is not null
+ [[maybe_unused]] const bool ret = ParseKey(&ikey_);
+ saved_ikey_.assign(iter_.key().data(), iter_.key().size());
+ // Since the preceding ParseKey(&ikey) succeeds, so must this.
+ assert(ret);
+ }
+
+ valid_entry_seen = true;
last_key_entry_type = ikey.type;
switch (last_key_entry_type) {
case kTypeValue:
case kTypeBlobIndex:
- if (range_del_agg_.ShouldDelete(
- ikey, RangeDelPositioningMode::kBackwardTraversal)) {
- last_key_entry_type = kTypeRangeDeletion;
- PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
- } else if (iter_.iter()->IsValuePinned()) {
+ case kTypeWideColumnEntity:
+ if (iter_.iter()->IsValuePinned()) {
pinned_value_ = iter_.value();
} else {
valid_ = false;
}
break;
case kTypeDeletion:
+ case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
merge_context_.Clear();
last_not_merge_type = last_key_entry_type;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
- case kTypeMerge:
- if (range_del_agg_.ShouldDelete(
- ikey, RangeDelPositioningMode::kBackwardTraversal)) {
- merge_context_.Clear();
- last_key_entry_type = kTypeRangeDeletion;
- last_not_merge_type = last_key_entry_type;
- PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
- } else {
- assert(merge_operator_ != nullptr);
- merge_context_.PushOperandBack(
- iter_.value(),
- iter_.iter()->IsValuePinned() /* operand_pinned */);
- PERF_COUNTER_ADD(internal_merge_count, 1);
- }
- break;
+ case kTypeMerge: {
+ assert(merge_operator_ != nullptr);
+ merge_context_.PushOperandBack(
+ iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
+ PERF_COUNTER_ADD(internal_merge_count, 1);
+ } break;
default:
valid_ = false;
status_ = Status::Corruption(
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
iter_.Prev();
++num_skipped;
+
+ if (visible && timestamp_lb_ != nullptr) {
+ // If timestamp_lb_ is not nullptr, we do not have to look further for
+ // another internal key. We can return this current internal key. Yet we
+ // still keep the invariant that iter_ is positioned before the returned
+ // key.
+ break;
+ }
}
if (!iter_.status().ok()) {
return false;
}
+ if (!valid_entry_seen) {
+ // Since we haven't seen any valid entry, last_key_entry_type remains
+ // unchanged and the same as its initial value.
+ assert(last_key_entry_type == kTypeDeletion);
+ assert(last_not_merge_type == kTypeDeletion);
+ valid_ = false;
+ return true;
+ }
+
+ if (timestamp_lb_ != nullptr) {
+ assert(last_key_entry_type == ikey_.type);
+ }
+
Status s;
s.PermitUncheckedError();
- is_blob_ = false;
+
switch (last_key_entry_type) {
case kTypeDeletion:
+ case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
- case kTypeRangeDeletion:
- valid_ = false;
+ if (timestamp_lb_ == nullptr) {
+ valid_ = false;
+ } else {
+ saved_key_.SetInternalKey(saved_ikey_);
+ valid_ = true;
+ }
return true;
case kTypeMerge:
current_entry_is_merged_ = true;
if (last_not_merge_type == kTypeDeletion ||
last_not_merge_type == kTypeSingleDeletion ||
- last_not_merge_type == kTypeRangeDeletion) {
- s = MergeHelper::TimedFullMerge(
- merge_operator_, saved_key_.GetUserKey(), nullptr,
- merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
- env_, &pinned_value_, true);
+ last_not_merge_type == kTypeDeletionWithTimestamp) {
+ if (!Merge(nullptr, saved_key_.GetUserKey())) {
+ return false;
+ }
+ return true;
} else if (last_not_merge_type == kTypeBlobIndex) {
- if (!allow_blob_) {
- ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
- status_ = Status::NotSupported(
- "Encounter unexpected blob index. Please open DB with "
- "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
- } else {
+ if (expose_blob_index_) {
status_ =
- Status::NotSupported("Blob DB does not support merge operator.");
+ Status::NotSupported("BlobDB does not support merge operator.");
+ valid_ = false;
+ return false;
}
- valid_ = false;
- return false;
+ if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
+ return false;
+ }
+ valid_ = true;
+ if (!Merge(&blob_value_, saved_key_.GetUserKey())) {
+ return false;
+ }
+
+ ResetBlobValue();
+
+ return true;
+ } else if (last_not_merge_type == kTypeWideColumnEntity) {
+ if (!MergeEntity(pinned_value_, saved_key_.GetUserKey())) {
+ return false;
+ }
+
+ return true;
} else {
assert(last_not_merge_type == kTypeValue);
- s = MergeHelper::TimedFullMerge(
- merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
- merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
- env_, &pinned_value_, true);
+ if (!Merge(&pinned_value_, saved_key_.GetUserKey())) {
+ return false;
+ }
+ return true;
}
break;
case kTypeValue:
- // do nothing - we've already has value in pinned_value_
+ if (timestamp_lb_ != nullptr) {
+ saved_key_.SetInternalKey(saved_ikey_);
+ }
+
+ SetValueAndColumnsFromPlain(pinned_value_);
+
break;
case kTypeBlobIndex:
- if (!allow_blob_) {
- ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
- status_ = Status::NotSupported(
- "Encounter unexpected blob index. Please open DB with "
- "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
- valid_ = false;
+ if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
+ return false;
+ }
+
+ SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
+ : blob_value_);
+
+ break;
+ case kTypeWideColumnEntity:
+ if (!SetValueAndColumnsFromEntity(pinned_value_)) {
return false;
}
- is_blob_ = true;
break;
default:
valid_ = false;
// FindValueForCurrentKeyUsingSeek()
assert(pinned_iters_mgr_.PinningEnabled());
std::string last_key;
- AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
- sequence_, kValueTypeForSeek));
+ if (0 == timestamp_size_) {
+ AppendInternalKey(&last_key,
+ ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
+ kValueTypeForSeek));
+ } else {
+ AppendInternalKeyWithDifferentTimestamp(
+ &last_key,
+ ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
+ kValueTypeForSeek),
+ timestamp_lb_ == nullptr ? *timestamp_ub_ : *timestamp_lb_);
+ }
iter_.Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
// In case read_callback presents, the value we seek to may not be visible.
// Find the next value that's visible.
ParsedInternalKey ikey;
- is_blob_ = false;
+
while (true) {
if (!iter_.Valid()) {
valid_ = false;
timestamp_size_);
}
- if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
+ if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
+ saved_key_.GetUserKey())) {
// No visible values for this key, even though FindValueForCurrentKey()
// has seen some. This is possible if we're using a tailing iterator, and
// the entries were discarded in a compaction.
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
- range_del_agg_.ShouldDelete(
- ikey, RangeDelPositioningMode::kBackwardTraversal)) {
- valid_ = false;
+ kTypeDeletionWithTimestamp == ikey.type) {
+ if (timestamp_lb_ == nullptr) {
+ valid_ = false;
+ } else {
+ valid_ = true;
+ saved_key_.SetInternalKey(ikey);
+ }
return true;
}
- if (ikey.type == kTypeBlobIndex && !allow_blob_) {
- ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
- status_ = Status::NotSupported(
- "Encounter unexpected blob index. Please open DB with "
- "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
- valid_ = false;
- return false;
- }
if (!iter_.PrepareValue()) {
valid_ = false;
return false;
}
- if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
+ if (timestamp_size_ > 0) {
+ Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_);
+ saved_timestamp_.assign(ts.data(), ts.size());
+ }
+ if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex ||
+ ikey.type == kTypeWideColumnEntity) {
assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value();
- is_blob_ = (ikey.type == kTypeBlobIndex);
+ if (ikey.type == kTypeBlobIndex) {
+ if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
+ return false;
+ }
+
+ SetValueAndColumnsFromPlain(expose_blob_index_ ? pinned_value_
+ : blob_value_);
+ } else if (ikey.type == kTypeWideColumnEntity) {
+ if (!SetValueAndColumnsFromEntity(pinned_value_)) {
+ return false;
+ }
+ } else {
+ assert(ikey.type == kTypeValue);
+ SetValueAndColumnsFromPlain(pinned_value_);
+ }
+
+ if (timestamp_lb_ != nullptr) {
+ saved_key_.SetInternalKey(ikey);
+ }
+
valid_ = true;
return true;
}
if (!ParseKey(&ikey)) {
return false;
}
- if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
+ if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
+ saved_key_.GetUserKey())) {
break;
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
- range_del_agg_.ShouldDelete(
- ikey, RangeDelPositioningMode::kForwardTraversal)) {
+ ikey.type == kTypeDeletionWithTimestamp) {
break;
}
if (!iter_.PrepareValue()) {
if (ikey.type == kTypeValue) {
const Slice val = iter_.value();
- Status s = MergeHelper::TimedFullMerge(
- merge_operator_, saved_key_.GetUserKey(), &val,
- merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
- env_, &pinned_value_, true);
- if (!s.ok()) {
- valid_ = false;
- status_ = s;
+ if (!Merge(&val, saved_key_.GetUserKey())) {
return false;
}
- valid_ = true;
return true;
} else if (ikey.type == kTypeMerge) {
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (ikey.type == kTypeBlobIndex) {
- if (!allow_blob_) {
- ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
- status_ = Status::NotSupported(
- "Encounter unexpected blob index. Please open DB with "
- "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
- } else {
+ if (expose_blob_index_) {
status_ =
- Status::NotSupported("Blob DB does not support merge operator.");
+ Status::NotSupported("BlobDB does not support merge operator.");
+ valid_ = false;
+ return false;
}
- valid_ = false;
- return false;
+ if (!SetBlobValueIfNeeded(ikey.user_key, iter_.value())) {
+ return false;
+ }
+ valid_ = true;
+ if (!Merge(&blob_value_, saved_key_.GetUserKey())) {
+ return false;
+ }
+
+ ResetBlobValue();
+
+ return true;
+ } else if (ikey.type == kTypeWideColumnEntity) {
+ if (!MergeEntity(iter_.value(), saved_key_.GetUserKey())) {
+ return false;
+ }
+
+ return true;
} else {
valid_ = false;
status_ = Status::Corruption(
}
}
- Status s = MergeHelper::TimedFullMerge(
- merge_operator_, saved_key_.GetUserKey(), nullptr,
- merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
- &pinned_value_, true);
- if (!s.ok()) {
- valid_ = false;
- status_ = s;
+ if (!Merge(nullptr, saved_key_.GetUserKey())) {
return false;
}
return true;
}
+// Performs a full merge of the operands collected in merge_context_ with the
+// optional existing value `val` (nullptr means there is no existing value,
+// e.g. the newest visible entry for the key is a deletion). On success the
+// result is exposed to the caller and the iterator is marked valid; on
+// failure, sets status_/valid_ and returns false.
+bool DBIter::Merge(const Slice* val, const Slice& user_key) {
+  Status s = MergeHelper::TimedFullMerge(
+      merge_operator_, user_key, val, merge_context_.GetOperands(),
+      &saved_value_, logger_, statistics_, clock_, &pinned_value_,
+      /* update_num_ops_stats */ true);
+  if (!s.ok()) {
+    valid_ = false;
+    status_ = s;
+    return false;
+  }
+
+  // The merge result may land in either saved_value_ or pinned_value_;
+  // prefer pinned_value_ when it was set by TimedFullMerge.
+  SetValueAndColumnsFromPlain(pinned_value_.data() ? pinned_value_
+                                                   : saved_value_);
+
+  valid_ = true;
+  return true;
+}
+
+// Merge() counterpart for wide-column base values: merges the operands in
+// merge_context_ with the serialized wide-column entity `entity`. The merge
+// result (left in saved_value_) is then deserialized into
+// value_/wide_columns_ via SetValueAndColumnsFromEntity. On failure, sets
+// status_/valid_ and returns false.
+bool DBIter::MergeEntity(const Slice& entity, const Slice& user_key) {
+  Status s = MergeHelper::TimedFullMergeWithEntity(
+      merge_operator_, user_key, entity, merge_context_.GetOperands(),
+      &saved_value_, logger_, statistics_, clock_,
+      /* update_num_ops_stats */ true);
+  if (!s.ok()) {
+    valid_ = false;
+    status_ = s;
+    return false;
+  }
+
+  if (!SetValueAndColumnsFromEntity(saved_value_)) {
+    return false;
+  }
+
+  valid_ = true;
+  return true;
+}
+
// Move backwards until the key smaller than saved_key_.
// Changes valid_ only if return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {
return false;
}
- if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
+ if (CompareKeyForSkip(ikey.user_key, saved_key_.GetUserKey()) < 0) {
return true;
}
if (num_skipped >= max_skip_) {
num_skipped = 0;
IterKey last_key;
- last_key.SetInternalKey(ParsedInternalKey(
- saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
+ ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber,
+ kValueTypeForSeek);
+ if (timestamp_size_ > 0) {
+ // TODO: pre-create kTsMax.
+ const std::string kTsMax(timestamp_size_, '\xff');
+ pikey.SetTimestamp(kTsMax);
+ }
+ last_key.SetInternalKey(pikey);
// It would be more efficient to use SeekForPrev() here, but some
// iterators may not support it.
iter_.Seek(last_key.GetInternalKey());
saved_key_.Clear();
// now saved_key is used to store internal key.
saved_key_.SetInternalKey(target, 0 /* sequence_number */,
- kValueTypeForSeekForPrev);
+ kValueTypeForSeekForPrev, timestamp_ub_);
+
+ if (timestamp_size_ > 0) {
+ const std::string kTsMin(timestamp_size_, '\0');
+ Slice ts = kTsMin;
+ saved_key_.UpdateInternalKey(
+ /*seq=*/0, kValueTypeForSeekForPrev,
+ timestamp_lb_ == nullptr ? &ts : timestamp_lb_);
+ }
if (iterate_upper_bound_ != nullptr &&
- user_comparator_.Compare(saved_key_.GetUserKey(),
- *iterate_upper_bound_) >= 0) {
+ user_comparator_.CompareWithoutTimestamp(
+ saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_upper_bound_,
+ /*b_has_ts=*/false) >= 0) {
saved_key_.Clear();
- saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
+ saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber,
+ kValueTypeForSeekForPrev, timestamp_ub_);
+ if (timestamp_size_ > 0) {
+ const std::string kTsMax(timestamp_size_, '\xff');
+ Slice ts = kTsMax;
+ saved_key_.UpdateInternalKey(
+ kMaxSequenceNumber, kValueTypeForSeekForPrev,
+ timestamp_lb_ != nullptr ? timestamp_lb_ : &ts);
+ }
}
}
// Forward seek: position the iterator at the first entry >= target visible to
// this snapshot.  NOTE: this span is diff-style text ('-' = removed line,
// '+' = added line) and parts of the function body are elided from view.
void DBIter::Seek(const Slice& target) {
// Perf/CPU timing migrated from Env to SystemClock (clock_).
- PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
- StopWatch sw(env_, statistics_, DB_SEEK);
+ PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
+ StopWatch sw(clock_, statistics_, DB_SEEK);
#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
// TODO: What do we do if this returns an error?
// Tracing now also records the effective iterate bounds; an unset bound is
// traced as the empty slice.
- db_impl_->TraceIteratorSeek(cfd_->GetID(), target).PermitUncheckedError();
+ Slice lower_bound, upper_bound;
+ if (iterate_lower_bound_ != nullptr) {
+ lower_bound = *iterate_lower_bound_;
+ } else {
+ lower_bound = Slice("");
+ }
+ if (iterate_upper_bound_ != nullptr) {
+ upper_bound = *iterate_upper_bound_;
+ } else {
+ upper_bound = Slice("");
+ }
+ db_impl_->TraceIteratorSeek(cfd_->GetID(), target, lower_bound, upper_bound)
+ .PermitUncheckedError();
}
#endif // ROCKSDB_LITE
status_ = Status::OK();
ReleaseTempPinnedData();
// Clear blob / wide-column state carried over from the previous position.
+ ResetBlobValue();
+ ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
// Seek the inner iterator based on the target key.
SetSavedKeyToSeekTarget(target);
iter_.Seek(saved_key_.GetInternalKey());
// Range-del map invalidation dropped here — presumably superseded by a
// range-deletion aggregator rework elsewhere in this change; confirm
// against the full patch.
- range_del_agg_.InvalidateRangeDelMapPositions();
RecordTick(statistics_, NUMBER_DB_SEEK);
}
if (!iter_.Valid()) {
// we need to find out the next key that is visible to the user.
ClearSavedValue();
if (prefix_same_as_start_) {
- // The case where the iterator needs to be invalidated if it has exausted
+ // The case where the iterator needs to be invalidated if it has exhausted
// keys within the same prefix of the seek key.
assert(prefix_extractor_ != nullptr);
Slice target_prefix = prefix_extractor_->Transform(target);
}
// Reverse seek: position the iterator at the last entry <= target visible to
// this snapshot.  Diff-style span ('-' removed, '+' added); interior lines
// are partially elided from view.
void DBIter::SeekForPrev(const Slice& target) {
// Perf/CPU timing migrated from Env to SystemClock (clock_).
- PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
- StopWatch sw(env_, statistics_, DB_SEEK);
+ PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
+ StopWatch sw(clock_, statistics_, DB_SEEK);
#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
// TODO: What do we do if this returns an error?
// Trace call now records the effective bounds (empty slice when unset),
// mirroring the same change in Seek().
- db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target)
+ Slice lower_bound, upper_bound;
+ if (iterate_lower_bound_ != nullptr) {
+ lower_bound = *iterate_lower_bound_;
+ } else {
+ lower_bound = Slice("");
+ }
+ if (iterate_upper_bound_ != nullptr) {
+ upper_bound = *iterate_upper_bound_;
+ } else {
+ upper_bound = Slice("");
+ }
+ db_impl_
+ ->TraceIteratorSeekForPrev(cfd_->GetID(), target, lower_bound,
+ upper_bound)
.PermitUncheckedError();
}
#endif // ROCKSDB_LITE
// The blanket NotSupported rejection for user-defined timestamps is removed:
// SeekForPrev now proceeds when timestamp_size_ > 0 (the timestamp handling
// lives in SetSavedKeyToSeekForPrevTarget, per the hunk above this one).
- if (timestamp_size_ > 0) {
- valid_ = false;
- status_ = Status::NotSupported(
- "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
- return;
- }
-
status_ = Status::OK();
ReleaseTempPinnedData();
// Clear blob / wide-column state carried over from the previous position.
+ ResetBlobValue();
+ ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
// Seek the inner iterator based on the target key.
PERF_TIMER_GUARD(seek_internal_seek_time);
SetSavedKeyToSeekForPrevTarget(target);
iter_.SeekForPrev(saved_key_.GetInternalKey());
- range_del_agg_.InvalidateRangeDelMapPositions();
RecordTick(statistics_, NUMBER_DB_SEEK);
}
if (!iter_.Valid()) {
// backward direction.
ClearSavedValue();
if (prefix_same_as_start_) {
- // The case where the iterator needs to be invalidated if it has exausted
+ // The case where the iterator needs to be invalidated if it has exhausted
// keys within the same prefix of the seek key.
assert(prefix_extractor_ != nullptr);
Slice target_prefix = prefix_extractor_->Transform(target);
// NOTE(review): the two lines below look like the lower-bound shortcut at
// the top of SeekToFirst (whose signature is elided from this excerpt),
// not part of SeekForPrev — verify against the full file.
Seek(*iterate_lower_bound_);
return;
}
// Interior of what appears to be DBIter::SeekToFirst (the signature line is
// elided from this excerpt — confirm against the full file).  Diff-style span.
- PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
+ PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek will be used.
if (!expect_total_order_inner_iter()) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
status_ = Status::OK();
+ // if iterator is empty, this status_ could be unchecked.
+ status_.PermitUncheckedError();
direction_ = kForward;
ReleaseTempPinnedData();
// Clear blob / wide-column state carried over from the previous position.
+ ResetBlobValue();
+ ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
ClearSavedValue();
is_key_seqnum_zero_ = false;
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_.SeekToFirst();
- range_del_agg_.InvalidateRangeDelMapPositions();
}
RecordTick(statistics_, NUMBER_DB_SEEK);
}
if (valid_ && prefix_same_as_start_) {
assert(prefix_extractor_ != nullptr);
// With user-defined timestamps the saved key carries a trailing timestamp;
// strip it before feeding the user key to the prefix extractor.
- prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
+ prefix_.SetUserKey(prefix_extractor_->Transform(
+ StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
}
}
// Position the iterator at the last visible entry.  Diff-style span ('-'
// removed, '+' added); interior lines are partially elided from view.
void DBIter::SeekToLast() {
// The blanket NotSupported rejection for user-defined timestamps is removed;
// SeekToLast now proceeds when timestamp_size_ > 0.
- if (timestamp_size_ > 0) {
- valid_ = false;
- status_ = Status::NotSupported(
- "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
- return;
- }
-
if (iterate_upper_bound_ != nullptr) {
// Seek to last key strictly less than ReadOptions.iterate_upper_bound.
SeekForPrev(*iterate_upper_bound_);
// Single Equal() check replaced by a loop that compares without the
// timestamp suffix; when timestamp_lb_ is set, key() returns an internal
// key, so the seq/type/timestamp suffix is stripped before comparing.
- if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
+ const bool is_ikey = (timestamp_size_ > 0 && timestamp_lb_ != nullptr);
+ Slice k = Valid() ? key() : Slice();
+ if (is_ikey && Valid()) {
+ k.remove_suffix(kNumInternalBytes + timestamp_size_);
+ }
+ while (Valid() && 0 == user_comparator_.CompareWithoutTimestamp(
+ *iterate_upper_bound_, /*a_has_ts=*/false, k,
+ /*b_has_ts=*/false)) {
ReleaseTempPinnedData();
+ ResetBlobValue();
+ ResetValueAndColumns();
PrevInternal(nullptr);
+
// NOTE(review): k = key() is reached before the loop condition re-checks
// Valid(); if PrevInternal() invalidates the iterator this reads key() on
// an invalid iterator — confirm PrevInternal's postcondition or guard it.
+ k = key();
+ if (is_ikey) {
+ k.remove_suffix(kNumInternalBytes + timestamp_size_);
+ }
}
return;
}
// Perf/CPU timing migrated from Env to SystemClock (clock_).
- PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
+ PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek will be used.
if (!expect_total_order_inner_iter()) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
status_ = Status::OK();
+ // if iterator is empty, this status_ could be unchecked.
+ status_.PermitUncheckedError();
direction_ = kReverse;
ReleaseTempPinnedData();
// Clear blob / wide-column state carried over from the previous position.
+ ResetBlobValue();
+ ResetValueAndColumns();
ResetInternalKeysSkippedCounter();
ClearSavedValue();
is_key_seqnum_zero_ = false;
{
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_.SeekToLast();
- range_del_agg_.InvalidateRangeDelMapPositions();
}
PrevInternal(nullptr);
if (statistics_ != nullptr) {
}
if (valid_ && prefix_same_as_start_) {
assert(prefix_extractor_ != nullptr);
// Strip the timestamp suffix before feeding the user key to the prefix
// extractor, mirroring the same change in the forward-seek path.
- prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
+ prefix_.SetUserKey(prefix_extractor_->Transform(
+ StripTimestampFromUserKey(saved_key_.GetUserKey(), timestamp_size_)));
}
}
// Factory for a heap-allocated DBIter (non-arena path; arena_mode=false is
// hard-coded in the ctor call).  Signature changes in this hunk:
// ImmutableCFOptions -> ImmutableOptions, a new `version` parameter threaded
// through to the ctor, and allow_blob renamed to expose_blob_index.
Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
- const ImmutableCFOptions& cf_options,
+ const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
const Comparator* user_key_comparator,
- InternalIterator* internal_iter,
+ InternalIterator* internal_iter, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
- ColumnFamilyData* cfd, bool allow_blob) {
- DBIter* db_iter = new DBIter(
- env, read_options, cf_options, mutable_cf_options, user_key_comparator,
- internal_iter, sequence, false, max_sequential_skip_in_iterations,
- read_callback, db_impl, cfd, allow_blob);
+ ColumnFamilyData* cfd, bool expose_blob_index) {
+ DBIter* db_iter =
+ new DBIter(env, read_options, ioptions, mutable_cf_options,
+ user_key_comparator, internal_iter, version, sequence, false,
+ max_sequential_skip_in_iterations, read_callback, db_impl, cfd,
+ expose_blob_index);
return db_iter;
}