1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
12 #include "db/merge_context.h"
13 #include "db/range_del_aggregator.h"
14 #include "db/snapshot_checker.h"
15 #include "rocksdb/compaction_filter.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/merge_operator.h"
18 #include "rocksdb/slice.h"
19 #include "rocksdb/wide_columns.h"
20 #include "util/stop_watch.h"
22 namespace ROCKSDB_NAMESPACE
{
// Forward declarations: these types only appear below as pointer parameters,
// so their full definitions are not required here.
class PrefetchBufferCollection;
struct CompactionIterationStats;
36 MergeHelper(Env
* env
, const Comparator
* user_comparator
,
37 const MergeOperator
* user_merge_operator
,
38 const CompactionFilter
* compaction_filter
, Logger
* logger
,
39 bool assert_valid_internal_key
, SequenceNumber latest_snapshot
,
40 const SnapshotChecker
* snapshot_checker
= nullptr, int level
= 0,
41 Statistics
* stats
= nullptr,
42 const std::atomic
<bool>* shutting_down
= nullptr);
44 // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
45 // Result of merge will be written to result if status returned is OK.
46 // If operands is empty, the value will simply be copied to result.
47 // Set `update_num_ops_stats` to true if it is from a user read, so that
48 // the latency is sensitive.
49 // Returns one of the following statuses:
50 // - OK: Entries were successfully merged.
51 // - Corruption: Merge operator reported unsuccessful merge.
52 static Status
TimedFullMerge(const MergeOperator
* merge_operator
,
53 const Slice
& key
, const Slice
* value
,
54 const std::vector
<Slice
>& operands
,
55 std::string
* result
, Logger
* logger
,
56 Statistics
* statistics
, SystemClock
* clock
,
57 Slice
* result_operand
,
58 bool update_num_ops_stats
);
60 static Status
TimedFullMergeWithEntity(
61 const MergeOperator
* merge_operator
, const Slice
& key
, Slice base_entity
,
62 const std::vector
<Slice
>& operands
, std::string
* result
, Logger
* logger
,
63 Statistics
* statistics
, SystemClock
* clock
, bool update_num_ops_stats
);
65 // During compaction, merge entries until we hit
68 // - a different user key,
69 // - a specific sequence number (snapshot boundary),
70 // - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
71 // or - the end of iteration
72 // iter: (IN) points to the first merge type entry
73 // (OUT) points to the first entry not included in the merge process
74 // range_del_agg: (IN) filters merge operands covered by range tombstones.
75 // stop_before: (IN) a sequence number that merge should not cross.
76 // 0 means no restriction
77 // at_bottom: (IN) true if the iterator covers the bottem level, which means
78 // we could reach the start of the history of this user key.
79 // allow_data_in_errors: (IN) if true, data details will be displayed in
80 // error/log messages.
81 // blob_fetcher: (IN) blob fetcher object for the compaction's input version.
82 // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers
83 // used for compaction readahead.
84 // c_iter_stats: (OUT) compaction iteration statistics.
86 // Returns one of the following statuses:
87 // - OK: Entries were successfully merged.
88 // - MergeInProgress: Put/Delete not encountered, and didn't reach the start
89 // of key's history. Output consists of merge operands only.
90 // - Corruption: Merge operator reported unsuccessful merge or a corrupted
91 // key has been encountered and not expected (applies only when compiling
92 // with asserts removed).
93 // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
95 // REQUIRED: The first key in the input is not corrupted.
96 Status
MergeUntil(InternalIterator
* iter
,
97 CompactionRangeDelAggregator
* range_del_agg
,
98 const SequenceNumber stop_before
, const bool at_bottom
,
99 const bool allow_data_in_errors
,
100 const BlobFetcher
* blob_fetcher
,
101 const std::string
* const full_history_ts_low
,
102 PrefetchBufferCollection
* prefetch_buffers
,
103 CompactionIterationStats
* c_iter_stats
);
105 // Filters a merge operand using the compaction filter specified
106 // in the constructor. Returns the decision that the filter made.
107 // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
108 // optional outputs of compaction filter.
109 // user_key includes timestamp if user-defined timestamp is enabled.
110 CompactionFilter::Decision
FilterMerge(const Slice
& user_key
,
111 const Slice
& value_slice
);
113 // Query the merge result
114 // These are valid until the next MergeUntil call
115 // If the merging was successful:
116 // - keys() contains a single element with the latest sequence number of
117 // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
118 // - values() contains a single element with the result of merging all the
121 // IMPORTANT 1: the key type could change after the MergeUntil call.
122 // Put/Delete + Merge + ... + Merge => Put
123 // Merge + ... + Merge => Merge
125 // If the merge operator is not associative, and if a Put/Delete is not found
126 // then the merging will be unsuccessful. In this case:
127 // - keys() contains the list of internal keys seen in order of iteration.
128 // - values() contains the list of values (merges) seen in the same order.
129 // values() is parallel to keys() so that the first entry in
130 // keys() is the key associated with the first entry in values()
131 // and so on. These lists will be the same length.
132 // All of these pairs will be merges over the same user key.
133 // See IMPORTANT 2 note below.
135 // IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
136 // So keys().back() was the first key seen by iterator.
137 // TODO: Re-style this comment to be like the first one
138 const std::deque
<std::string
>& keys() const { return keys_
; }
139 const std::vector
<Slice
>& values() const {
140 return merge_context_
.GetOperands();
142 uint64_t TotalFilterTime() const { return total_filter_time_
; }
143 bool HasOperator() const { return user_merge_operator_
!= nullptr; }
145 // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
146 // return true and fill *until with the key to which we should skip.
147 // If true, keys() and values() are empty.
148 bool FilteredUntil(Slice
* skip_until
) const {
149 if (!has_compaction_filter_skip_until_
) {
152 assert(compaction_filter_
!= nullptr);
153 assert(skip_until
!= nullptr);
154 assert(compaction_filter_skip_until_
.Valid());
155 *skip_until
= compaction_filter_skip_until_
.Encode();
162 const Comparator
* user_comparator_
;
163 const MergeOperator
* user_merge_operator_
;
164 const CompactionFilter
* compaction_filter_
;
165 const std::atomic
<bool>* shutting_down_
;
167 bool assert_valid_internal_key_
; // enforce no internal key corruption?
168 bool allow_single_operand_
;
169 SequenceNumber latest_snapshot_
;
170 const SnapshotChecker
* const snapshot_checker_
;
173 // the scratch area that holds the result of MergeUntil
174 // valid up to the next MergeUntil call
176 // Keeps track of the sequence of keys seen
177 std::deque
<std::string
> keys_
;
178 // Parallel with keys_; stores the operands
179 mutable MergeContext merge_context_
;
181 StopWatchNano filter_timer_
;
182 uint64_t total_filter_time_
;
185 bool has_compaction_filter_skip_until_
= false;
186 std::string compaction_filter_value_
;
187 InternalKey compaction_filter_skip_until_
;
189 bool IsShuttingDown() {
190 // This is a best-effort facility, so memory_order_relaxed is sufficient.
191 return shutting_down_
&& shutting_down_
->load(std::memory_order_relaxed
);
195 // MergeOutputIterator can be used to iterate over the result of a merge.
// It walks the bound helper's keys() through reverse iterators; a matching
// reverse iterator over a vector of Slice supplies the values.
196 class MergeOutputIterator
{
198 // The MergeOutputIterator is bound to a MergeHelper instance.
199 explicit MergeOutputIterator(const MergeHelper
* merge_helper
);
201 // Seeks to the first record in the output.
203 // Advances to the next record in the output.
// Returns the key at the current position, i.e. the string it_keys_ refers
// to, wrapped in a Slice.
206 Slice
key() { return Slice(*it_keys_
); }
// Returns the value at the current position (the Slice it_values_ refers to).
207 Slice
value() { return Slice(*it_values_
); }
// True until it_keys_ reaches merge_helper_->keys().rend(). Only the key
// iterator is checked.
208 bool Valid() { return it_keys_
!= merge_helper_
->keys().rend(); }
// The helper whose output is iterated.
// NOTE(review): held as a raw pointer, so presumably it must outlive this
// iterator -- confirm.
211 const MergeHelper
* merge_helper_
;
// Current position within the keys (reverse iteration).
212 std::deque
<std::string
>::const_reverse_iterator it_keys_
;
// Current position within the values (reverse iteration).
213 std::vector
<Slice
>::const_reverse_iterator it_values_
;
216 } // namespace ROCKSDB_NAMESPACE