]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/compaction/compaction_iterator.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_iterator.h
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5#pragma once
6
7#include <algorithm>
1e59de90 8#include <cinttypes>
7c673cae
FG
9#include <deque>
10#include <string>
494da23a 11#include <unordered_set>
7c673cae
FG
12#include <vector>
13
f67539c2
TL
14#include "db/compaction/compaction.h"
15#include "db/compaction/compaction_iteration_stats.h"
7c673cae
FG
16#include "db/merge_helper.h"
17#include "db/pinned_iterators_manager.h"
18#include "db/range_del_aggregator.h"
11fdf7f2
TL
19#include "db/snapshot_checker.h"
20#include "options/cf_options.h"
7c673cae
FG
21#include "rocksdb/compaction_filter.h"
22
f67539c2 23namespace ROCKSDB_NAMESPACE {
7c673cae 24
20effc67 25class BlobFileBuilder;
1e59de90
TL
26class BlobFetcher;
27class PrefetchBufferCollection;
28
29// A wrapper of internal iterator whose purpose is to count how
30// many entries there are in the iterator.
31class SequenceIterWrapper : public InternalIterator {
32 public:
33 SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp,
34 bool need_count_entries)
35 : icmp_(cmp),
36 inner_iter_(iter),
37 need_count_entries_(need_count_entries) {}
38 bool Valid() const override { return inner_iter_->Valid(); }
39 Status status() const override { return inner_iter_->status(); }
40 void Next() override {
41 num_itered_++;
42 inner_iter_->Next();
43 }
44 void Seek(const Slice& target) override {
45 if (!need_count_entries_) {
46 inner_iter_->Seek(target);
47 } else {
48 // For flush cases, we need to count total number of entries, so we
49 // do Next() rather than Seek().
50 while (inner_iter_->Valid() &&
51 icmp_.Compare(inner_iter_->key(), target) < 0) {
52 Next();
53 }
54 }
55 }
56 Slice key() const override { return inner_iter_->key(); }
57 Slice value() const override { return inner_iter_->value(); }
58
59 // Unused InternalIterator methods
60 void SeekToFirst() override { assert(false); }
61 void Prev() override { assert(false); }
62 void SeekForPrev(const Slice& /* target */) override { assert(false); }
63 void SeekToLast() override { assert(false); }
64
65 uint64_t num_itered() const { return num_itered_; }
66
67 private:
68 InternalKeyComparator icmp_;
69 InternalIterator* inner_iter_; // not owned
70 uint64_t num_itered_ = 0;
71 bool need_count_entries_;
72};
20effc67 73
7c673cae
FG
// Merges, filters and garbage-collects the input stream of a compaction
// (or flush) and presents the sequence of records that should be written
// to the output file(s).
class CompactionIterator {
 public:
  // A wrapper around Compaction. Has a much smaller interface, only what
  // CompactionIterator uses. Tests can override it.
  class CompactionProxy {
   public:
    virtual ~CompactionProxy() = default;

    virtual int level() const = 0;

    virtual bool KeyNotExistsBeyondOutputLevel(
        const Slice& user_key, std::vector<size_t>* level_ptrs) const = 0;

    virtual bool bottommost_level() const = 0;

    virtual int number_levels() const = 0;

    // Result includes timestamp if user-defined timestamp is enabled.
    virtual Slice GetLargestUserKey() const = 0;

    virtual bool allow_ingest_behind() const = 0;

    virtual bool allow_mmap_reads() const = 0;

    virtual bool enable_blob_garbage_collection() const = 0;

    virtual double blob_garbage_collection_age_cutoff() const = 0;

    virtual uint64_t blob_compaction_readahead_size() const = 0;

    virtual const Version* input_version() const = 0;

    virtual bool DoesInputReferenceBlobFiles() const = 0;

    // The underlying Compaction object, or nullptr for mock proxies.
    virtual const Compaction* real_compaction() const = 0;

    virtual bool SupportsPerKeyPlacement() const = 0;

    // `key` includes timestamp if user-defined timestamp is enabled.
    virtual bool WithinPenultimateLevelOutputRange(const Slice& key) const = 0;
  };

  // Production implementation of CompactionProxy that forwards every query
  // to a real Compaction object.
  class RealCompaction : public CompactionProxy {
   public:
    explicit RealCompaction(const Compaction* compaction)
        : compaction_(compaction) {
      assert(compaction_);
      assert(compaction_->immutable_options());
      assert(compaction_->mutable_cf_options());
    }

    int level() const override { return compaction_->level(); }

    bool KeyNotExistsBeyondOutputLevel(
        const Slice& user_key, std::vector<size_t>* level_ptrs) const override {
      return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
    }

