// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once

#include <atomic>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/compaction.h"
#include "db/compaction_iteration_stats.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
24 class CompactionIterator
{
26 // A wrapper around Compaction. Has a much smaller interface, only what
27 // CompactionIterator uses. Tests can override it.
28 class CompactionProxy
{
30 explicit CompactionProxy(const Compaction
* compaction
)
31 : compaction_(compaction
) {}
33 virtual ~CompactionProxy() = default;
34 virtual int level(size_t /*compaction_input_level*/ = 0) const {
35 return compaction_
->level();
37 virtual bool KeyNotExistsBeyondOutputLevel(
38 const Slice
& user_key
, std::vector
<size_t>* level_ptrs
) const {
39 return compaction_
->KeyNotExistsBeyondOutputLevel(user_key
, level_ptrs
);
41 virtual bool bottommost_level() const {
42 return compaction_
->bottommost_level();
44 virtual int number_levels() const { return compaction_
->number_levels(); }
45 virtual Slice
GetLargestUserKey() const {
46 return compaction_
->GetLargestUserKey();
48 virtual bool allow_ingest_behind() const {
49 return compaction_
->immutable_cf_options()->allow_ingest_behind
;
51 virtual bool preserve_deletes() const {
52 return compaction_
->immutable_cf_options()->preserve_deletes
;
56 CompactionProxy() = default;
59 const Compaction
* compaction_
;
62 CompactionIterator(InternalIterator
* input
, const Comparator
* cmp
,
63 MergeHelper
* merge_helper
, SequenceNumber last_sequence
,
64 std::vector
<SequenceNumber
>* snapshots
,
65 SequenceNumber earliest_write_conflict_snapshot
,
66 const SnapshotChecker
* snapshot_checker
, Env
* env
,
67 bool report_detailed_time
, bool expect_valid_internal_key
,
68 CompactionRangeDelAggregator
* range_del_agg
,
69 const Compaction
* compaction
= nullptr,
70 const CompactionFilter
* compaction_filter
= nullptr,
71 const std::atomic
<bool>* shutting_down
= nullptr,
72 const SequenceNumber preserve_deletes_seqnum
= 0);
74 // Constructor with custom CompactionProxy, used for tests.
75 CompactionIterator(InternalIterator
* input
, const Comparator
* cmp
,
76 MergeHelper
* merge_helper
, SequenceNumber last_sequence
,
77 std::vector
<SequenceNumber
>* snapshots
,
78 SequenceNumber earliest_write_conflict_snapshot
,
79 const SnapshotChecker
* snapshot_checker
, Env
* env
,
80 bool report_detailed_time
, bool expect_valid_internal_key
,
81 CompactionRangeDelAggregator
* range_del_agg
,
82 std::unique_ptr
<CompactionProxy
> compaction
,
83 const CompactionFilter
* compaction_filter
= nullptr,
84 const std::atomic
<bool>* shutting_down
= nullptr,
85 const SequenceNumber preserve_deletes_seqnum
= 0);
87 ~CompactionIterator();
89 void ResetRecordCounts();
91 // Seek to the beginning of the compaction iterator output.
93 // REQUIRED: Call only once.
96 // Produces the next record in the compaction.
98 // REQUIRED: SeekToFirst() has been called.
102 const Slice
& key() const { return key_
; }
103 const Slice
& value() const { return value_
; }
104 const Status
& status() const { return status_
; }
105 const ParsedInternalKey
& ikey() const { return ikey_
; }
106 bool Valid() const { return valid_
; }
107 const Slice
& user_key() const { return current_user_key_
; }
108 const CompactionIterationStats
& iter_stats() const { return iter_stats_
; }
111 // Processes the input stream to find the next output
112 void NextFromInput();
114 // Do last preparations before presenting the output to the callee. At this
115 // point this only zeroes out the sequence number if possible for better
117 void PrepareOutput();
119 // Invoke compaction filter if needed.
120 void InvokeFilterIfNeeded(bool* need_skip
, Slice
* skip_until
);
122 // Given a sequence number, return the sequence number of the
123 // earliest snapshot that this sequence number is visible in.
124 // The snapshots themselves are arranged in ascending order of
126 // Employ a sequential search because the total number of
127 // snapshots are typically small.
128 inline SequenceNumber
findEarliestVisibleSnapshot(
129 SequenceNumber in
, SequenceNumber
* prev_snapshot
);
131 // Checks whether the currently seen ikey_ is needed for
132 // incremental (differential) snapshot and hence can't be dropped
133 // or seqnum be zero-ed out even if all other conditions for it are met.
134 inline bool ikeyNotNeededForIncrementalSnapshot();
136 inline bool KeyCommitted(SequenceNumber sequence
) {
137 return snapshot_checker_
== nullptr ||
138 snapshot_checker_
->CheckInSnapshot(sequence
, kMaxSequenceNumber
) ==
139 SnapshotCheckerResult::kInSnapshot
;
142 bool IsInEarliestSnapshot(SequenceNumber sequence
);
144 InternalIterator
* input_
;
145 const Comparator
* cmp_
;
146 MergeHelper
* merge_helper_
;
147 const std::vector
<SequenceNumber
>* snapshots_
;
148 // List of snapshots released during compaction.
149 // findEarliestVisibleSnapshot() find them out from return of
150 // snapshot_checker, and make sure they will not be returned as
151 // earliest visible snapshot of an older value.
152 // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
153 std::unordered_set
<SequenceNumber
> released_snapshots_
;
154 std::vector
<SequenceNumber
>::const_iterator earliest_snapshot_iter_
;
155 const SequenceNumber earliest_write_conflict_snapshot_
;
156 const SnapshotChecker
* const snapshot_checker_
;
158 bool report_detailed_time_
;
159 bool expect_valid_internal_key_
;
160 CompactionRangeDelAggregator
* range_del_agg_
;
161 std::unique_ptr
<CompactionProxy
> compaction_
;
162 const CompactionFilter
* compaction_filter_
;
163 const std::atomic
<bool>* shutting_down_
;
164 const SequenceNumber preserve_deletes_seqnum_
;
165 bool bottommost_level_
;
167 bool visible_at_tip_
;
168 SequenceNumber earliest_snapshot_
;
169 SequenceNumber latest_snapshot_
;
173 // Points to a copy of the current compaction iterator output (current_key_)
176 // Points to the value in the underlying iterator that corresponds to the
179 // The status is OK unless compaction iterator encounters a merge operand
180 // while not having a merge operator defined.
182 // Stores the user key, sequence number and type of the current compaction
183 // iterator output (or current key in the underlying iterator during
185 ParsedInternalKey ikey_
;
186 // Stores whether ikey_.user_key is valid. If set to false, the user key is
187 // not compared against the current key in the underlying iterator.
188 bool has_current_user_key_
= false;
189 bool at_next_
= false; // If false, the iterator
190 // Holds a copy of the current compaction iterator output (or current key in
191 // the underlying iterator during NextFromInput()).
192 IterKey current_key_
;
193 Slice current_user_key_
;
194 SequenceNumber current_user_key_sequence_
;
195 SequenceNumber current_user_key_snapshot_
;
197 // True if the iterator has already returned a record for the current key.
198 bool has_outputted_key_
= false;
200 // truncated the value of the next key and output it without applying any
201 // compaction rules. This is used for outputting a put after a single delete.
202 bool clear_and_output_next_key_
= false;
204 MergeOutputIterator merge_out_iter_
;
205 // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
206 // merge operands and then releasing them after consuming them.
207 PinnedIteratorsManager pinned_iters_mgr_
;
208 std::string compaction_filter_value_
;
209 InternalKey compaction_filter_skip_until_
;
210 // "level_ptrs" holds indices that remember which file of an associated
211 // level we were last checking during the last call to compaction->
212 // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
213 // to pick off where it left off since each subcompaction's key range is
214 // increasing so a later call to the function must be looking for a key that
215 // is in or beyond the last file checked during the previous call
216 std::vector
<size_t> level_ptrs_
;
217 CompactionIterationStats iter_stats_
;
219 // Used to avoid purging uncommitted values. The application can specify
220 // uncommitted values by providing a SnapshotChecker object.
221 bool current_key_committed_
;
223 bool IsShuttingDown() {
224 // This is a best-effort facility, so memory_order_relaxed is sufficient.
225 return shutting_down_
&& shutting_down_
->load(std::memory_order_relaxed
);
228 } // namespace rocksdb