]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction/compaction_iterator.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_iterator.cc
index 1bebfc71743e60dd7c46a3d469c532b2581d71c4..8ee5841ed58cd06908f1385be0739db69b3f06d3 100644 (file)
@@ -3,9 +3,11 @@
 //  COPYING file in the root directory) and Apache 2.0 License
 //  (found in the LICENSE.Apache file in the root directory).
 
+#include "db/compaction/compaction_iterator.h"
+
 #include <cinttypes>
 
-#include "db/compaction/compaction_iterator.h"
+#include "db/blob/blob_file_builder.h"
 #include "db/snapshot_checker.h"
 #include "port/likely.h"
 #include "rocksdb/listener.h"
@@ -36,20 +38,23 @@ CompactionIterator::CompactionIterator(
     SequenceNumber earliest_write_conflict_snapshot,
     const SnapshotChecker* snapshot_checker, Env* env,
     bool report_detailed_time, bool expect_valid_internal_key,
-    CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
-    const CompactionFilter* compaction_filter,
+    CompactionRangeDelAggregator* range_del_agg,
+    BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
+    const Compaction* compaction, const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
     const SequenceNumber preserve_deletes_seqnum,
-    const std::atomic<bool>* manual_compaction_paused,
-    const std::shared_ptr<Logger> info_log)
+    const std::atomic<int>* manual_compaction_paused,
+    const std::shared_ptr<Logger> info_log,
+    const std::string* full_history_ts_low)
     : CompactionIterator(
           input, cmp, merge_helper, last_sequence, snapshots,
           earliest_write_conflict_snapshot, snapshot_checker, env,
           report_detailed_time, expect_valid_internal_key, range_del_agg,
+          blob_file_builder, allow_data_in_errors,
           std::unique_ptr<CompactionProxy>(
-              compaction ? new CompactionProxy(compaction) : nullptr),
+              compaction ? new RealCompaction(compaction) : nullptr),
           compaction_filter, shutting_down, preserve_deletes_seqnum,
-          manual_compaction_paused, info_log) {}
+          manual_compaction_paused, info_log, full_history_ts_low) {}
 
 CompactionIterator::CompactionIterator(
     InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
@@ -58,12 +63,14 @@ CompactionIterator::CompactionIterator(
     const SnapshotChecker* snapshot_checker, Env* env,
     bool report_detailed_time, bool expect_valid_internal_key,
     CompactionRangeDelAggregator* range_del_agg,
+    BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
     std::unique_ptr<CompactionProxy> compaction,
     const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
     const SequenceNumber preserve_deletes_seqnum,
-    const std::atomic<bool>* manual_compaction_paused,
-    const std::shared_ptr<Logger> info_log)
+    const std::atomic<int>* manual_compaction_paused,
+    const std::shared_ptr<Logger> info_log,
+    const std::string* full_history_ts_low)
     : input_(input),
       cmp_(cmp),
       merge_helper_(merge_helper),
