]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_iter.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_iter.cc
index e5d40294819222150c28dfba349a12d081e3bb61..bfe7c721382c252cf7bc0f612e37118b02c9b517 100644 (file)
@@ -49,6 +49,8 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
       read_callback_(read_callback),
       sequence_(s),
       statistics_(cf_options.statistics),
+      max_skip_(max_sequential_skip_in_iterations),
+      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
       num_internal_keys_skipped_(0),
       iterate_lower_bound_(read_options.iterate_lower_bound),
       iterate_upper_bound_(read_options.iterate_upper_bound),
@@ -69,16 +71,18 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
       range_del_agg_(&cf_options.internal_comparator, s),
       db_impl_(db_impl),
       cfd_(cfd),
-      start_seqnum_(read_options.iter_start_seqnum) {
+      start_seqnum_(read_options.iter_start_seqnum),
+      timestamp_ub_(read_options.timestamp),
+      timestamp_lb_(read_options.iter_start_ts),
+      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
   RecordTick(statistics_, NO_ITERATOR_CREATED);
-  max_skip_ = max_sequential_skip_in_iterations;
-  max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
   if (pin_thru_lifetime_) {
     pinned_iters_mgr_.StartPinning();
   }
   if (iter_.iter()) {
     iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
   }
+  assert(timestamp_size_ == user_comparator_.timestamp_size());
 }
 
 Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
@@ -103,11 +107,12 @@ Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
 }
 
 bool DBIter::ParseKey(ParsedInternalKey* ikey) {
-  if (!ParseInternalKey(iter_.key(), ikey)) {
-    status_ = Status::Corruption("corrupted internal key in DBIter");
+  Status s =
+      ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);  // TODO
+  if (!s.ok()) {
+    status_ = Status::Corruption("In DBIter: ", s.getState());
     valid_ = false;
-    ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
-                    iter_.key().ToString(true).c_str());
+    ROCKS_LOG_ERROR(logger_, "In DBIter: %s", status_.getState());
     return false;
   } else {
     return true;
@@ -143,13 +148,13 @@ void DBIter::Next() {
 
   local_stats_.next_count_++;
   if (ok && iter_.Valid()) {
-    Slice prefix;
     if (prefix_same_as_start_) {
       assert(prefix_extractor_ != nullptr);
-      prefix = prefix_.GetUserKey();
+      const Slice prefix = prefix_.GetUserKey();
+      FindNextUserEntry(true /* skipping the current user key */, &prefix);
+    } else {
+      FindNextUserEntry(true /* skipping the current user key */, nullptr);
     }
-    FindNextUserEntry(true /* skipping the current user key */,
-                      prefix_same_as_start_ ? &prefix : nullptr);
   } else {
     is_key_seqnum_zero_ = false;
     valid_ = false;
@@ -219,10 +224,16 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
 
     is_key_seqnum_zero_ = (ikey_.sequence == 0);
 
-    assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
-           user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) < 0);
-    if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
-        user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
+    assert(iterate_upper_bound_ == nullptr ||
+           iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound ||
+           user_comparator_.CompareWithoutTimestamp(
+               ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
+               /*b_has_ts=*/false) < 0);
+    if (iterate_upper_bound_ != nullptr &&
+        iter_.UpperBoundCheckResult() != IterBoundCheck::kInbound &&
+        user_comparator_.CompareWithoutTimestamp(
+            ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
+            /*b_has_ts=*/false) >= 0) {
       break;
     }
 
@@ -237,24 +248,37 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
       return false;
     }
 