    bool bottommost_level() const override {
      return compaction_->bottommost_level();
    }

    int number_levels() const override { return compaction_->number_levels(); }

    // Result includes timestamp if user-defined timestamp is enabled.
    Slice GetLargestUserKey() const override {
      return compaction_->GetLargestUserKey();
    }

    bool allow_ingest_behind() const override {
      return compaction_->immutable_options()->allow_ingest_behind;
    }

    bool allow_mmap_reads() const override {
      return compaction_->immutable_options()->allow_mmap_reads;
    }

    bool enable_blob_garbage_collection() const override {
      return compaction_->enable_blob_garbage_collection();
    }

    double blob_garbage_collection_age_cutoff() const override {
      return compaction_->blob_garbage_collection_age_cutoff();
    }

    uint64_t blob_compaction_readahead_size() const override {
      return compaction_->mutable_cf_options()->blob_compaction_readahead_size;
    }

    const Version* input_version() const override {
      return compaction_->input_version();
    }

    bool DoesInputReferenceBlobFiles() const override {
      return compaction_->DoesInputReferenceBlobFiles();
    }

    const Compaction* real_compaction() const override { return compaction_; }

    bool SupportsPerKeyPlacement() const override {
      return compaction_->SupportsPerKeyPlacement();
    }

    // Check if key is within penultimate level output range, to see if it's
    // safe to output to the penultimate level for per_key_placement feature.
    // `key` includes timestamp if user-defined timestamp is enabled.
    bool WithinPenultimateLevelOutputRange(const Slice& key) const override {
      return compaction_->WithinPenultimateLevelOutputRange(key);
    }

   private:
    const Compaction* compaction_;  // not owned
  };

  // Production constructor: `compaction` may be nullptr (e.g. for flushes);
  // when non-null it is wrapped in a RealCompaction proxy internally.
  CompactionIterator(
      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
      SequenceNumber earliest_write_conflict_snapshot,
      SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
      Env* env, bool report_detailed_time, bool expect_valid_internal_key,
      CompactionRangeDelAggregator* range_del_agg,
      BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
      bool enforce_single_del_contracts,
      const std::atomic<bool>& manual_compaction_canceled,
      const Compaction* compaction = nullptr,
      const CompactionFilter* compaction_filter = nullptr,
      const std::atomic<bool>* shutting_down = nullptr,
      const std::shared_ptr<Logger> info_log = nullptr,
      const std::string* full_history_ts_low = nullptr,
      const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
      const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);

  // Constructor with custom CompactionProxy, used for tests.
  CompactionIterator(
      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
      SequenceNumber earliest_write_conflict_snapshot,
      SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
      Env* env, bool report_detailed_time, bool expect_valid_internal_key,
      CompactionRangeDelAggregator* range_del_agg,
      BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
      bool enforce_single_del_contracts,
      const std::atomic<bool>& manual_compaction_canceled,
      std::unique_ptr<CompactionProxy> compaction,
      const CompactionFilter* compaction_filter = nullptr,
      const std::atomic<bool>* shutting_down = nullptr,
      const std::shared_ptr<Logger> info_log = nullptr,
      const std::string* full_history_ts_low = nullptr,
      const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
      const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);

  ~CompactionIterator();

  // Resets the per-compaction record counters kept in iter_stats_.
  void ResetRecordCounts();

  // Seek to the beginning of the compaction iterator output.
  //
  // REQUIRED: Call only once.
  void SeekToFirst();

  // Produces the next record in the compaction.
  //
  // REQUIRED: SeekToFirst() has been called.
  void Next();

  // Getters
  const Slice& key() const { return key_; }
  const Slice& value() const { return value_; }
  const Status& status() const { return status_; }
  const ParsedInternalKey& ikey() const { return ikey_; }
  inline bool Valid() const { return validity_info_.IsValid(); }
  const Slice& user_key() const { return current_user_key_; }
  const CompactionIterationStats& iter_stats() const { return iter_stats_; }
  // Number of input entries stepped over so far (counted by input_).
  uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
  // If the current key should be placed on penultimate level, only valid if
  // per_key_placement is supported
  bool output_to_penultimate_level() const {
    return output_to_penultimate_level_;
  }
  Status InputStatus() const { return input_.status(); }

 private:
  // Processes the input stream to find the next output
  void NextFromInput();

  // Do final preparations before presenting the output to the callee.
  void PrepareOutput();

  // Decide the current key should be output to the last level or penultimate
  // level, only call for compaction supports per key placement
  void DecideOutputLevel();

