1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
12 #include "db/dbformat.h"
13 #include "db/merge_context.h"
14 #include "db/range_del_aggregator.h"
15 #include "db/snapshot_checker.h"
16 #include "rocksdb/compaction_filter.h"
17 #include "rocksdb/env.h"
18 #include "rocksdb/slice.h"
19 #include "util/stop_watch.h"
// Constructs a helper that merges operands for a single user key at a time.
//
// env:                       environment, used e.g. for timing merge calls
//                            (passed through to TimedFullMerge).
// user_comparator:           comparator over user keys.
// user_merge_operator:       operator that combines merge operands; may be
//                            nullptr (see HasOperator()).
// compaction_filter:         optional filter applied to merge operands
//                            (see FilterMerge()).
// logger:                    destination for informational/error logging.
// assert_valid_internal_key: enforce that internal keys are not corrupted.
// latest_snapshot:           sequence number of the most recent snapshot.
// snapshot_checker:          optional checker for snapshot visibility.
// level:                     LSM level this helper operates on
//                            — presumably used for stats/filter context;
//                            confirm in the implementation.
// stats:                     optional statistics sink.
// shutting_down:             optional flag polled to abort work early
//                            (see IsShuttingDown()).
MergeHelper(Env* env, const Comparator* user_comparator,
            const MergeOperator* user_merge_operator,
            const CompactionFilter* compaction_filter, Logger* logger,
            bool assert_valid_internal_key, SequenceNumber latest_snapshot,
            const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
            Statistics* stats = nullptr,
            const std::atomic<bool>* shutting_down = nullptr);
// Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
// Result of merge will be written to result if status returned is OK.
// If operands is empty, the value will simply be copied to result.
// Set `update_num_ops_stats` to true if it is from a user read, so that
// the latency is sensitive.
//
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
// - Corruption: Merge operator reported unsuccessful merge.
static Status TimedFullMerge(const MergeOperator* merge_operator,
                             const Slice& key, const Slice* value,
                             const std::vector<Slice>& operands,
                             std::string* result, Logger* logger,
                             Statistics* statistics, Env* env,
                             Slice* result_operand = nullptr,
                             bool update_num_ops_stats = false);
// Merge entries until we hit
//   - a different user key,
//   - a specific sequence number (snapshot boundary),
//   - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
//   or - the end of iteration
//
// iter:          (IN)  points to the first merge type entry
//                (OUT) points to the first entry not included in the merge
//                      process
// range_del_agg: (IN)  filters merge operands covered by range tombstones.
// stop_before:   (IN)  a sequence number that merge should not cross.
//                      0 means no restriction.
// at_bottom:     (IN)  true if the iterator covers the bottom level, which
//                      means we could reach the start of the history of this
//                      user key.
//
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
// - MergeInProgress: Put/Delete not encountered, and didn't reach the start
//   of key's history. Output consists of merge operands only.
// - Corruption: Merge operator reported unsuccessful merge or a corrupted
//   key has been encountered and not expected (applies only when compiling
//   with asserts removed).
// - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
//
// REQUIRED: The first key in the input is not corrupted.
Status MergeUntil(InternalIterator* iter,
                  CompactionRangeDelAggregator* range_del_agg = nullptr,
                  const SequenceNumber stop_before = 0,
                  const bool at_bottom = false);
// Filters a merge operand using the compaction filter specified
// in the constructor. Returns the decision that the filter made.
// Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
// optional outputs of the compaction filter.
CompactionFilter::Decision FilterMerge(const Slice& user_key,
                                       const Slice& value_slice);