-    if (IsVisible(ikey_.sequence)) {
+    assert(ikey_.user_key.size() >= timestamp_size_);
+    Slice ts = timestamp_size_ > 0 ? ExtractTimestampFromUserKey(
+                                         ikey_.user_key, timestamp_size_)
+                                   : Slice();
+    bool more_recent = false;
+    if (IsVisible(ikey_.sequence, ts, &more_recent)) {
       // If the previous entry is of seqnum 0, the current entry will not
       // possibly be skipped. This condition can potentially be relaxed to
       // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
       // prone to bugs causing the same user key with the same sequence number.
-      if (!is_prev_key_seqnum_zero && skipping_saved_key &&
-          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey()) <=
-              0) {
+      // Note that with current timestamp implementation, the same user key can
+      // have different timestamps and zero sequence number on the bottommost
+      // level. This may change in the future.
+      if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
+          skipping_saved_key &&
+          CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
         num_skipped++;  // skip this entry
         PERF_COUNTER_ADD(internal_key_skipped_count, 1);
       } else {
         assert(!skipping_saved_key ||
-               user_comparator_.Compare(ikey_.user_key,
-                                        saved_key_.GetUserKey()) > 0);
+               CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
+        if (!iter_.PrepareValue()) {
+          assert(!iter_.status().ok());
+          valid_ = false;
+          return false;
+        }
         num_skipped = 0;
         reseek_done = false;
         switch (ikey_.type) {
           case kTypeDeletion:
+          case kTypeDeletionWithTimestamp:
           case kTypeSingleDeletion:
             // Arrange to skip all upcoming entries for this key since
             // they are hidden by this deletion.
@@ -263,7 +287,20 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
             // 2) return ikey only if ikey.seqnum >= start_seqnum_
             // note that if deletion seqnum is < start_seqnum_ we
             // just skip it like in normal iterator.
-            if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_)  {
+            if (start_seqnum_ > 0) {
+              if (ikey_.sequence >= start_seqnum_) {
+                saved_key_.SetInternalKey(ikey_);
+                valid_ = true;
+                return true;
+              } else {
+                saved_key_.SetUserKey(
+                    ikey_.user_key,
+                    !pin_thru_lifetime_ ||
+                        !iter_.iter()->IsKeyPinned() /* copy */);
+                skipping_saved_key = true;
+                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
+              }
+            } else if (timestamp_lb_) {
               saved_key_.SetInternalKey(ikey_);
               valid_ = true;
               return true;
@@ -278,10 +315,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
           case kTypeValue:
           case kTypeBlobIndex:
             if (start_seqnum_ > 0) {
-              // we are taking incremental snapshot here
-              // incremental snapshots aren't supported on DB with range deletes
-              assert(ikey_.type != kTypeBlobIndex);
               if (ikey_.sequence >= start_seqnum_) {
+                assert(ikey_.type != kTypeBlobIndex);
                 saved_key_.SetInternalKey(ikey_);
                 valid_ = true;
                 return true;
@@ -294,6 +329,10 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                         !iter_.iter()->IsKeyPinned() /* copy */);
                 skipping_saved_key = true;
               }
+            } else if (timestamp_lb_) {
+              saved_key_.SetInternalKey(ikey_);
+              valid_ = true;
+              return true;
             } else {
               saved_key_.SetUserKey(
                   ikey_.user_key, !pin_thru_lifetime_ ||
@@ -346,18 +385,23 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
             }
             break;
           default:
-            assert(false);
-            break;
+            valid_ = false;
+            status_ = Status::Corruption(
+                "Unknown value type: " +
+                std::to_string(static_cast<unsigned int>(ikey_.type)));
+            return false;
         }
       }
     } else {
-      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
+      if (more_recent) {
+        PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
+      }
 
-      // This key was inserted after our snapshot was taken.
-      // If this happens too many times in a row for the same user key, we want
-      // to seek to the target sequence number.
-      int cmp =
-          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey());
+      // This key was inserted after our snapshot was taken or skipped by
+      // timestamp range. If this happens too many times in a row for the same
+      // user key, we want to seek to the target sequence number.
+      int cmp = user_comparator_.CompareWithoutTimestamp(
+          ikey_.user_key, saved_key_.GetUserKey());
       if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
         num_skipped++;
       } else {
@@ -388,8 +432,17 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
         // We're looking for the next user-key but all we see are the same
         // user-key with decreasing sequence numbers. Fast forward to
         // sequence number 0 and type deletion (the smallest type).
-        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
-                                                       0, kTypeDeletion));
+        if (timestamp_size_ == 0) {
+          AppendInternalKey(
+              &last_key,
+              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
+        } else {
+          const std::string kTsMin(timestamp_size_, static_cast<char>(0));
+          AppendInternalKeyWithDifferentTimestamp(
+              &last_key,
+              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
+              kTsMin);
+        }
         // Don't set skipping_saved_key = false because we may still see more
         // user-keys equal to saved_key_.
       } else {
@@ -398,9 +451,17 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
         // Note that this only covers a case when a higher key was overwritten
         // many times since our snapshot was taken, not the case when a lot of
         // different keys were inserted after our snapshot was taken.
-        AppendInternalKey(&last_key,
-                          ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
-                                            kValueTypeForSeek));
+        if (timestamp_size_ == 0) {
+          AppendInternalKey(
+              &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
+                                           kValueTypeForSeek));
+        } else {
+          AppendInternalKeyWithDifferentTimestamp(
+              &last_key,
+              ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
+                                kValueTypeForSeek),
+              *timestamp_ub_);
+        }
       }
       iter_.Seek(last_key);
       RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
@@ -417,6 +478,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
 // Scan from the newer entries to older entries.
 // PRE: iter_.key() points to the first merge type entry
 //      saved_key_ stores the user key