  // Passes the output value to the blob file builder (if any), and replaces it
  // with the corresponding blob reference if it has been actually written to a
  // blob file (i.e. if it passed the value size check). Returns true if the
  // value got extracted to a blob file, false otherwise.
  bool ExtractLargeValueIfNeededImpl();

  // Extracts large values as described above, and updates the internal key's
  // type to kTypeBlobIndex if the value got extracted. Should only be called
  // for regular values (kTypeValue).
  void ExtractLargeValueIfNeeded();

  // Relocates valid blobs residing in the oldest blob files if garbage
  // collection is enabled. Relocated blobs are written to new blob files or
  // inlined in the LSM tree depending on the current settings (i.e.
  // enable_blob_files and min_blob_size). Should only be called for blob
  // references (kTypeBlobIndex).
  //
  // Note: the stacked BlobDB implementation's compaction filter based GC
  // algorithm is also called from here.
  void GarbageCollectBlobIfNeeded();

  // Invoke compaction filter if needed.
  // Return true on success, false on failures (e.g.: kIOError).
  bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);

  // Given a sequence number, return the sequence number of the
  // earliest snapshot that this sequence number is visible in.
  // The snapshots themselves are arranged in ascending order of
  // sequence numbers.
  // Employ a sequential search because the total number of
  // snapshots are typically small.
  inline SequenceNumber findEarliestVisibleSnapshot(
      SequenceNumber in, SequenceNumber* prev_snapshot);

  // True if `sequence` is committed with respect to the job snapshot, or if
  // there is no snapshot checker configured (everything is then committed).
  inline bool KeyCommitted(SequenceNumber sequence) {
    return snapshot_checker_ == nullptr ||
           snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) ==
               SnapshotCheckerResult::kInSnapshot;
  }

  bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);

  bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);

  // Extract user-defined timestamp from user key if possible and compare it
  // with *full_history_ts_low_ if applicable.
  inline void UpdateTimestampAndCompareWithFullHistoryLow() {
    if (!timestamp_size_) {
      // User-defined timestamps disabled; nothing to extract.
      return;
    }
    Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
    curr_ts_.assign(ts.data(), ts.size());
    if (full_history_ts_low_) {
      cmp_with_history_ts_low_ =
          cmp_->CompareTimestamp(ts, *full_history_ts_low_);
    }
  }

  static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
      const CompactionProxy* compaction);
  static std::unique_ptr<BlobFetcher> CreateBlobFetcherIfNeeded(
      const CompactionProxy* compaction);
  static std::unique_ptr<PrefetchBufferCollection>
  CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction);

  // Wraps the compaction input iterator so entries can be counted.
  SequenceIterWrapper input_;
  const Comparator* cmp_;
  MergeHelper* merge_helper_;
  const std::vector<SequenceNumber>* snapshots_;
  // List of snapshots released during compaction.
  // findEarliestVisibleSnapshot() finds them out from the return value of
  // snapshot_checker, and makes sure they will not be returned as the
  // earliest visible snapshot of an older value.
  // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
  std::unordered_set<SequenceNumber> released_snapshots_;
  const SequenceNumber earliest_write_conflict_snapshot_;
  const SequenceNumber job_snapshot_;
  const SnapshotChecker* const snapshot_checker_;
  Env* env_;
  SystemClock* clock_;
  const bool report_detailed_time_;
  const bool expect_valid_internal_key_;
  CompactionRangeDelAggregator* range_del_agg_;
  BlobFileBuilder* blob_file_builder_;
  std::unique_ptr<CompactionProxy> compaction_;
  const CompactionFilter* compaction_filter_;
  const std::atomic<bool>* shutting_down_;
  const std::atomic<bool>& manual_compaction_canceled_;
  const bool bottommost_level_;
  const bool visible_at_tip_;
  const SequenceNumber earliest_snapshot_;

  std::shared_ptr<Logger> info_log_;

  const bool allow_data_in_errors_;

  const bool enforce_single_del_contracts_;

  // Comes from comparator.
  const size_t timestamp_size_;

  // Lower bound timestamp to retain full history in terms of user-defined
  // timestamp. If a key's timestamp is older than full_history_ts_low_, then
  // the key *may* be eligible for garbage collection (GC). The skipping logic
  // is in `NextFromInput()` and `PrepareOutput()`.
  // If nullptr, NO GC will be performed and all history will be preserved.
  const std::string* const full_history_ts_low_;

  // State
  //
  // Reason why the current output position is (in)valid; encoded into
  // ValidityInfo for diagnostics.
  enum ValidContext : uint8_t {
    kMerge1 = 0,
    kMerge2 = 1,
    kParseKeyError = 2,
    kCurrentKeyUncommitted = 3,
    kKeepSDAndClearPut = 4,
    kKeepTsHistory = 5,
    kKeepSDForConflictCheck = 6,
    kKeepSDForSnapshot = 7,
    kKeepSD = 8,
    kKeepDel = 9,
    kNewUserKey = 10,
  };

  // Packs a validity bit (bit 0) together with the ValidContext (bits 1..7)
  // into a single byte.
  struct ValidityInfo {
    inline bool IsValid() const { return rep & 1; }
    ValidContext GetContext() const {
      return static_cast<ValidContext>(rep >> 1);
    }
    inline void SetValid(uint8_t ctx) { rep = (ctx << 1) | 1; }
    inline void Invalidate() { rep = 0; }

    uint8_t rep{0};
  } validity_info_;

  // Points to a copy of the current compaction iterator output (current_key_)
  // if valid.
  Slice key_;
  // Points to the value in the underlying iterator that corresponds to the
  // current output.
  Slice value_;
  // The status is OK unless compaction iterator encounters a merge operand
  // while not having a merge operator defined.
  Status status_;
  // Stores the user key, sequence number and type of the current compaction
  // iterator output (or current key in the underlying iterator during
  // NextFromInput()).
  ParsedInternalKey ikey_;
  // Stores whether ikey_.user_key is valid. If set to false, the user key is
  // not compared against the current key in the underlying iterator.
  bool has_current_user_key_ = false;
  // If false, the iterator holds a copy of the current compaction iterator
  // output (or current key in the underlying iterator during NextFromInput()).
  bool at_next_ = false;

  IterKey current_key_;
  Slice current_user_key_;
  // Timestamp of the current key, if user-defined timestamps are enabled.
  std::string curr_ts_;
  SequenceNumber current_user_key_sequence_;
  SequenceNumber current_user_key_snapshot_;

  // True if the iterator has already returned a record for the current key.
  bool has_outputted_key_ = false;

  // If true, clear the value of the next key and output it without applying
  // any compaction rules. This is used for outputting a put after a single
  // delete.
  bool clear_and_output_next_key_ = false;

  MergeOutputIterator merge_out_iter_;
  // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
  // merge operands and then releasing them after consuming them.
  PinnedIteratorsManager pinned_iters_mgr_;

  // Blob files with a number below this cutoff are eligible for blob GC.
  uint64_t blob_garbage_collection_cutoff_file_number_;

  std::unique_ptr<BlobFetcher> blob_fetcher_;
  std::unique_ptr<PrefetchBufferCollection> prefetch_buffers_;

  std::string blob_index_;
  PinnableSlice blob_value_;
  std::string compaction_filter_value_;
  InternalKey compaction_filter_skip_until_;
  // "level_ptrs" holds indices that remember which file of an associated
  // level we were last checking during the last call to compaction->
  // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
  // to pick off where it left off since each subcompaction's key range is
  // increasing so a later call to the function must be looking for a key that
  // is in or beyond the last file checked during the previous call
  std::vector<size_t> level_ptrs_;
  CompactionIterationStats iter_stats_;

  // Used to avoid purging uncommitted values. The application can specify
  // uncommitted values by providing a SnapshotChecker object.
  bool current_key_committed_;

  // Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_)
  int cmp_with_history_ts_low_;

  const int level_;

  // True if the previous internal key (same user key)'s sequence number has
  // just been zeroed out during bottommost compaction.
  bool last_key_seq_zeroed_{false};

  // True if the current key should be output to the penultimate level if
  // possible, compaction logic makes the final decision on which level to
  // output to.
  bool output_to_penultimate_level_{false};

  // min seqno for preserving the time information.
  const SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber;

  // min seqno to preclude the data from the last level; if a key's seqno is
  // larger than this, it will be output to the penultimate level
  const SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;

  void AdvanceInputIter() { input_.Next(); }

  void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }

  bool IsShuttingDown() {
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
  }

  bool IsPausingManualCompaction() {
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
    return manual_compaction_canceled_.load(std::memory_order_relaxed);
  }
};
1e59de90
TL
496
497inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
498 SequenceNumber snapshot) {
499 return ((seq) <= (snapshot) &&
500 (snapshot_checker_ == nullptr ||
501 LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) ==
502 SnapshotCheckerResult::kInSnapshot)));
503}
504
505inline bool CompactionIterator::DefinitelyNotInSnapshot(
506 SequenceNumber seq, SequenceNumber snapshot) {
507 return ((seq) > (snapshot) ||
508 (snapshot_checker_ != nullptr &&
509 UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) ==
510 SnapshotCheckerResult::kNotInSnapshot)));
511}
512
f67539c2 513} // namespace ROCKSDB_NAMESPACE