// Query the merge result.
// These are valid until the next MergeUntil call.
// If the merging was successful:
//   - keys() contains a single element with the latest sequence number of
//     the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
//   - values() contains a single element with the result of merging all the
//     operands together.
//
// IMPORTANT 1: the key type could change after the MergeUntil call.
//        Put/Delete + Merge + ... + Merge => Put
//        Merge + ... + Merge => Merge
//
// If the merge operator is not associative, and if a Put/Delete is not found
// then the merging will be unsuccessful. In this case:
//   - keys() contains the list of internal keys seen in order of iteration.
//   - values() contains the list of values (merges) seen in the same order.
//     values() is parallel to keys() so that the first entry in
//     keys() is the key associated with the first entry in values()
//     and so on. These lists will be the same length.
//     All of these pairs will be merges over the same user key.
//     See IMPORTANT 2 note below.
//
// IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
//              So keys().back() was the first key seen by iterator.
// TODO: Re-style this comment to be like the first one
const std::deque<std::string>& keys() const { return keys_; }
// Returns the merge operands seen so far, parallel to keys(); valid until
// the next MergeUntil call. (Closing brace of this accessor falls outside
// this excerpt.)
const std::vector<Slice>& values() const {
  return merge_context_.GetOperands();
121 uint64_t TotalFilterTime() const { return total_filter_time_
; }
122 bool HasOperator() const { return user_merge_operator_
!= nullptr; }
// If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
// return true and fill *skip_until with the key to which we should skip.
// If true, keys() and values() are empty.
// NOTE(review): the early-return body of the `if` and the final return
// statement are not visible in this excerpt.
bool FilteredUntil(Slice* skip_until) const {
  if (!has_compaction_filter_skip_until_) {
  assert(compaction_filter_ != nullptr);
  assert(skip_until != nullptr);
  assert(compaction_filter_skip_until_.Valid());
  *skip_until = compaction_filter_skip_until_.Encode();
const Comparator* user_comparator_;
const MergeOperator* user_merge_operator_;
const CompactionFilter* compaction_filter_;
// Polled by IsShuttingDown(); may be nullptr (see constructor default).
const std::atomic<bool>* shutting_down_;

bool assert_valid_internal_key_;  // enforce no internal key corruption?
bool allow_single_operand_;
SequenceNumber latest_snapshot_;
const SnapshotChecker* const snapshot_checker_;

// the scratch area that holds the result of MergeUntil
// valid up to the next MergeUntil call

// Keeps track of the sequence of keys seen
std::deque<std::string> keys_;
// Parallel with keys_; stores the operands
mutable MergeContext merge_context_;

// Timer and accumulator for compaction-filter timing; the total is
// exposed via TotalFilterTime().
StopWatchNano filter_timer_;
uint64_t total_filter_time_;

// Outputs of the compaction filter; consumed by FilteredUntil().
bool has_compaction_filter_skip_until_ = false;
std::string compaction_filter_value_;
InternalKey compaction_filter_skip_until_;
// Best-effort check of the external shutdown flag; returns false when no
// flag was supplied. (Closing brace falls outside this excerpt.)
bool IsShuttingDown() {
  // This is a best-effort facility, so memory_order_relaxed is sufficient.
  return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
// MergeOutputIterator can be used to iterate over the result of a merge.
class MergeOutputIterator {
  // The MergeOutputIterator is bound to a MergeHelper instance.
  explicit MergeOutputIterator(const MergeHelper* merge_helper);

  // Seeks to the first record in the output.
  // NOTE(review): the declaration this comment describes (and the one for
  // the next comment) is not visible in this excerpt.

  // Advances to the next record in the output.
// Key at the current position, as a Slice over the stored internal key.
Slice key() { return Slice(*it_keys_); }
// Value (merge operand) at the current position.
Slice value() { return Slice(*it_values_); }
// True while the iterator has not walked past the last record; iteration
// runs in reverse (rbegin..rend) because MergeUntil stores entries
// back-to-front (see IMPORTANT 2 note on MergeHelper::keys()).
bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
// Bound helper whose merge result is being iterated (not owned).
const MergeHelper* merge_helper_;
// Reverse cursors over MergeHelper::keys() / MergeHelper::values();
// kept in lockstep since the two sequences are parallel.
std::deque<std::string>::const_reverse_iterator it_keys_;
std::vector<Slice>::const_reverse_iterator it_values_;

}  // namespace rocksdb