+//      iter_.PrepareValue() has been called
 // POST: saved_value_ has the merged value for the user key
 //       iter_ points to the next entry (or invalid)
 bool DBIter::MergeValuesNewToOld() {
@@ -446,14 +508,21 @@ bool DBIter::MergeValuesNewToOld() {
     if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
       // hit the next user key, stop right here
       break;
-    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
+    }
+    if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
                range_del_agg_.ShouldDelete(
                    ikey, RangeDelPositioningMode::kForwardTraversal)) {
       // hit a delete with the same user key, stop right here
       // iter_ is positioned after delete
       iter_.Next();
       break;
-    } else if (kTypeValue == ikey.type) {
+    }
+    if (!iter_.PrepareValue()) {
+      valid_ = false;
+      return false;
+    }
+
+    if (kTypeValue == ikey.type) {
       // hit a put, merge the put value with operands and store the
       // final result in saved_value_. We are done!
       const Slice val = iter_.value();
@@ -491,7 +560,11 @@ bool DBIter::MergeValuesNewToOld() {
       valid_ = false;
       return false;
     } else {
-      assert(false);
+      valid_ = false;
+      status_ = Status::Corruption(
+          "Unrecognized value type: " +
+          std::to_string(static_cast<unsigned int>(ikey.type)));
+      return false;
     }
   }
 
@@ -519,6 +592,13 @@ bool DBIter::MergeValuesNewToOld() {
 }
 
 void DBIter::Prev() {
+  if (timestamp_size_ > 0) {
+    valid_ = false;
+    status_ = Status::NotSupported(
+        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
+    return;
+  }
+
   assert(valid_);
   assert(status_.ok());
 
@@ -697,7 +777,13 @@ bool DBIter::FindValueForCurrentKey() {
       return false;
     }
 
-    if (!IsVisible(ikey.sequence) ||
+    assert(ikey.user_key.size() >= timestamp_size_);
+    Slice ts;
+    if (timestamp_size_ > 0) {
+      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
+                 timestamp_size_);
+    }
+    if (!IsVisible(ikey.sequence, ts) ||
         !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
       break;
     }
@@ -712,6 +798,11 @@ bool DBIter::FindValueForCurrentKey() {
       return FindValueForCurrentKeyUsingSeek();
     }
 
+    if (!iter_.PrepareValue()) {
+      valid_ = false;
+      return false;
+    }
+
     last_key_entry_type = ikey.type;
     switch (last_key_entry_type) {
       case kTypeValue:
@@ -720,12 +811,19 @@ bool DBIter::FindValueForCurrentKey() {
                 ikey, RangeDelPositioningMode::kBackwardTraversal)) {
           last_key_entry_type = kTypeRangeDeletion;
           PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
-        } else {
-          assert(iter_.iter()->IsValuePinned());
+        } else if (iter_.iter()->IsValuePinned()) {
           pinned_value_ = iter_.value();
+        } else {
+          valid_ = false;
+          status_ = Status::NotSupported(
+              "Backward iteration not supported if underlying iterator's value "
+              "cannot be pinned.");
         }
         merge_context_.Clear();
         last_not_merge_type = last_key_entry_type;
+        if (!status_.ok()) {
+          return false;
+        }
         break;
       case kTypeDeletion:
       case kTypeSingleDeletion:
@@ -749,7 +847,11 @@ bool DBIter::FindValueForCurrentKey() {
         }
         break;
       default:
-        assert(false);
+        valid_ = false;
+        status_ = Status::Corruption(
+            "Unknown value type: " +
+            std::to_string(static_cast<unsigned int>(last_key_entry_type)));
+        return false;
     }
 
     PERF_COUNTER_ADD(internal_key_skipped_count, 1);
@@ -763,6 +865,7 @@ bool DBIter::FindValueForCurrentKey() {
   }
 
   Status s;
+  s.PermitUncheckedError();
   is_blob_ = false;
   switch (last_key_entry_type) {
     case kTypeDeletion:
@@ -814,8 +917,11 @@ bool DBIter::FindValueForCurrentKey() {
       is_blob_ = true;
       break;
     default:
-      assert(false);
-      break;
+      valid_ = false;
+      status_ = Status::Corruption(
+          "Unknown value type: " +
+          std::to_string(static_cast<unsigned int>(last_key_entry_type)));
+      return false;
   }
   if (!s.ok()) {
     valid_ = false;
@@ -853,6 +959,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     if (!ParseKey(&ikey)) {
       return false;
     }
+    assert(ikey.user_key.size() >= timestamp_size_);
+    Slice ts;
+    if (timestamp_size_ > 0) {
+      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
+                 timestamp_size_);
+    }
+
     if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
       // No visible values for this key, even though FindValueForCurrentKey()
       // has seen some. This is possible if we're using a tailing iterator, and
@@ -861,7 +974,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
       return true;
     }
 
-    if (IsVisible(ikey.sequence)) {
+    if (IsVisible(ikey.sequence, ts)) {
       break;
     }
 
@@ -882,6 +995,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     valid_ = false;
     return false;
   }
+  if (!iter_.PrepareValue()) {
+    valid_ = false;
+    return false;
+  }
   if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
     assert(iter_.iter()->IsValuePinned());
     pinned_value_ = iter_.value();
@@ -913,12 +1030,17 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
       break;
     }