@@ -74,20 +81,27 @@ CompactionIterator::CompactionIterator(
       report_detailed_time_(report_detailed_time),
       expect_valid_internal_key_(expect_valid_internal_key),
       range_del_agg_(range_del_agg),
+      blob_file_builder_(blob_file_builder),
       compaction_(std::move(compaction)),
       compaction_filter_(compaction_filter),
       shutting_down_(shutting_down),
       manual_compaction_paused_(manual_compaction_paused),
       preserve_deletes_seqnum_(preserve_deletes_seqnum),
+      info_log_(info_log),
+      allow_data_in_errors_(allow_data_in_errors),
+      timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
+      full_history_ts_low_(full_history_ts_low),
       current_user_key_sequence_(0),
       current_user_key_snapshot_(0),
       merge_out_iter_(merge_helper_),
       current_key_committed_(false),
-      info_log_(info_log) {
+      cmp_with_history_ts_low_(0) {
   assert(compaction_filter_ == nullptr || compaction_ != nullptr);
   assert(snapshots_ != nullptr);
-  bottommost_level_ =
-      compaction_ == nullptr ? false : compaction_->bottommost_level();
+  bottommost_level_ = compaction_ == nullptr
+                          ? false
+                          : compaction_->bottommost_level() &&
+                                !compaction_->allow_ingest_behind();
   if (compaction_ != nullptr) {
     level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
   }
@@ -108,6 +122,8 @@ CompactionIterator::CompactionIterator(
   for (size_t i = 1; i < snapshots_->size(); ++i) {
     assert(snapshots_->at(i - 1) < snapshots_->at(i));
   }
+  assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
+         timestamp_size_ == full_history_ts_low_->size());
 #endif
   input_->SetPinnedItersMgr(&pinned_iters_mgr_);
   TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
@@ -142,14 +158,13 @@ void CompactionIterator::Next() {
     if (merge_out_iter_.Valid()) {
       key_ = merge_out_iter_.key();
       value_ = merge_out_iter_.value();
-      bool valid_key __attribute__((__unused__));
-      valid_key =  ParseInternalKey(key_, &ikey_);
+      Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
       // MergeUntil stops when it encounters a corrupt key and does not
       // include them in the result, so we expect the keys here to be valid.
-      assert(valid_key);
-      if (!valid_key) {
-        ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
-                        key_.ToString(true).c_str());
+      assert(s.ok());
+      if (!s.ok()) {
+        ROCKS_LOG_FATAL(info_log_, "Invalid key in compaction. %s",
+                        s.getState());
       }
 
       // Keep current_key_ in sync.
@@ -182,7 +197,7 @@ void CompactionIterator::Next() {
   PrepareOutput();
 }
 
-void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
+bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                               Slice* skip_until) {
   if (compaction_filter_ != nullptr &&
       (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
@@ -225,14 +240,31 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
       value_.clear();
       iter_stats_.num_record_drop_user++;
     } else if (filter == CompactionFilter::Decision::kChangeValue) {
+      if (ikey_.type == kTypeBlobIndex) {
+        // value transfer from blob file to inlined data
+        ikey_.type = kTypeValue;
+        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+      }
       value_ = compaction_filter_value_;
     } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
       *need_skip = true;
       compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
                                                        kValueTypeForSeek);
       *skip_until = compaction_filter_skip_until_.Encode();
+    } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) {
+      if (ikey_.type == kTypeValue) {
+        // value transfer from inlined data to blob file
+        ikey_.type = kTypeBlobIndex;
+        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+      }
+      value_ = compaction_filter_value_;
+    } else if (filter == CompactionFilter::Decision::kIOError) {
+      status_ =
+          Status::IOError("Failed to access blob during compaction filter");
+      return false;
     }
   }
+  return true;
 }
 
 void CompactionIterator::NextFromInput() {
@@ -245,27 +277,28 @@ void CompactionIterator::NextFromInput() {
     value_ = input_->value();
     iter_stats_.num_input_records++;
 
-    if (!ParseInternalKey(key_, &ikey_)) {
+    Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
+    if (!pik_status.ok()) {
+      iter_stats_.num_input_corrupt_records++;
+
       // If `expect_valid_internal_key_` is false, return the corrupted key
       // and let the caller decide what to do with it.
-      // TODO(noetzli): We should have a more elegant solution for this.
       if (expect_valid_internal_key_) {
-        assert(!"Corrupted internal key not expected.");
-        status_ = Status::Corruption("Corrupted internal key not expected.");
-        break;
+        status_ = pik_status;
+        return;
       }
       key_ = current_key_.SetInternalKey(key_);
       has_current_user_key_ = false;
       current_user_key_sequence_ = kMaxSequenceNumber;
       current_user_key_snapshot_ = 0;
-      iter_stats_.num_input_corrupt_records++;
       valid_ = true;
       break;
     }
     TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
 
     // Update input statistics
-    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
+    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
+        ikey_.type == kTypeDeletionWithTimestamp) {
       iter_stats_.num_input_deletion_records++;
     }
     iter_stats_.total_input_raw_key_bytes += key_.size();
@@ -278,25 +311,63 @@ void CompactionIterator::NextFromInput() {
     // merge_helper_->compaction_filter_skip_until_.
     Slice skip_until;
 
+    int cmp_user_key_without_ts = 0;
+    int cmp_ts = 0;
+    if (has_current_user_key_) {
+      cmp_user_key_without_ts =
+          timestamp_size_
+              ? cmp_->CompareWithoutTimestamp(ikey_.user_key, current_user_key_)
+              : cmp_->Compare(ikey_.user_key, current_user_key_);
+      // if timestamp_size_ > 0, then curr_ts_ has been initialized by a
+      // previous key.
+      cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
+                                     ExtractTimestampFromUserKey(
+                                         ikey_.user_key, timestamp_size_),
+                                     curr_ts_)
+                               : 0;
+    }
+
     // Check whether the user key changed. After this if statement current_key_
     // is a copy of the current input key (maybe converted to a delete by the
     // compaction filter). ikey_.user_key is pointing to the copy.
-    if (!has_current_user_key_ ||
-        !cmp_->Equal(ikey_.user_key, current_user_key_)) {
+    if (!has_current_user_key_ || cmp_user_key_without_ts != 0 || cmp_ts != 0) {
       // First occurrence of this user key
       // Copy key for output
       key_ = current_key_.SetInternalKey(key_, &ikey_);
+
+      // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
+      // in next iteration to compare with the timestamp of next key.
+      UpdateTimestampAndCompareWithFullHistoryLow();
+
+      // If
+      // (1) !has_current_user_key_, OR
+      // (2) timestamp is disabled, OR
+      // (3) all history will be preserved, OR
+      // (4) user key (excluding timestamp) is different from previous key, OR
+      // (5) timestamp is NO older than *full_history_ts_low_
+      // then current_user_key_ must be treated as a different user key.
+      // This means, if a user key (excluding ts) is the same as the previous
+      // user key, and its ts is older than *full_history_ts_low_, then we
+      // consider this key for GC, e.g. it may be dropped if certain conditions
+      // match.
+      if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
+          0 != cmp_user_key_without_ts || cmp_with_history_ts_low_ >= 0) {
+        // Initialize for future comparison for rule (A) and etc.
+        current_user_key_sequence_ = kMaxSequenceNumber;
+        current_user_key_snapshot_ = 0;
+        has_current_user_key_ = true;
+      }
       current_user_key_ = ikey_.user_key;
-      has_current_user_key_ = true;
+
       has_outputted_key_ = false;
-      current_user_key_sequence_ = kMaxSequenceNumber;
-      current_user_key_snapshot_ = 0;
+
       current_key_committed_ = KeyCommitted(ikey_.sequence);
 
       // Apply the compaction filter to the first committed version of the user
       // key.
-      if (current_key_committed_) {
-        InvokeFilterIfNeeded(&need_skip, &skip_until);
+      if (current_key_committed_ &&
+          !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
+        break;
       }
     } else {
       // Update the current key to reflect the new sequence number/type without
@@ -316,8 +387,9 @@ void CompactionIterator::NextFromInput() {
         current_key_committed_ = KeyCommitted(ikey_.sequence);
         // Apply the compaction filter to the first committed version of the
         // user key.
-        if (current_key_committed_) {
-          InvokeFilterIfNeeded(&need_skip, &skip_until);
+        if (current_key_committed_ &&
+            !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
+          break;
         }
       }
     }
@@ -331,8 +403,7 @@ void CompactionIterator::NextFromInput() {
     // If there are no snapshots, then this kv affect visibility at tip.
     // Otherwise, search though all existing snapshots to find the earliest
     // snapshot that is affected by this kv.
-    SequenceNumber last_sequence __attribute__((__unused__));
-    last_sequence = current_user_key_sequence_;
+    SequenceNumber last_sequence = current_user_key_sequence_;
     current_user_key_sequence_ = ikey_.sequence;
     SequenceNumber last_snapshot = current_user_key_snapshot_;
     SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
@@ -404,7 +475,9 @@ void CompactionIterator::NextFromInput() {
 
       // Check whether the next key exists, is not corrupt, and is the same key
       // as the single delete.
-      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
+      if (input_->Valid() &&
+          ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_)
+              .ok() &&
           cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
         // Check whether the next key belongs to the same snapshot as the
         // SingleDelete.
@@ -508,9 +581,12 @@ void CompactionIterator::NextFromInput() {
                         last_sequence, current_user_key_sequence_);
       }
 
-      ++iter_stats_.num_record_drop_hidden;  // (A)
+      ++iter_stats_.num_record_drop_hidden;  // rule (A)
       input_->Next();
-    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
+    } else if (compaction_ != nullptr &&
+               (ikey_.type == kTypeDeletion ||
+                (ikey_.type == kTypeDeletionWithTimestamp &&
+                 cmp_with_history_ts_low_ < 0)) &&
                IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
                ikeyNotNeededForIncrementalSnapshot() &&
                compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
@@ -534,30 +610,49 @@ void CompactionIterator::NextFromInput() {
       // given that:
       // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
       // (2) No value exist earlier than the deletion.
+      //
+      // Note also that a deletion marker of type kTypeDeletionWithTimestamp
+      // will be treated as a different user key unless the timestamp is older
+      // than *full_history_ts_low_.
       ++iter_stats_.num_record_drop_obsolete;
       if (!bottommost_level_) {
         ++iter_stats_.num_optimized_del_drop_obsolete;
       }
       input_->Next();
-    } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
-               ikeyNotNeededForIncrementalSnapshot()) {
+    } else if ((ikey_.type == kTypeDeletion ||
+                (ikey_.type == kTypeDeletionWithTimestamp &&
+                 cmp_with_history_ts_low_ < 0)) &&
+               bottommost_level_ && ikeyNotNeededForIncrementalSnapshot()) {
       // Handle the case where we have a delete key at the bottom most level
       // We can skip outputting the key iff there are no subsequent puts for this
       // key
+      assert(!compaction_ || compaction_->KeyNotExistsBeyondOutputLevel(
+                                 ikey_.user_key, &level_ptrs_));
       ParsedInternalKey next_ikey;
       input_->Next();
-      // Skip over all versions of this key that happen to occur in the same snapshot
-      // range as the delete
-      while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
-             cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
+      // Skip over all versions of this key that happen to occur in the same
+      // snapshot range as the delete.
+      //
+      // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
+      // considered to have a different user key unless the timestamp is older
+      // than *full_history_ts_low_.
+      while (!IsPausingManualCompaction() && !IsShuttingDown() &&
+             input_->Valid() &&
+             (ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_)
+                  .ok()) &&
+             0 == cmp_->CompareWithoutTimestamp(ikey_.user_key,
+                                                next_ikey.user_key) &&
              (prev_snapshot == 0 ||
               DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
         input_->Next();
       }
       // If you find you still need to output a row with this key, we need to output the
       // delete too
-      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
-          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
+      if (input_->Valid() &&
+          (ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_)
+               .ok()) &&
+          0 == cmp_->CompareWithoutTimestamp(ikey_.user_key,
+                                             next_ikey.user_key)) {
         valid_ = true;
         at_next_ = true;
       }
@@ -573,8 +668,9 @@ void CompactionIterator::NextFromInput() {
       // have hit (A)
       // We encapsulate the merge related state machine in a different
       // object to minimize change to the existing flow.
-      Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
-                                           prev_snapshot, bottommost_level_);
+      Status s =
+          merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot,
+                                    bottommost_level_, allow_data_in_errors_);
       merge_out_iter_.SeekToFirst();
 
       if (!s.ok() && !s.IsMergeInProgress()) {
@@ -585,14 +681,13 @@ void CompactionIterator::NextFromInput() {
         //       These will be correctly set below.
         key_ = merge_out_iter_.key();
         value_ = merge_out_iter_.value();
-        bool valid_key __attribute__((__unused__));
-        valid_key = ParseInternalKey(key_, &ikey_);
+        pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
         // MergeUntil stops when it encounters a corrupt key and does not
         // include them in the result, so we expect the keys here to valid.
-        assert(valid_key);
-        if (!valid_key) {
-          ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
-                          key_.ToString(true).c_str());
+        assert(pik_status.ok());
+        if (!pik_status.ok()) {
+          ROCKS_LOG_FATAL(info_log_, "Invalid key in compaction. %s",
+                          pik_status.getState());
         }
         // Keep current_key_ in sync.
         current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
@@ -640,20 +735,37 @@ void CompactionIterator::NextFromInput() {
 
 void CompactionIterator::PrepareOutput() {
   if (valid_) {
-    if (compaction_filter_ && ikey_.type == kTypeBlobIndex) {
-      const auto blob_decision = compaction_filter_->PrepareBlobOutput(
-          user_key(), value_, &compaction_filter_value_);
-
-      if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
-        status_ = Status::Corruption(
-            "Corrupted blob reference encountered during GC");
-        valid_ = false;
-      } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
-        status_ = Status::IOError("Could not relocate blob during GC");
-        valid_ = false;
-      } else if (blob_decision ==
-                 CompactionFilter::BlobDecision::kChangeValue) {
-        value_ = compaction_filter_value_;
+    if (ikey_.type == kTypeValue) {
+      if (blob_file_builder_) {
+        blob_index_.clear();
+        const Status s =
+            blob_file_builder_->Add(user_key(), value_, &blob_index_);
+
+        if (!s.ok()) {
+          status_ = s;
+          valid_ = false;
+        } else if (!blob_index_.empty()) {
+          value_ = blob_index_;
+          ikey_.type = kTypeBlobIndex;
+          current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+        }
+      }
+    } else if (ikey_.type == kTypeBlobIndex) {
+      if (compaction_filter_) {
+        const auto blob_decision = compaction_filter_->PrepareBlobOutput(
+            user_key(), value_, &compaction_filter_value_);
+
+        if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
+          status_ = Status::Corruption(
+              "Corrupted blob reference encountered during GC");
+          valid_ = false;
+        } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
+          status_ = Status::IOError("Could not relocate blob during GC");
+          valid_ = false;
+        } else if (blob_decision ==
+                   CompactionFilter::BlobDecision::kChangeValue) {
+          value_ = compaction_filter_value_;
+        }
       }
     }
 
@@ -679,7 +791,18 @@ void CompactionIterator::PrepareOutput() {
                         ikey_.type);
       }
       ikey_.sequence = 0;
-      current_key_.UpdateInternalKey(0, ikey_.type);
+      if (!timestamp_size_) {
+        current_key_.UpdateInternalKey(0, ikey_.type);
+      } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
+        // We can also zero out timestamp for better compression.
+        // For the same user key (excluding timestamp), the timestamp-based
+        // history can be collapsed to save some space if the timestamp is
+        // older than *full_history_ts_low_.
+        const std::string kTsMin(timestamp_size_, static_cast<char>(0));
+        const Slice ts_slice = kTsMin;
+        ikey_.SetTimestamp(ts_slice);
+        current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
+      }
     }
   }
 }