#pragma once
#include <algorithm>
+#include <cinttypes>
#include <deque>
#include <string>
#include <unordered_set>
namespace ROCKSDB_NAMESPACE {
class BlobFileBuilder;
+class BlobFetcher;
+class PrefetchBufferCollection;
+
+// A wrapper of internal iterator whose purpose is to count how
+// many entries there are in the iterator.
+class SequenceIterWrapper : public InternalIterator {
+ public:
+ SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp,
+ bool need_count_entries)
+ : icmp_(cmp),
+ inner_iter_(iter),
+ need_count_entries_(need_count_entries) {}
+ bool Valid() const override { return inner_iter_->Valid(); }
+ Status status() const override { return inner_iter_->status(); }
+ void Next() override {
+ num_itered_++;
+ inner_iter_->Next();
+ }
+ void Seek(const Slice& target) override {
+ if (!need_count_entries_) {
+ inner_iter_->Seek(target);
+ } else {
+ // For flush cases, we need to count total number of entries, so we
+ // do Next() rather than Seek().
+ while (inner_iter_->Valid() &&
+ icmp_.Compare(inner_iter_->key(), target) < 0) {
+ Next();
+ }
+ }
+ }
+ Slice key() const override { return inner_iter_->key(); }
+ Slice value() const override { return inner_iter_->value(); }
+
+ // Unused InternalIterator methods
+ void SeekToFirst() override { assert(false); }
+ void Prev() override { assert(false); }
+ void SeekForPrev(const Slice& /* target */) override { assert(false); }
+ void SeekToLast() override { assert(false); }
+
+ uint64_t num_itered() const { return num_itered_; }
+
+ private:
+ InternalKeyComparator icmp_;
+ InternalIterator* inner_iter_; // not owned
+ uint64_t num_itered_ = 0;
+ bool need_count_entries_;
+};
class CompactionIterator {
public:
virtual int number_levels() const = 0;
+ // Result includes timestamp if user-defined timestamp is enabled.
virtual Slice GetLargestUserKey() const = 0;
virtual bool allow_ingest_behind() const = 0;
- virtual bool preserve_deletes() const = 0;
+ virtual bool allow_mmap_reads() const = 0;
+
+ virtual bool enable_blob_garbage_collection() const = 0;
+
+ virtual double blob_garbage_collection_age_cutoff() const = 0;
+
+ virtual uint64_t blob_compaction_readahead_size() const = 0;
+
+ virtual const Version* input_version() const = 0;
+
+ virtual bool DoesInputReferenceBlobFiles() const = 0;
+
+ virtual const Compaction* real_compaction() const = 0;
+
+ virtual bool SupportsPerKeyPlacement() const = 0;
+
+ // `key` includes timestamp if user-defined timestamp is enabled.
+ virtual bool WithinPenultimateLevelOutputRange(const Slice& key) const = 0;
};
class RealCompaction : public CompactionProxy {
explicit RealCompaction(const Compaction* compaction)
: compaction_(compaction) {
assert(compaction_);
- assert(compaction_->immutable_cf_options());
+ assert(compaction_->immutable_options());
+ assert(compaction_->mutable_cf_options());
}
int level() const override { return compaction_->level(); }
int number_levels() const override { return compaction_->number_levels(); }
+ // Result includes timestamp if user-defined timestamp is enabled.
Slice GetLargestUserKey() const override {
return compaction_->GetLargestUserKey();
}
bool allow_ingest_behind() const override {
- return compaction_->immutable_cf_options()->allow_ingest_behind;
+ return compaction_->immutable_options()->allow_ingest_behind;
+ }
+
+ bool allow_mmap_reads() const override {
+ return compaction_->immutable_options()->allow_mmap_reads;
+ }
+
+ bool enable_blob_garbage_collection() const override {
+ return compaction_->enable_blob_garbage_collection();
}
- bool preserve_deletes() const override {
- return compaction_->immutable_cf_options()->preserve_deletes;
+ double blob_garbage_collection_age_cutoff() const override {
+ return compaction_->blob_garbage_collection_age_cutoff();
+ }
+
+ uint64_t blob_compaction_readahead_size() const override {
+ return compaction_->mutable_cf_options()->blob_compaction_readahead_size;
+ }
+
+ const Version* input_version() const override {
+ return compaction_->input_version();
+ }
+
+ bool DoesInputReferenceBlobFiles() const override {
+ return compaction_->DoesInputReferenceBlobFiles();
+ }
+
+ const Compaction* real_compaction() const override { return compaction_; }
+
+ bool SupportsPerKeyPlacement() const override {
+ return compaction_->SupportsPerKeyPlacement();
+ }
+
+ // Check if key is within penultimate level output range, to see if it's
+ // safe to output to the penultimate level for per_key_placement feature.
+ // `key` includes timestamp if user-defined timestamp is enabled.
+ bool WithinPenultimateLevelOutputRange(const Slice& key) const override {
+ return compaction_->WithinPenultimateLevelOutputRange(key);
}
private:
const Compaction* compaction_;
};
- CompactionIterator(InternalIterator* input, const Comparator* cmp,
- MergeHelper* merge_helper, SequenceNumber last_sequence,
- std::vector<SequenceNumber>* snapshots,
- SequenceNumber earliest_write_conflict_snapshot,
- const SnapshotChecker* snapshot_checker, Env* env,
- bool report_detailed_time, bool expect_valid_internal_key,
- CompactionRangeDelAggregator* range_del_agg,
- BlobFileBuilder* blob_file_builder,
- bool allow_data_in_errors,
- const Compaction* compaction = nullptr,
- const CompactionFilter* compaction_filter = nullptr,
- const std::atomic<bool>* shutting_down = nullptr,
- const SequenceNumber preserve_deletes_seqnum = 0,
- const std::atomic<int>* manual_compaction_paused = nullptr,
- const std::shared_ptr<Logger> info_log = nullptr,
- const std::string* full_history_ts_low = nullptr);
+ CompactionIterator(
+ InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+ SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+ SequenceNumber earliest_write_conflict_snapshot,
+ SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
+ Env* env, bool report_detailed_time, bool expect_valid_internal_key,
+ CompactionRangeDelAggregator* range_del_agg,
+ BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
+ bool enforce_single_del_contracts,
+ const std::atomic<bool>& manual_compaction_canceled,
+ const Compaction* compaction = nullptr,
+ const CompactionFilter* compaction_filter = nullptr,
+ const std::atomic<bool>* shutting_down = nullptr,
+ const std::shared_ptr<Logger> info_log = nullptr,
+ const std::string* full_history_ts_low = nullptr,
+ const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
+ const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);
// Constructor with custom CompactionProxy, used for tests.
- CompactionIterator(InternalIterator* input, const Comparator* cmp,
- MergeHelper* merge_helper, SequenceNumber last_sequence,
- std::vector<SequenceNumber>* snapshots,
- SequenceNumber earliest_write_conflict_snapshot,
- const SnapshotChecker* snapshot_checker, Env* env,
- bool report_detailed_time, bool expect_valid_internal_key,
- CompactionRangeDelAggregator* range_del_agg,
- BlobFileBuilder* blob_file_builder,
- bool allow_data_in_errors,
- std::unique_ptr<CompactionProxy> compaction,
- const CompactionFilter* compaction_filter = nullptr,
- const std::atomic<bool>* shutting_down = nullptr,
- const SequenceNumber preserve_deletes_seqnum = 0,
- const std::atomic<int>* manual_compaction_paused = nullptr,
- const std::shared_ptr<Logger> info_log = nullptr,
- const std::string* full_history_ts_low = nullptr);
+ CompactionIterator(
+ InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+ SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+ SequenceNumber earliest_write_conflict_snapshot,
+ SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
+ Env* env, bool report_detailed_time, bool expect_valid_internal_key,
+ CompactionRangeDelAggregator* range_del_agg,
+ BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
+ bool enforce_single_del_contracts,
+ const std::atomic<bool>& manual_compaction_canceled,
+ std::unique_ptr<CompactionProxy> compaction,
+ const CompactionFilter* compaction_filter = nullptr,
+ const std::atomic<bool>* shutting_down = nullptr,
+ const std::shared_ptr<Logger> info_log = nullptr,
+ const std::string* full_history_ts_low = nullptr,
+ const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
+ const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);
~CompactionIterator();
const Slice& value() const { return value_; }
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
- bool Valid() const { return valid_; }
+ inline bool Valid() const { return validity_info_.IsValid(); }
const Slice& user_key() const { return current_user_key_; }
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
+ uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
+ // If the current key should be placed on penultimate level, only valid if
+ // per_key_placement is supported
+ bool output_to_penultimate_level() const {
+ return output_to_penultimate_level_;
+ }
+ Status InputStatus() const { return input_.status(); }
private:
// Processes the input stream to find the next output
void NextFromInput();
- // Do last preparations before presenting the output to the callee. At this
- // point this only zeroes out the sequence number if possible for better
- // compression.
+ // Do final preparations before presenting the output to the callee.
void PrepareOutput();
+ // Decide the current key should be output to the last level or penultimate
+ // level, only call for compaction supports per key placement
+ void DecideOutputLevel();
+
+ // Passes the output value to the blob file builder (if any), and replaces it
+ // with the corresponding blob reference if it has been actually written to a
+ // blob file (i.e. if it passed the value size check). Returns true if the
+ // value got extracted to a blob file, false otherwise.
+ bool ExtractLargeValueIfNeededImpl();
+
+ // Extracts large values as described above, and updates the internal key's
+ // type to kTypeBlobIndex if the value got extracted. Should only be called
+ // for regular values (kTypeValue).
+ void ExtractLargeValueIfNeeded();
+
+ // Relocates valid blobs residing in the oldest blob files if garbage
+ // collection is enabled. Relocated blobs are written to new blob files or
+ // inlined in the LSM tree depending on the current settings (i.e.
+ // enable_blob_files and min_blob_size). Should only be called for blob
+ // references (kTypeBlobIndex).
+ //
+ // Note: the stacked BlobDB implementation's compaction filter based GC
+ // algorithm is also called from here.
+ void GarbageCollectBlobIfNeeded();
+
// Invoke compaction filter if needed.
// Return true on success, false on failures (e.g.: kIOError).
bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
inline SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot);
- // Checks whether the currently seen ikey_ is needed for
- // incremental (differential) snapshot and hence can't be dropped
- // or seqnum be zero-ed out even if all other conditions for it are met.
- inline bool ikeyNotNeededForIncrementalSnapshot();
-
inline bool KeyCommitted(SequenceNumber sequence) {
return snapshot_checker_ == nullptr ||
- snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) ==
+ snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) ==
SnapshotCheckerResult::kInSnapshot;
}
- bool IsInEarliestSnapshot(SequenceNumber sequence);
+ bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
+
+ bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
// Extract user-defined timestamp from user key if possible and compare it
// with *full_history_ts_low_ if applicable.
}
}
- InternalIterator* input_;
+ static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
+ const CompactionProxy* compaction);
+ static std::unique_ptr<BlobFetcher> CreateBlobFetcherIfNeeded(
+ const CompactionProxy* compaction);
+ static std::unique_ptr<PrefetchBufferCollection>
+ CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction);
+
+ SequenceIterWrapper input_;
const Comparator* cmp_;
MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_;
// earliest visible snapshot of an older value.
// See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
std::unordered_set<SequenceNumber> released_snapshots_;
- std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
const SequenceNumber earliest_write_conflict_snapshot_;
+ const SequenceNumber job_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
- bool report_detailed_time_;
- bool expect_valid_internal_key_;
+ SystemClock* clock_;
+ const bool report_detailed_time_;
+ const bool expect_valid_internal_key_;
CompactionRangeDelAggregator* range_del_agg_;
BlobFileBuilder* blob_file_builder_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
- const std::atomic<int>* manual_compaction_paused_;
- const SequenceNumber preserve_deletes_seqnum_;
- bool bottommost_level_;
- bool valid_ = false;
- bool visible_at_tip_;
- SequenceNumber earliest_snapshot_;
- SequenceNumber latest_snapshot_;
+ const std::atomic<bool>& manual_compaction_canceled_;
+ const bool bottommost_level_;
+ const bool visible_at_tip_;
+ const SequenceNumber earliest_snapshot_;
std::shared_ptr<Logger> info_log_;
- bool allow_data_in_errors_;
+ const bool allow_data_in_errors_;
+
+ const bool enforce_single_del_contracts_;
// Comes from comparator.
const size_t timestamp_size_;
// State
//
+ enum ValidContext : uint8_t {
+ kMerge1 = 0,
+ kMerge2 = 1,
+ kParseKeyError = 2,
+ kCurrentKeyUncommitted = 3,
+ kKeepSDAndClearPut = 4,
+ kKeepTsHistory = 5,
+ kKeepSDForConflictCheck = 6,
+ kKeepSDForSnapshot = 7,
+ kKeepSD = 8,
+ kKeepDel = 9,
+ kNewUserKey = 10,
+ };
+
+ struct ValidityInfo {
+ inline bool IsValid() const { return rep & 1; }
+ ValidContext GetContext() const {
+ return static_cast<ValidContext>(rep >> 1);
+ }
+ inline void SetValid(uint8_t ctx) { rep = (ctx << 1) | 1; }
+ inline void Invalidate() { rep = 0; }
+
+ uint8_t rep{0};
+ } validity_info_;
+
// Points to a copy of the current compaction iterator output (current_key_)
- // if valid_.
+ // if valid.
Slice key_;
// Points to the value in the underlying iterator that corresponds to the
// current output.
// PinnedIteratorsManager used to pin input_ Iterator blocks while reading
// merge operands and then releasing them after consuming them.
PinnedIteratorsManager pinned_iters_mgr_;
+
+ uint64_t blob_garbage_collection_cutoff_file_number_;
+
+ std::unique_ptr<BlobFetcher> blob_fetcher_;
+ std::unique_ptr<PrefetchBufferCollection> prefetch_buffers_;
+
std::string blob_index_;
+ PinnableSlice blob_value_;
std::string compaction_filter_value_;
InternalKey compaction_filter_skip_until_;
// "level_ptrs" holds indices that remember which file of an associated
// Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_)
int cmp_with_history_ts_low_;
+ const int level_;
+
+ // True if the previous internal key (same user key)'s sequence number has
+ // just been zeroed out during bottommost compaction.
+ bool last_key_seq_zeroed_{false};
+
+ // True if the current key should be output to the penultimate level if
+ // possible, compaction logic makes the final decision on which level to
+ // output to.
+ bool output_to_penultimate_level_{false};
+
+ // min seqno for preserving the time information.
+ const SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber;
+
+ // min seqno to preclude the data from the last level, if the key seqno larger
+ // than this, it will be output to penultimate level
+ const SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
+
+ void AdvanceInputIter() { input_.Next(); }
+
+ void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }
+
bool IsShuttingDown() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
bool IsPausingManualCompaction() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
- return manual_compaction_paused_ &&
- manual_compaction_paused_->load(std::memory_order_relaxed) > 0;
+ return manual_compaction_canceled_.load(std::memory_order_relaxed);
}
};
+
+inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
+ SequenceNumber snapshot) {
+ return ((seq) <= (snapshot) &&
+ (snapshot_checker_ == nullptr ||
+ LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) ==
+ SnapshotCheckerResult::kInSnapshot)));
+}
+
+inline bool CompactionIterator::DefinitelyNotInSnapshot(
+ SequenceNumber seq, SequenceNumber snapshot) {
+ return ((seq) > (snapshot) ||
+ (snapshot_checker_ != nullptr &&
+ UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) ==
+ SnapshotCheckerResult::kNotInSnapshot)));
+}
+
} // namespace ROCKSDB_NAMESPACE