1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/merge_helper.h"
10 #include "db/dbformat.h"
11 #include "monitoring/perf_context_imp.h"
12 #include "monitoring/statistics.h"
13 #include "port/likely.h"
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/merge_operator.h"
17 #include "table/format.h"
18 #include "table/internal_iterator.h"
20 namespace ROCKSDB_NAMESPACE
{
// Constructs a MergeHelper, storing the supplied comparator, merge operator,
// compaction filter, snapshot information and shutdown flag in the matching
// members.
//
// The user comparator must be non-null (asserted below). The merge operator
// may be null, in which case allow_single_operand_ keeps its initial value of
// false.
//
// NOTE(review): the embedded line numbers in this listing skip 28, 30, 35,
// 40-42, 44 and 48-50, so part of the parameter list, several member
// initializers and the closing braces are not visible here. Do not treat this
// listing as the complete constructor.
22 MergeHelper::MergeHelper(Env
* env
, const Comparator
* user_comparator
,
23 const MergeOperator
* user_merge_operator
,
24 const CompactionFilter
* compaction_filter
,
25 Logger
* logger
, bool assert_valid_internal_key
,
26 SequenceNumber latest_snapshot
,
27 const SnapshotChecker
* snapshot_checker
, int level
,
29 const std::atomic
<bool>* shutting_down
)
31 user_comparator_(user_comparator
),
32 user_merge_operator_(user_merge_operator
),
33 compaction_filter_(compaction_filter
),
34 shutting_down_(shutting_down
),
36 assert_valid_internal_key_(assert_valid_internal_key
),
// Starts out false; refined in the body once the merge operator is known.
37 allow_single_operand_(false),
38 latest_snapshot_(latest_snapshot
),
39 snapshot_checker_(snapshot_checker
),
// No compaction-filter time has been accumulated yet.
43 total_filter_time_(0U),
// A comparator is mandatory for this helper.
45 assert(user_comparator_
!= nullptr);
// The merge operator is optional. When present, cache whether it accepts a
// single operand; MergeUntil consults this flag before attempting a partial
// merge.
46 if (user_merge_operator_
) {
47 allow_single_operand_
= user_merge_operator_
->AllowSingleOperand();
// Runs merge_operator->FullMergeV2() for `key` over `operands` and the
// optional existing `value`, recording statistics around the call.
//
// Parameters (as used by the visible code):
//   merge_operator       - must be non-null (asserted).
//   key                  - key handed to the merge operator input.
//   value                - existing value, or nullptr; must be non-null when
//                          `operands` is empty.
//   operands             - merge operands to apply.
//   result               - receives the merged value.
//   logger               - NOTE(review): not referenced on any visible line;
//                          presumably forwarded to merge_in - confirm.
//   statistics           - may be null; the elapsed time is only read from the
//                          timer when it is non-null, otherwise 0 is recorded.
//   env                  - passed to the StopWatchNano timer.
//   result_operand       - optional out-param. When FullMergeV2 reports that
//                          its result is an existing operand, it is set to
//                          that slice; otherwise it is reset to an empty Slice.
//   update_num_ops_stats - when true, the operand count is recorded in the
//                          READ_NUM_MERGE_OPERANDS histogram.
//
// Returns Status::Corruption on the failure path shown at the end.
//
// NOTE(review): the embedded line numbers skip in several places (e.g. 63-65,
// 74, 81, 88, 99, 102+), so the declaration of `success`, the early return,
// the failure guard and the success return are not visible in this listing.
51 Status
MergeHelper::TimedFullMerge(const MergeOperator
* merge_operator
,
52 const Slice
& key
, const Slice
* value
,
53 const std::vector
<Slice
>& operands
,
54 std::string
* result
, Logger
* logger
,
55 Statistics
* statistics
, Env
* env
,
56 Slice
* result_operand
,
57 bool update_num_ops_stats
) {
58 assert(merge_operator
!= nullptr);
// No operands: the existing value is copied into *result unchanged.
60 if (operands
.size() == 0) {
61 assert(value
!= nullptr && result
!= nullptr);
62 result
->assign(value
->data(), value
->size());
// Optionally record how many operands this merge had to combine.
66 if (update_num_ops_stats
) {
67 RecordInHistogram(statistics
, READ_NUM_MERGE_OPERANDS
,
68 static_cast<uint64_t>(operands
.size()));
// tmp_result_operand starts empty; a non-null data() after the merge is how
// the code below detects that the result is one of the existing operands.
72 Slice
tmp_result_operand(nullptr, 0);
73 const MergeOperator::MergeOperationInput
merge_in(key
, value
, operands
,
75 MergeOperator::MergeOperationOutput
merge_out(*result
, tmp_result_operand
);
77 // Setup to time the merge
78 StopWatchNano
timer(env
, statistics
!= nullptr);
79 PERF_TIMER_GUARD(merge_operator_time_nanos
);
82 success
= merge_operator
->FullMergeV2(merge_in
, &merge_out
);
84 if (tmp_result_operand
.data()) {
85 // FullMergeV2 result is an existing operand
// NOTE(review): line 88 of the embedded numbering is not visible, so it is
// unclear from here whether the assign below is the `else` of this check.
86 if (result_operand
!= nullptr) {
87 *result_operand
= tmp_result_operand
;
89 result
->assign(tmp_result_operand
.data(), tmp_result_operand
.size());
// Result is not an existing operand: clear the caller's out-param, if any.
91 } else if (result_operand
) {
92 *result_operand
= Slice(nullptr, 0);
// Record the elapsed merge time; 0 is recorded when statistics is null.
// NOTE(review): MergeUntil uses timer.ElapsedNanosSafe() for this same tick -
// confirm that ElapsedNanos() is intended here.
95 RecordTick(statistics
, MERGE_OPERATION_TOTAL_TIME
,
96 statistics
? timer
.ElapsedNanos() : 0);
// Failure path. NOTE(review): the guarding condition is not visible in this
// listing; presumably `!success` - confirm.
100 RecordTick(statistics
, NUMBER_MERGE_FAILURES
);
101 return Status::Corruption("Error: Could not perform merge.");
// Collects the merge operands for one user key starting at `iter`, then
// either full-merges them (when a base value is hit, or when the whole key
// history has surely been seen) or attempts a partial merge of the stacked
// operands.
//
// Parameters (as used by the visible code):
//   iter          - positioned on the first entry to merge; advanced by the
//                   loop below.
//   range_del_agg - optional (may be nullptr); consulted via ShouldDelete().
//   stop_before   - when > 0, an entry with sequence <= stop_before that also
//                   passes the snapshot check takes the "possibly visible by
//                   the previous snapshot" branch.
//   at_bottom     - see the comment above surely_seen_the_beginning.
//
// Statuses visible in this listing: Corruption (unparsable key),
// ShutdownInProgress, MergeInProgress, and whatever TimedFullMerge returns.
//
// NOTE(review): the embedded line numbers skip repeatedly inside this
// function (e.g. 123, 142-143, 145-147, 152-153, 162-163, 170, 177-180,
// 195-198, 209-212, 216-217, 229-235, 287-291, 294-296, 324-325, 336-337,
// 356-357, 363-368), so several breaks, returns, guards, declarations and
// closing braces are not visible. The comments added below describe only the
// visible lines.
107 // PRE: iter points to the first merge type entry
108 // POST: iter points to the first entry beyond the merge process (or the end)
109 // keys_, operands_ are updated to reflect the merge result.
110 // keys_ stores the list of keys encountered while merging.
111 // operands_ stores the list of merge operands encountered while merging.
112 // keys_[i] corresponds to operands_[i] for each i.
114 // TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
115 // and just pass the StripeRep corresponding to the stripe being merged.
116 Status
MergeHelper::MergeUntil(InternalIterator
* iter
,
117 CompactionRangeDelAggregator
* range_del_agg
,
118 const SequenceNumber stop_before
,
119 const bool at_bottom
) {
120 // Get a copy of the internal key, before it's invalidated by iter->Next()
121 // Also maintain the list of merge operands seen.
122 assert(HasOperator());
// Reset state left over from a previous call.
124 merge_context_
.Clear();
125 has_compaction_filter_skip_until_
= false;
126 assert(user_merge_operator_
);
127 bool first_key
= true;
129 // We need to parse the internal key again as the parsed key is
130 // backed by the internal key!
131 // Assume no internal key corruption as it has been successfully parsed
133 // original_key_is_iter variable is just caching the information:
134 // original_key_is_iter == (iter->key().ToString() == original_key)
135 bool original_key_is_iter
= true;
136 std::string original_key
= iter
->key().ToString();
138 // orig_ikey is backed by original_key if keys_.empty()
139 // orig_ikey is backed by keys_.back() if !keys_.empty()
140 ParsedInternalKey orig_ikey
;
141 bool succ
= ParseInternalKey(original_key
, &orig_ikey
);
// NOTE(review): the guard for this return is not visible in this listing;
// presumably it tests `succ` - confirm.
144 return Status::Corruption("Cannot parse key in MergeUntil");
148 bool hit_the_next_user_key
= false;
// Walk entries while the iterator is valid. After the first advance, iter no
// longer points at original_key, so the cached flag is cleared in the loop
// increment.
149 for (; iter
->Valid(); iter
->Next(), original_key_is_iter
= false) {
150 if (IsShuttingDown()) {
151 return Status::ShutdownInProgress();
154 ParsedInternalKey ikey
;
// Invariant: one stored key per collected operand.
155 assert(keys_
.size() == merge_context_
.GetNumOperands());
157 if (!ParseInternalKey(iter
->key(), &ikey
)) {
158 // stop at corrupted key
159 if (assert_valid_internal_key_
) {
160 assert(!"Corrupted internal key not expected.");
161 return Status::Corruption("Corrupted internal key not expected.");
// The first entry must carry the same user key as the key copied above.
164 } else if (first_key
) {
165 assert(user_comparator_
->Equal(ikey
.user_key
, orig_ikey
.user_key
));
167 } else if (!user_comparator_
->Equal(ikey
.user_key
, orig_ikey
.user_key
)) {
168 // hit a different user key, stop right here
169 hit_the_next_user_key
= true;
171 } else if (stop_before
> 0 && ikey
.sequence
<= stop_before
&&
172 LIKELY(snapshot_checker_
== nullptr ||
173 snapshot_checker_
->CheckInSnapshot(ikey
.sequence
,
175 SnapshotCheckerResult::kNotInSnapshot
)) {
176 // hit an entry that's possibly visible by the previous snapshot, can't
181 // At this point we are guaranteed that we need to process this key.
183 assert(IsValueType(ikey
.type
));
184 if (ikey
.type
!= kTypeMerge
) {
186 // hit a put/delete/single delete
187 // => merge the put value or a nullptr with operands_
188 // => store result in operands_.back() (and update keys_.back())
189 // => change the entry type to kTypeValue for keys_.back()
190 // We are done! Success!
192 // If there are no operands, just return the Status::OK(). That will cause
193 // the compaction iterator to write out the key we're currently at, which
194 // is the put/delete we just encountered.
199 // TODO(noetzli) If the merge operator returns false, we are currently
200 // (almost) silently dropping the put/delete. That's probably not what we
201 // want. Also if we're in compaction and it's a put, it would be nice to
202 // run compaction filter on it.
203 const Slice val
= iter
->value();
204 const Slice
* val_ptr
;
// The condition below holds only for a kTypeValue entry that is not covered
// by a range tombstone.
// NOTE(review): the assignments to val_ptr are not visible in this listing.
205 if (kTypeValue
== ikey
.type
&&
206 (range_del_agg
== nullptr ||
207 !range_del_agg
->ShouldDelete(
208 ikey
, RangeDelPositioningMode::kForwardTraversal
))) {
// Full merge of the collected operands on top of the base value (val_ptr).
213 std::string merge_result
;
214 s
= TimedFullMerge(user_merge_operator_
, ikey
.user_key
, val_ptr
,
215 merge_context_
.GetOperands(), &merge_result
, logger_
,
218 // We store the result in keys_.back() and operands_.back()
219 // if nothing went wrong (i.e.: no operand corruption on disk)
221 // The original key encountered
222 original_key
= std::move(keys_
.back());
223 orig_ikey
.type
= kTypeValue
;
224 UpdateInternalKey(&original_key
, orig_ikey
.sequence
, orig_ikey
.type
);
// Replace the collected operands with the single merged value.
226 merge_context_
.Clear();
227 keys_
.emplace_front(std::move(original_key
));
228 merge_context_
.PushOperand(merge_result
);
231 // move iter to the next entry
236 // => if there is a compaction filter, apply it.
237 // => check for range tombstones covering the operand
238 // => merge the operand into the front of the operands_ list
240 // => then continue because we haven't yet seen a Put/Delete.
242 // Keep queuing keys and operands until we either meet a put / delete
243 // request or later did a partial merge.
245 Slice value_slice
= iter
->value();
246 // add an operand to the list if:
247 // 1) it's included in one of the snapshots. in that case we *must* write
248 // it out, no matter what compaction filter says
249 // 2) it's not filtered by a compaction filter
250 CompactionFilter::Decision filter
=
251 ikey
.sequence
<= latest_snapshot_
252 ? CompactionFilter::Decision::kKeep
253 : FilterMerge(orig_ikey
.user_key
, value_slice
);
// A covering range tombstone downgrades the decision to kRemove, unless the
// filter already asked to remove-and-skip.
254 if (filter
!= CompactionFilter::Decision::kRemoveAndSkipUntil
&&
255 range_del_agg
!= nullptr &&
256 range_del_agg
->ShouldDelete(
257 iter
->key(), RangeDelPositioningMode::kForwardTraversal
)) {
258 filter
= CompactionFilter::Decision::kRemove
;
260 if (filter
== CompactionFilter::Decision::kKeep
||
261 filter
== CompactionFilter::Decision::kChangeValue
) {
262 if (original_key_is_iter
) {
263 // this is just an optimization that saves us one memcpy
264 keys_
.push_front(std::move(original_key
));
266 keys_
.push_front(iter
->key().ToString());
268 if (keys_
.size() == 1) {
269 // we need to re-anchor the orig_ikey because it was anchored by
270 // original_key before
// The return value of this re-parse is not checked here.
271 ParseInternalKey(keys_
.back(), &orig_ikey
);
273 if (filter
== CompactionFilter::Decision::kKeep
) {
274 merge_context_
.PushOperand(
275 value_slice
, iter
->IsValuePinned() /* operand_pinned */);
276 } else { // kChangeValue
277 // Compaction filter asked us to change the operand from value_slice
278 // to compaction_filter_value_.
279 merge_context_
.PushOperand(compaction_filter_value_
, false);
281 } else if (filter
== CompactionFilter::Decision::kRemoveAndSkipUntil
) {
282 // Compaction filter asked us to remove this key altogether
283 // (not just this operand), along with some keys following it.
285 merge_context_
.Clear();
286 has_compaction_filter_skip_until_
= true;
292 if (merge_context_
.GetNumOperands() == 0) {
293 // we filtered out all the merge operands
297 // We are sure we have seen this key's entire history if:
298 // at_bottom == true (this does not necessarily mean it is the bottommost
299 // layer, but rather that we are confident the key does not appear on any of
300 // the lower layers, at_bottom == false doesn't mean it does appear, just
301 // that we can't be sure, see Compaction::IsBottommostLevel for details)
303 // we have either encountered another key or end of key history on this
306 // When these conditions are true we are able to merge all the keys
309 // For these cases we are not sure about, we simply miss the opportunity
310 // to combine the keys. Since VersionSet::SetupOtherInputs() always makes
311 // sure that all merge-operands on the same level get compacted together,
312 // this will simply lead to these merge operands moving to the next level.
313 bool surely_seen_the_beginning
=
314 (hit_the_next_user_key
|| !iter
->Valid()) && at_bottom
;
315 if (surely_seen_the_beginning
) {
316 // do a final merge with nullptr as the existing value and say
317 // bye to the merge type (it's now converted to a Put)
318 assert(kTypeMerge
== orig_ikey
.type
);
319 assert(merge_context_
.GetNumOperands() >= 1);
320 assert(merge_context_
.GetNumOperands() == keys_
.size());
321 std::string merge_result
;
322 s
= TimedFullMerge(user_merge_operator_
, orig_ikey
.user_key
, nullptr,
323 merge_context_
.GetOperands(), &merge_result
, logger_
,
326 // The original key encountered
327 // We are certain that keys_ is not empty here (see assertions couple of
329 original_key
= std::move(keys_
.back());
330 orig_ikey
.type
= kTypeValue
;
331 UpdateInternalKey(&original_key
, orig_ikey
.sequence
, orig_ikey
.type
);
// Replace the collected operands with the single merged value.
333 merge_context_
.Clear();
334 keys_
.emplace_front(std::move(original_key
));
335 merge_context_
.PushOperand(merge_result
);
338 // We haven't seen the beginning of the key nor a Put/Delete.
339 // Attempt to use the user's associative merge function to
340 // merge the stacked merge operands into a single operand.
341 s
= Status::MergeInProgress();
// A partial merge is attempted with two or more operands, or with exactly one
// when the merge operator allows a single operand.
342 if (merge_context_
.GetNumOperands() >= 2 ||
343 (allow_single_operand_
&& merge_context_
.GetNumOperands() == 1)) {
344 bool merge_success
= false;
345 std::string merge_result
;
347 StopWatchNano
timer(env_
, stats_
!= nullptr);
348 PERF_TIMER_GUARD(merge_operator_time_nanos
);
349 merge_success
= user_merge_operator_
->PartialMergeMulti(
351 std::deque
<Slice
>(merge_context_
.GetOperands().begin(),
352 merge_context_
.GetOperands().end()),
353 &merge_result
, logger_
);
354 RecordTick(stats_
, MERGE_OPERATION_TOTAL_TIME
,
355 stats_
? timer
.ElapsedNanosSafe() : 0);
358 // Merging of operands (associative merge) was successful.
359 // Replace operands with the merge result
360 merge_context_
.Clear();
361 merge_context_
.PushOperand(merge_result
);
// Keep only keys_.back() so keys_ stays 1:1 with the single merged operand.
362 keys_
.erase(keys_
.begin(), keys_
.end() - 1);
// Binds the iterator to `merge_helper` and initializes both cursors to the
// reverse-end (rend()) of the helper's keys() and values() sequences.
// NOTE(review): the closing brace is not visible in this listing.
370 MergeOutputIterator::MergeOutputIterator(const MergeHelper
* merge_helper
)
371 : merge_helper_(merge_helper
) {
372 it_keys_
= merge_helper_
->keys().rend();
373 it_values_
= merge_helper_
->values().rend();
// Positions both cursors at rbegin() of the helper's keys() and values(),
// i.e. iteration runs over those sequences in reverse. Asserts that the two
// sequences have the same length.
// NOTE(review): MergeUntil inserts keys at the front of keys_, so reverse
// order presumably yields entries in the order they were encountered -
// confirm against values(). The closing brace is not visible in this listing.
376 void MergeOutputIterator::SeekToFirst() {
377 const auto& keys
= merge_helper_
->keys();
378 const auto& values
= merge_helper_
->values();
379 assert(keys
.size() == values
.size());
380 it_keys_
= keys
.rbegin();
381 it_values_
= values
.rbegin();
// NOTE(review): only the signature is visible in this listing; the body is
// missing. Presumably it advances it_keys_ and it_values_ together - confirm
// against the full file.
384 void MergeOutputIterator::Next() {
// Asks the compaction filter (if any) what to do with one merge operand.
//
// Parameters:
//   user_key    - user key the operand belongs to; passed to FilterV2().
//   value_slice - the merge operand; passed to FilterV2() with value type
//                 kMergeOperand.
//
// Returns kKeep when no compaction filter is configured. Otherwise the
// decision comes from FilterV2(), except that a kRemoveAndSkipUntil decision
// with an invalid skip_until is turned into kKeep.
//
// Side effects visible here: compaction_filter_value_ and
// compaction_filter_skip_until_ are cleared and then handed to FilterV2() as
// outputs; total_filter_time_ is increased by the filter timer's elapsed time.
//
// NOTE(review): the embedded line numbers skip 393, 396, 404, 408, 410-412
// and 414-416, so the right-hand side of the skip_until comparison, the
// second ConvertFromUserKey argument, any guard around the time accumulation
// and the final return are not visible in this listing.
389 CompactionFilter::Decision
MergeHelper::FilterMerge(const Slice
& user_key
,
390 const Slice
& value_slice
) {
391 if (compaction_filter_
== nullptr) {
392 return CompactionFilter::Decision::kKeep
;
// Only time the filter when statistics exist and detailed timing is enabled.
394 if (stats_
!= nullptr && ShouldReportDetailedTime(env_
, stats_
)) {
395 filter_timer_
.Start();
// Reset the filter's output buffers before each call.
397 compaction_filter_value_
.clear();
398 compaction_filter_skip_until_
.Clear();
399 auto ret
= compaction_filter_
->FilterV2(
400 level_
, user_key
, CompactionFilter::ValueType::kMergeOperand
, value_slice
,
401 &compaction_filter_value_
, compaction_filter_skip_until_
.rep());
402 if (ret
== CompactionFilter::Decision::kRemoveAndSkipUntil
) {
403 if (user_comparator_
->Compare(*compaction_filter_skip_until_
.rep(),
405 // Invalid skip_until returned from compaction filter.
406 // Keep the key as per FilterV2 documentation.
407 ret
= CompactionFilter::Decision::kKeep
;
// Otherwise the user-key skip target is converted in place using
// kMaxSequenceNumber.
409 compaction_filter_skip_until_
.ConvertFromUserKey(kMaxSequenceNumber
,
413 total_filter_time_
+= filter_timer_
.ElapsedNanosSafe();
417 } // namespace ROCKSDB_NAMESPACE