git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction/compaction_iterator.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_iterator.h
index 5088038b60bc0a2755278a6318a8878379580c23..c215d2bbbd0648a638377693c4993587ebe98247 100644 (file)
@@ -5,6 +5,7 @@
 #pragma once
 
 #include <algorithm>
+#include <cinttypes>
 #include <deque>
 #include <string>
 #include <unordered_set>
 namespace ROCKSDB_NAMESPACE {
 
 class BlobFileBuilder;
+class BlobFetcher;
+class PrefetchBufferCollection;
+
+// A wrapper around an internal iterator whose purpose is to count how many
+// entries have been advanced past via Next().
+class SequenceIterWrapper : public InternalIterator {
+ public:
+  SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp,
+                      bool need_count_entries)
+      : icmp_(cmp),
+        inner_iter_(iter),
+        need_count_entries_(need_count_entries) {}
+  bool Valid() const override { return inner_iter_->Valid(); }
+  Status status() const override { return inner_iter_->status(); }
+  void Next() override {
+    num_itered_++;
+    inner_iter_->Next();
+  }
+  void Seek(const Slice& target) override {
+    if (!need_count_entries_) {
+      inner_iter_->Seek(target);
+    } else {
+      // For flush cases, we need to count the total number of entries, so we
+      // step forward with Next() rather than calling Seek().
+      while (inner_iter_->Valid() &&
+             icmp_.Compare(inner_iter_->key(), target) < 0) {
+        Next();
+      }
+    }
+  }
+  Slice key() const override { return inner_iter_->key(); }
+  Slice value() const override { return inner_iter_->value(); }
+
+  // Unused InternalIterator methods
+  void SeekToFirst() override { assert(false); }
+  void Prev() override { assert(false); }
+  void SeekForPrev(const Slice& /* target */) override { assert(false); }
+  void SeekToLast() override { assert(false); }
+
+  uint64_t num_itered() const { return num_itered_; }
+
+ private:
+  InternalKeyComparator icmp_;
+  InternalIterator* inner_iter_;  // not owned
+  uint64_t num_itered_ = 0;
+  bool need_count_entries_;
+};
 
 class CompactionIterator {
  public:
@@ -40,11 +88,29 @@ class CompactionIterator {
 
     virtual int number_levels() const = 0;
 
+    // Result includes timestamp if user-defined timestamp is enabled.
     virtual Slice GetLargestUserKey() const = 0;
 
     virtual bool allow_ingest_behind() const = 0;
 
-    virtual bool preserve_deletes() const = 0;
+    virtual bool allow_mmap_reads() const = 0;
+
+    virtual bool enable_blob_garbage_collection() const = 0;
+
+    virtual double blob_garbage_collection_age_cutoff() const = 0;
+
+    virtual uint64_t blob_compaction_readahead_size() const = 0;
+
+    virtual const Version* input_version() const = 0;
+
+    virtual bool DoesInputReferenceBlobFiles() const = 0;
+
+    virtual const Compaction* real_compaction() const = 0;
+
+    virtual bool SupportsPerKeyPlacement() const = 0;
+
+    // `key` includes timestamp if user-defined timestamp is enabled.
+    virtual bool WithinPenultimateLevelOutputRange(const Slice& key) const = 0;
   };
 
   class RealCompaction : public CompactionProxy {
@@ -52,7 +118,8 @@ class CompactionIterator {
     explicit RealCompaction(const Compaction* compaction)
         : compaction_(compaction) {
       assert(compaction_);
-      assert(compaction_->immutable_cf_options());
+      assert(compaction_->immutable_options());
+      assert(compaction_->mutable_cf_options());
     }
 
     int level() const override { return compaction_->level(); }
@@ -68,56 +135,92 @@ class CompactionIterator {
 
     int number_levels() const override { return compaction_->number_levels(); }
 
+    // Result includes timestamp if user-defined timestamp is enabled.
     Slice GetLargestUserKey() const override {
       return compaction_->GetLargestUserKey();
     }
 
     bool allow_ingest_behind() const override {
-      return compaction_->immutable_cf_options()->allow_ingest_behind;
+      return compaction_->immutable_options()->allow_ingest_behind;
+    }
+
+    bool allow_mmap_reads() const override {
+      return compaction_->immutable_options()->allow_mmap_reads;
+    }
+
+    bool enable_blob_garbage_collection() const override {
+      return compaction_->enable_blob_garbage_collection();
     }
 
-    bool preserve_deletes() const override {
-      return compaction_->immutable_cf_options()->preserve_deletes;
+    double blob_garbage_collection_age_cutoff() const override {
+      return compaction_->blob_garbage_collection_age_cutoff();
+    }
+
+    uint64_t blob_compaction_readahead_size() const override {
+      return compaction_->mutable_cf_options()->blob_compaction_readahead_size;
+    }
+
+    const Version* input_version() const override {
+      return compaction_->input_version();
+    }
+
+    bool DoesInputReferenceBlobFiles() const override {
+      return compaction_->DoesInputReferenceBlobFiles();
+    }
+
+    const Compaction* real_compaction() const override { return compaction_; }
+
+    bool SupportsPerKeyPlacement() const override {
+      return compaction_->SupportsPerKeyPlacement();
+    }
+
+    // Check if key is within penultimate level output range, to see if it's
+    // safe to output to the penultimate level for per_key_placement feature.
+    // `key` includes timestamp if user-defined timestamp is enabled.
+    bool WithinPenultimateLevelOutputRange(const Slice& key) const override {
+      return compaction_->WithinPenultimateLevelOutputRange(key);
     }
 
    private:
     const Compaction* compaction_;
   };
 
-  CompactionIterator(InternalIterator* input, const Comparator* cmp,
-                     MergeHelper* merge_helper, SequenceNumber last_sequence,
-                     std::vector<SequenceNumber>* snapshots,
-                     SequenceNumber earliest_write_conflict_snapshot,
-                     const SnapshotChecker* snapshot_checker, Env* env,
-                     bool report_detailed_time, bool expect_valid_internal_key,
-                     CompactionRangeDelAggregator* range_del_agg,
-                     BlobFileBuilder* blob_file_builder,
-                     bool allow_data_in_errors,
-                     const Compaction* compaction = nullptr,
-                     const CompactionFilter* compaction_filter = nullptr,
-                     const std::atomic<bool>* shutting_down = nullptr,
-                     const SequenceNumber preserve_deletes_seqnum = 0,
-                     const std::atomic<int>* manual_compaction_paused = nullptr,
-                     const std::shared_ptr<Logger> info_log = nullptr,
-                     const std::string* full_history_ts_low = nullptr);
+  CompactionIterator(
+      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+      SequenceNumber earliest_write_conflict_snapshot,
+      SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
+      Env* env, bool report_detailed_time, bool expect_valid_internal_key,
+      CompactionRangeDelAggregator* range_del_agg,
+      BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
+      bool enforce_single_del_contracts,
+      const std::atomic<bool>& manual_compaction_canceled,
+      const Compaction* compaction = nullptr,
+      const CompactionFilter* compaction_filter = nullptr,
+      const std::atomic<bool>* shutting_down = nullptr,
+      const std::shared_ptr<Logger> info_log = nullptr,
+      const std::string* full_history_ts_low = nullptr,
+      const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
+      const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);
 
   // Constructor with custom CompactionProxy, used for tests.
-  CompactionIterator(InternalIterator* input, const Comparator* cmp,
-                     MergeHelper* merge_helper, SequenceNumber last_sequence,
-                     std::vector<SequenceNumber>* snapshots,
-                     SequenceNumber earliest_write_conflict_snapshot,
-                     const SnapshotChecker* snapshot_checker, Env* env,
-                     bool report_detailed_time, bool expect_valid_internal_key,
-                     CompactionRangeDelAggregator* range_del_agg,
-                     BlobFileBuilder* blob_file_builder,
-                     bool allow_data_in_errors,
-                     std::unique_ptr<CompactionProxy> compaction,
-                     const CompactionFilter* compaction_filter = nullptr,
-                     const std::atomic<bool>* shutting_down = nullptr,
-                     const SequenceNumber preserve_deletes_seqnum = 0,
-                     const std::atomic<int>* manual_compaction_paused = nullptr,
-                     const std::shared_ptr<Logger> info_log = nullptr,
-                     const std::string* full_history_ts_low = nullptr);
+  CompactionIterator(
+      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+      SequenceNumber earliest_write_conflict_snapshot,
+      SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
+      Env* env, bool report_detailed_time, bool expect_valid_internal_key,
+      CompactionRangeDelAggregator* range_del_agg,
+      BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
+      bool enforce_single_del_contracts,
+      const std::atomic<bool>& manual_compaction_canceled,
+      std::unique_ptr<CompactionProxy> compaction,
+      const CompactionFilter* compaction_filter = nullptr,
+      const std::atomic<bool>* shutting_down = nullptr,
+      const std::shared_ptr<Logger> info_log = nullptr,
+      const std::string* full_history_ts_low = nullptr,
+      const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
+      const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);
 
   ~CompactionIterator();
 
@@ -138,19 +241,49 @@ class CompactionIterator {
   const Slice& value() const { return value_; }
   const Status& status() const { return status_; }
   const ParsedInternalKey& ikey() const { return ikey_; }
-  bool Valid() const { return valid_; }
+  inline bool Valid() const { return validity_info_.IsValid(); }
   const Slice& user_key() const { return current_user_key_; }
   const CompactionIterationStats& iter_stats() const { return iter_stats_; }
+  uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
+  // Whether the current key should be placed on the penultimate level. Only
+  // valid if per_key_placement is supported.
+  bool output_to_penultimate_level() const {
+    return output_to_penultimate_level_;
+  }
+  Status InputStatus() const { return input_.status(); }
 
  private:
   // Processes the input stream to find the next output
   void NextFromInput();
 
-  // Do last preparations before presenting the output to the callee. At this
-  // point this only zeroes out the sequence number if possible for better
-  // compression.
+  // Do final preparations before presenting the output to the callee.
   void PrepareOutput();
 
+  // Decide whether the current key should be output to the last level or the
+  // penultimate level. Only call for compactions supporting per-key placement.
+  void DecideOutputLevel();
+
+  // Passes the output value to the blob file builder (if any), and replaces it
+  // with the corresponding blob reference if it has been actually written to a
+  // blob file (i.e. if it passed the value size check). Returns true if the
+  // value got extracted to a blob file, false otherwise.
+  bool ExtractLargeValueIfNeededImpl();
+
+  // Extracts large values as described above, and updates the internal key's
+  // type to kTypeBlobIndex if the value got extracted. Should only be called
+  // for regular values (kTypeValue).
+  void ExtractLargeValueIfNeeded();
+
+  // Relocates valid blobs residing in the oldest blob files if garbage
+  // collection is enabled. Relocated blobs are written to new blob files or
+  // inlined in the LSM tree depending on the current settings (i.e.
+  // enable_blob_files and min_blob_size). Should only be called for blob
+  // references (kTypeBlobIndex).
+  //
+  // Note: the stacked BlobDB implementation's compaction filter based GC
+  // algorithm is also called from here.
+  void GarbageCollectBlobIfNeeded();
+
   // Invoke compaction filter if needed.
   // Return true on success, false on failures (e.g.: kIOError).
   bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
@@ -164,18 +297,15 @@ class CompactionIterator {
   inline SequenceNumber findEarliestVisibleSnapshot(
       SequenceNumber in, SequenceNumber* prev_snapshot);
 
-  // Checks whether the currently seen ikey_ is needed for
-  // incremental (differential) snapshot and hence can't be dropped
-  // or seqnum be zero-ed out even if all other conditions for it are met.
-  inline bool ikeyNotNeededForIncrementalSnapshot();
-
   inline bool KeyCommitted(SequenceNumber sequence) {
     return snapshot_checker_ == nullptr ||
-           snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) ==
+           snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) ==
                SnapshotCheckerResult::kInSnapshot;
   }
 
-  bool IsInEarliestSnapshot(SequenceNumber sequence);
+  bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
+
+  bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
 
   // Extract user-defined timestamp from user key if possible and compare it
   // with *full_history_ts_low_ if applicable.
@@ -191,7 +321,14 @@ class CompactionIterator {
     }
   }
 
-  InternalIterator* input_;
+  static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
+      const CompactionProxy* compaction);
+  static std::unique_ptr<BlobFetcher> CreateBlobFetcherIfNeeded(
+      const CompactionProxy* compaction);
+  static std::unique_ptr<PrefetchBufferCollection>
+  CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction);
+
+  SequenceIterWrapper input_;
   const Comparator* cmp_;
   MergeHelper* merge_helper_;
   const std::vector<SequenceNumber>* snapshots_;
@@ -201,28 +338,28 @@ class CompactionIterator {
   // earliest visible snapshot of an older value.
   // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
   std::unordered_set<SequenceNumber> released_snapshots_;
-  std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
   const SequenceNumber earliest_write_conflict_snapshot_;
+  const SequenceNumber job_snapshot_;
   const SnapshotChecker* const snapshot_checker_;
   Env* env_;
-  bool report_detailed_time_;
-  bool expect_valid_internal_key_;
+  SystemClock* clock_;
+  const bool report_detailed_time_;
+  const bool expect_valid_internal_key_;
   CompactionRangeDelAggregator* range_del_agg_;
   BlobFileBuilder* blob_file_builder_;
   std::unique_ptr<CompactionProxy> compaction_;
   const CompactionFilter* compaction_filter_;
   const std::atomic<bool>* shutting_down_;
-  const std::atomic<int>* manual_compaction_paused_;
-  const SequenceNumber preserve_deletes_seqnum_;
-  bool bottommost_level_;
-  bool valid_ = false;
-  bool visible_at_tip_;
-  SequenceNumber earliest_snapshot_;
-  SequenceNumber latest_snapshot_;
+  const std::atomic<bool>& manual_compaction_canceled_;
+  const bool bottommost_level_;
+  const bool visible_at_tip_;
+  const SequenceNumber earliest_snapshot_;
 
   std::shared_ptr<Logger> info_log_;
 
-  bool allow_data_in_errors_;
+  const bool allow_data_in_errors_;
+
+  const bool enforce_single_del_contracts_;
 
   // Comes from comparator.
   const size_t timestamp_size_;
@@ -236,8 +373,33 @@ class CompactionIterator {
 
   // State
   //
+  enum ValidContext : uint8_t {
+    kMerge1 = 0,
+    kMerge2 = 1,
+    kParseKeyError = 2,
+    kCurrentKeyUncommitted = 3,
+    kKeepSDAndClearPut = 4,
+    kKeepTsHistory = 5,
+    kKeepSDForConflictCheck = 6,
+    kKeepSDForSnapshot = 7,
+    kKeepSD = 8,
+    kKeepDel = 9,
+    kNewUserKey = 10,
+  };
+
+  struct ValidityInfo {
+    inline bool IsValid() const { return rep & 1; }
+    ValidContext GetContext() const {
+      return static_cast<ValidContext>(rep >> 1);
+    }
+    inline void SetValid(uint8_t ctx) { rep = (ctx << 1) | 1; }
+    inline void Invalidate() { rep = 0; }
+
+    uint8_t rep{0};
+  } validity_info_;
+
   // Points to a copy of the current compaction iterator output (current_key_)
-  // if valid_.
+  // if valid.
   Slice key_;
   // Points to the value in the underlying iterator that corresponds to the
   // current output.
@@ -273,7 +435,14 @@ class CompactionIterator {
   // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
   // merge operands and then releasing them after consuming them.
   PinnedIteratorsManager pinned_iters_mgr_;
+
+  uint64_t blob_garbage_collection_cutoff_file_number_;
+
+  std::unique_ptr<BlobFetcher> blob_fetcher_;
+  std::unique_ptr<PrefetchBufferCollection> prefetch_buffers_;
+
   std::string blob_index_;
+  PinnableSlice blob_value_;
   std::string compaction_filter_value_;
   InternalKey compaction_filter_skip_until_;
   // "level_ptrs" holds indices that remember which file of an associated
@@ -292,6 +461,28 @@ class CompactionIterator {
   // Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_)
   int cmp_with_history_ts_low_;
 
+  const int level_;
+
+  // True if the previous internal key (same user key)'s sequence number has
+  // just been zeroed out during bottommost compaction.
+  bool last_key_seq_zeroed_{false};
+
+  // True if the current key should be output to the penultimate level if
+  // possible; compaction logic makes the final decision on which level to
+  // output to.
+  bool output_to_penultimate_level_{false};
+
+  // min seqno for preserving the time information.
+  const SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber;
+
+  // min seqno to preclude data from the last level; if a key's seqno is larger
+  // than this, it will be output to the penultimate level.
+  const SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
+
+  void AdvanceInputIter() { input_.Next(); }
+
+  void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }
+
   bool IsShuttingDown() {
     // This is a best-effort facility, so memory_order_relaxed is sufficient.
     return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
@@ -299,8 +490,24 @@ class CompactionIterator {
 
   bool IsPausingManualCompaction() {
     // This is a best-effort facility, so memory_order_relaxed is sufficient.
-    return manual_compaction_paused_ &&
-           manual_compaction_paused_->load(std::memory_order_relaxed) > 0;
+    return manual_compaction_canceled_.load(std::memory_order_relaxed);
   }
 };
+
+inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
+                                                     SequenceNumber snapshot) {
+  return ((seq) <= (snapshot) &&
+          (snapshot_checker_ == nullptr ||
+           LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) ==
+                  SnapshotCheckerResult::kInSnapshot)));
+}
+
+inline bool CompactionIterator::DefinitelyNotInSnapshot(
+    SequenceNumber seq, SequenceNumber snapshot) {
+  return ((seq) > (snapshot) ||
+          (snapshot_checker_ != nullptr &&
+           UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) ==
+                    SnapshotCheckerResult::kNotInSnapshot)));
+}
+
 }  // namespace ROCKSDB_NAMESPACE