-
     if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
         range_del_agg_.ShouldDelete(
             ikey, RangeDelPositioningMode::kForwardTraversal)) {
       break;
-    } else if (ikey.type == kTypeValue) {
+    }
+    if (!iter_.PrepareValue()) {
+      valid_ = false;
+      return false;
+    }
+
+    if (ikey.type == kTypeValue) {
       const Slice val = iter_.value();
       Status s = MergeHelper::TimedFullMerge(
           merge_operator_, saved_key_.GetUserKey(), &val,
@@ -948,7 +1070,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
       valid_ = false;
       return false;
     } else {
-      assert(false);
+      valid_ = false;
+      status_ = Status::Corruption(
+          "Unknown value type: " +
+          std::to_string(static_cast<unsigned int>(ikey.type)));
+      return false;
     }
   }
 
@@ -1001,7 +1127,13 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
     }
 
     assert(ikey.sequence != kMaxSequenceNumber);
-    if (!IsVisible(ikey.sequence)) {
+    assert(ikey.user_key.size() >= timestamp_size_);
+    Slice ts;
+    if (timestamp_size_ > 0) {
+      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
+                 timestamp_size_);
+    }
+    if (!IsVisible(ikey.sequence, ts)) {
       PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
     } else {
       PERF_COUNTER_ADD(internal_key_skipped_count, 1);
@@ -1046,26 +1178,40 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
   return false;
 }
 
-bool DBIter::IsVisible(SequenceNumber sequence) {
-  if (read_callback_ == nullptr) {
-    return sequence <= sequence_;
-  } else {
-    return read_callback_->IsVisible(sequence);
+bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts,
+                       bool* more_recent) {
+  // Remember that comparator orders preceding timestamp as larger.
+  // TODO(yanqin): support timestamp in read_callback_.
+  bool visible_by_seq = (read_callback_ == nullptr)
+                            ? sequence <= sequence_
+                            : read_callback_->IsVisible(sequence);
+
+  bool visible_by_ts =
+      (timestamp_ub_ == nullptr ||
+       user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0) &&
+      (timestamp_lb_ == nullptr ||
+       user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0);
+
+  if (more_recent) {
+    *more_recent = !visible_by_seq;
   }
+  return visible_by_seq && visible_by_ts;
 }
 
 void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
   is_key_seqnum_zero_ = false;
   SequenceNumber seq = sequence_;
   saved_key_.Clear();
-  saved_key_.SetInternalKey(target, seq);
+  saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);
 
   if (iterate_lower_bound_ != nullptr &&
-      user_comparator_.Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) <
-          0) {
+      user_comparator_.CompareWithoutTimestamp(
+          saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
+          /*b_has_ts=*/false) < 0) {
     // Seek key is smaller than the lower bound.
     saved_key_.Clear();
-    saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
+    saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
+                              timestamp_ub_);
   }
 }
 
@@ -1090,7 +1236,8 @@ void DBIter::Seek(const Slice& target) {
 
 #ifndef ROCKSDB_LITE
   if (db_impl_ != nullptr && cfd_ != nullptr) {
-    db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
+    // TODO: What do we do if this returns an error?
+    db_impl_->TraceIteratorSeek(cfd_->GetID(), target).PermitUncheckedError();
   }
 #endif  // ROCKSDB_LITE
 
@@ -1151,10 +1298,19 @@ void DBIter::SeekForPrev(const Slice& target) {
 
 #ifndef ROCKSDB_LITE
   if (db_impl_ != nullptr && cfd_ != nullptr) {
-    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
+    // TODO: What do we do if this returns an error?
+    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target)
+        .PermitUncheckedError();
   }
 #endif  // ROCKSDB_LITE
 
+  if (timestamp_size_ > 0) {
+    valid_ = false;
+    status_ = Status::NotSupported(
+        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
+    return;
+  }
+
   status_ = Status::OK();
   ReleaseTempPinnedData();
   ResetInternalKeysSkippedCounter();
@@ -1248,6 +1404,13 @@ void DBIter::SeekToFirst() {
 }
 
 void DBIter::SeekToLast() {
+  if (timestamp_size_ > 0) {
+    valid_ = false;
+    status_ = Status::NotSupported(
+        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
+    return;
+  }
+
   if (iterate_upper_bound_ != nullptr) {
     // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
     SeekForPrev(*iterate_upper_bound_);