ceph/src/rocksdb/db/merge_helper.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/merge_helper.h"

#include <string>

#include "db/dbformat.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/likely.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "table/format.h"
#include "table/internal_iterator.h"

namespace rocksdb {

MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
                         const MergeOperator* user_merge_operator,
                         const CompactionFilter* compaction_filter,
                         Logger* logger, bool assert_valid_internal_key,
                         SequenceNumber latest_snapshot,
                         const SnapshotChecker* snapshot_checker, int level,
                         Statistics* stats,
                         const std::atomic<bool>* shutting_down)
    : env_(env),
      user_comparator_(user_comparator),
      user_merge_operator_(user_merge_operator),
      compaction_filter_(compaction_filter),
      shutting_down_(shutting_down),
      logger_(logger),
      assert_valid_internal_key_(assert_valid_internal_key),
      allow_single_operand_(false),
      latest_snapshot_(latest_snapshot),
      snapshot_checker_(snapshot_checker),
      level_(level),
      keys_(),
      filter_timer_(env_),
      total_filter_time_(0U),
      stats_(stats) {
  assert(user_comparator_ != nullptr);
  if (user_merge_operator_) {
    allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
  }
}
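
// Illustrative only (not part of the original source): a minimal sketch of
// constructing a MergeHelper with the signature above, assuming a
// user-supplied MergeOperator ("my_merge_operator") and Logger ("my_logger")
// and no compaction filter, snapshot checker, or statistics:
//
//   MergeHelper merge(env, BytewiseComparator(), my_merge_operator.get(),
//                     nullptr /* compaction_filter */, my_logger,
//                     false /* assert_valid_internal_key */,
//                     kMaxSequenceNumber /* latest_snapshot */,
//                     nullptr /* snapshot_checker */, 0 /* level */,
//                     nullptr /* stats */, nullptr /* shutting_down */);
//
// In RocksDB this is normally done by the compaction code; the sketch only
// mirrors the parameter list of the constructor above.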

Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
                                   const Slice& key, const Slice* value,
                                   const std::vector<Slice>& operands,
                                   std::string* result, Logger* logger,
                                   Statistics* statistics, Env* env,
                                   Slice* result_operand,
                                   bool update_num_ops_stats) {
  assert(merge_operator != nullptr);

  if (operands.size() == 0) {
    assert(value != nullptr && result != nullptr);
    result->assign(value->data(), value->size());
    return Status::OK();
  }

  if (update_num_ops_stats) {
    RecordInHistogram(statistics, READ_NUM_MERGE_OPERANDS,
                      static_cast<uint64_t>(operands.size()));
  }

  bool success;
  Slice tmp_result_operand(nullptr, 0);
  const MergeOperator::MergeOperationInput merge_in(key, value, operands,
                                                    logger);
  MergeOperator::MergeOperationOutput merge_out(*result, tmp_result_operand);
  {
    // Setup to time the merge
    StopWatchNano timer(env, statistics != nullptr);
    PERF_TIMER_GUARD(merge_operator_time_nanos);

    // Do the merge
    success = merge_operator->FullMergeV2(merge_in, &merge_out);

    if (tmp_result_operand.data()) {
      // FullMergeV2 result is an existing operand
      if (result_operand != nullptr) {
        *result_operand = tmp_result_operand;
      } else {
        result->assign(tmp_result_operand.data(), tmp_result_operand.size());
      }
    } else if (result_operand) {
      *result_operand = Slice(nullptr, 0);
    }

    RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
               statistics ? timer.ElapsedNanos() : 0);
  }

  if (!success) {
    RecordTick(statistics, NUMBER_MERGE_FAILURES);
    return Status::Corruption("Error: Could not perform merge.");
  }

  return Status::OK();
}
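
// Illustrative only (not part of the original source): TimedFullMerge is the
// entry point used below to combine an optional base value with the
// accumulated merge operands. A hedged usage sketch, with "my_operator",
// "my_logger", "my_stats" and "my_env" as placeholders:
//
//   std::string result;
//   std::vector<Slice> operands{Slice("operand1"), Slice("operand2")};
//   Status s = MergeHelper::TimedFullMerge(
//       my_operator, Slice("user_key"), nullptr /* no base value */, operands,
//       &result, my_logger, my_stats, my_env,
//       nullptr /* result_operand */, false /* update_num_ops_stats */);
//
// With an empty operand list the base value is copied through unchanged (see
// the early return above); a failed FullMergeV2() surfaces as
// Status::Corruption and bumps NUMBER_MERGE_FAILURES.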

// PRE:  iter points to the first merge type entry
// POST: iter points to the first entry beyond the merge process (or the end)
//       keys_, operands_ are updated to reflect the merge result.
//       keys_ stores the list of keys encountered while merging.
//       operands_ stores the list of merge operands encountered while merging.
//       keys_[i] corresponds to operands_[i] for each i.
//
// TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
// and just pass the StripeRep corresponding to the stripe being merged.
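//
// Return value summary (see the return paths below): Status::OK() means the
// operands were folded into a single kTypeValue entry, or were dropped
// entirely (e.g. by the compaction filter or a covering range tombstone);
// Status::MergeInProgress() means neither a Put/Delete nor the start of the
// key's history was reached, so the (possibly partially merged) operands stay
// as kTypeMerge entries; Corruption and ShutdownInProgress are propagated
// from the corresponding early returns.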
Status MergeHelper::MergeUntil(InternalIterator* iter,
                               CompactionRangeDelAggregator* range_del_agg,
                               const SequenceNumber stop_before,
                               const bool at_bottom) {
  // Get a copy of the internal key, before it's invalidated by iter->Next()
  // Also maintain the list of merge operands seen.
  assert(HasOperator());
  keys_.clear();
  merge_context_.Clear();
  has_compaction_filter_skip_until_ = false;
  assert(user_merge_operator_);
  bool first_key = true;

  // We need to parse the internal key again as the parsed key is
  // backed by the internal key!
  // Assume no internal key corruption as it has been successfully parsed
  // by the caller.
  // original_key_is_iter variable is just caching the information:
  // original_key_is_iter == (iter->key().ToString() == original_key)
  bool original_key_is_iter = true;
  std::string original_key = iter->key().ToString();
  // Important:
  // orig_ikey is backed by original_key if keys_.empty()
  // orig_ikey is backed by keys_.back() if !keys_.empty()
  ParsedInternalKey orig_ikey;
  bool succ = ParseInternalKey(original_key, &orig_ikey);
  assert(succ);
  if (!succ) {
    return Status::Corruption("Cannot parse key in MergeUntil");
  }

  Status s;
  bool hit_the_next_user_key = false;
  for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
    if (IsShuttingDown()) {
      return Status::ShutdownInProgress();
    }

    ParsedInternalKey ikey;
    assert(keys_.size() == merge_context_.GetNumOperands());

    if (!ParseInternalKey(iter->key(), &ikey)) {
      // stop at corrupted key
      if (assert_valid_internal_key_) {
        assert(!"Corrupted internal key not expected.");
        return Status::Corruption("Corrupted internal key not expected.");
      }
      break;
    } else if (first_key) {
      assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key));
      first_key = false;
    } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) {
      // hit a different user key, stop right here
      hit_the_next_user_key = true;
      break;
    } else if (stop_before > 0 && ikey.sequence <= stop_before &&
               LIKELY(snapshot_checker_ == nullptr ||
                      snapshot_checker_->CheckInSnapshot(ikey.sequence,
                                                         stop_before) !=
                          SnapshotCheckerResult::kNotInSnapshot)) {
      // hit an entry that's possibly visible by the previous snapshot, can't
      // touch that
      break;
    }

    // At this point we are guaranteed that we need to process this key.

    assert(IsValueType(ikey.type));
    if (ikey.type != kTypeMerge) {

      // hit a put/delete/single delete
      //   => merge the put value or a nullptr with operands_
      //   => store result in operands_.back() (and update keys_.back())
      //   => change the entry type to kTypeValue for keys_.back()
      // We are done! Success!

      // If there are no operands, just return the Status::OK(). That will cause
      // the compaction iterator to write out the key we're currently at, which
      // is the put/delete we just encountered.
      if (keys_.empty()) {
        return Status::OK();
      }

      // TODO(noetzli) If the merge operator returns false, we are currently
      // (almost) silently dropping the put/delete. That's probably not what we
      // want. Also if we're in compaction and it's a put, it would be nice to
      // run compaction filter on it.
      const Slice val = iter->value();
      const Slice* val_ptr;
      if (kTypeValue == ikey.type &&
          (range_del_agg == nullptr ||
           !range_del_agg->ShouldDelete(
               ikey, RangeDelPositioningMode::kForwardTraversal))) {
        val_ptr = &val;
      } else {
        val_ptr = nullptr;
      }
      std::string merge_result;
      s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
                         merge_context_.GetOperands(), &merge_result, logger_,
                         stats_, env_);

      // We store the result in keys_.back() and operands_.back()
      // if nothing went wrong (i.e.: no operand corruption on disk)
      if (s.ok()) {
        // The original key encountered
        original_key = std::move(keys_.back());
        orig_ikey.type = kTypeValue;
        UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
        keys_.clear();
        merge_context_.Clear();
        keys_.emplace_front(std::move(original_key));
        merge_context_.PushOperand(merge_result);
      }

      // move iter to the next entry
      iter->Next();
      return s;
    } else {
      // hit a merge
      //   => if there is a compaction filter, apply it.
      //   => check for range tombstones covering the operand
      //   => merge the operand into the front of the operands_ list
      //      if not filtered
      //   => then continue because we haven't yet seen a Put/Delete.
      //
      // Keep queuing keys and operands until we either meet a put / delete
      // request or later did a partial merge.

      Slice value_slice = iter->value();
      // add an operand to the list if:
      // 1) it's included in one of the snapshots. in that case we *must* write
      //    it out, no matter what compaction filter says
      // 2) it's not filtered by a compaction filter
      CompactionFilter::Decision filter =
          ikey.sequence <= latest_snapshot_
              ? CompactionFilter::Decision::kKeep
              : FilterMerge(orig_ikey.user_key, value_slice);
      if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
          range_del_agg != nullptr &&
          range_del_agg->ShouldDelete(
              iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
        filter = CompactionFilter::Decision::kRemove;
      }
      if (filter == CompactionFilter::Decision::kKeep ||
          filter == CompactionFilter::Decision::kChangeValue) {
        if (original_key_is_iter) {
          // this is just an optimization that saves us one memcpy
          keys_.push_front(std::move(original_key));
        } else {
          keys_.push_front(iter->key().ToString());
        }
        if (keys_.size() == 1) {
          // we need to re-anchor the orig_ikey because it was anchored by
          // original_key before
          ParseInternalKey(keys_.back(), &orig_ikey);
        }
        if (filter == CompactionFilter::Decision::kKeep) {
          merge_context_.PushOperand(
              value_slice, iter->IsValuePinned() /* operand_pinned */);
        } else {  // kChangeValue
          // Compaction filter asked us to change the operand from value_slice
          // to compaction_filter_value_.
          merge_context_.PushOperand(compaction_filter_value_, false);
        }
      } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
        // Compaction filter asked us to remove this key altogether
        // (not just this operand), along with some keys following it.
        keys_.clear();
        merge_context_.Clear();
        has_compaction_filter_skip_until_ = true;
        return Status::OK();
      }
    }
  }

  if (merge_context_.GetNumOperands() == 0) {
    // we filtered out all the merge operands
    return Status::OK();
  }

  // We are sure we have seen this key's entire history if:
  // at_bottom == true (this does not necessarily mean it is the bottommost
  // layer, but rather that we are confident the key does not appear on any of
  // the lower layers, at_bottom == false doesn't mean it does appear, just
  // that we can't be sure, see Compaction::IsBottommostLevel for details)
  // AND
  // we have either encountered another key or end of key history on this
  // layer.
  //
  // When these conditions are true we are able to merge all the keys
  // using full merge.
  //
  // For these cases we are not sure about, we simply miss the opportunity
  // to combine the keys. Since VersionSet::SetupOtherInputs() always makes
  // sure that all merge-operands on the same level get compacted together,
  // this will simply lead to these merge operands moving to the next level.
  bool surely_seen_the_beginning =
      (hit_the_next_user_key || !iter->Valid()) && at_bottom;
  if (surely_seen_the_beginning) {
    // do a final merge with nullptr as the existing value and say
    // bye to the merge type (it's now converted to a Put)
    assert(kTypeMerge == orig_ikey.type);
    assert(merge_context_.GetNumOperands() >= 1);
    assert(merge_context_.GetNumOperands() == keys_.size());
    std::string merge_result;
    s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
                       merge_context_.GetOperands(), &merge_result, logger_,
                       stats_, env_);
    if (s.ok()) {
      // The original key encountered
      // We are certain that keys_ is not empty here (see assertions couple of
      // lines before).
      original_key = std::move(keys_.back());
      orig_ikey.type = kTypeValue;
      UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
      keys_.clear();
      merge_context_.Clear();
      keys_.emplace_front(std::move(original_key));
      merge_context_.PushOperand(merge_result);
    }
  } else {
    // We haven't seen the beginning of the key nor a Put/Delete.
    // Attempt to use the user's associative merge function to
    // merge the stacked merge operands into a single operand.
    s = Status::MergeInProgress();
    if (merge_context_.GetNumOperands() >= 2 ||
        (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
      bool merge_success = false;
      std::string merge_result;
      {
        StopWatchNano timer(env_, stats_ != nullptr);
        PERF_TIMER_GUARD(merge_operator_time_nanos);
        merge_success = user_merge_operator_->PartialMergeMulti(
            orig_ikey.user_key,
            std::deque<Slice>(merge_context_.GetOperands().begin(),
                              merge_context_.GetOperands().end()),
            &merge_result, logger_);
        RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME,
                   stats_ ? timer.ElapsedNanosSafe() : 0);
      }
      if (merge_success) {
        // Merging of operands (associative merge) was successful.
        // Replace operands with the merge result
        merge_context_.Clear();
        merge_context_.PushOperand(merge_result);
        keys_.erase(keys_.begin(), keys_.end() - 1);
      }
    }
  }

  return s;
}

MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
    : merge_helper_(merge_helper) {
  it_keys_ = merge_helper_->keys().rend();
  it_values_ = merge_helper_->values().rend();
}

void MergeOutputIterator::SeekToFirst() {
  const auto& keys = merge_helper_->keys();
  const auto& values = merge_helper_->values();
  assert(keys.size() == values.size());
  it_keys_ = keys.rbegin();
  it_values_ = values.rbegin();
}

void MergeOutputIterator::Next() {
  ++it_keys_;
  ++it_values_;
}
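
// Illustrative only (not part of the original source): a sketch of how the
// merged output is typically consumed after MergeUntil(), assuming the
// Valid()/key()/value() accessors declared for MergeOutputIterator in
// merge_helper.h ("merge_helper" is a placeholder for the helper used above):
//
//   MergeOutputIterator merge_out(&merge_helper);
//   for (merge_out.SeekToFirst(); merge_out.Valid(); merge_out.Next()) {
//     // merge_out.key() / merge_out.value() yield one merged entry.
//   }
//
// Because keys_ and operands are pushed to the front of their deques and read
// back in reverse, entries come out in the order MergeUntil() first saw them
// (highest sequence number first).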

CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
                                                    const Slice& value_slice) {
  if (compaction_filter_ == nullptr) {
    return CompactionFilter::Decision::kKeep;
  }
  if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
    filter_timer_.Start();
  }
  compaction_filter_value_.clear();
  compaction_filter_skip_until_.Clear();
  auto ret = compaction_filter_->FilterV2(
      level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice,
      &compaction_filter_value_, compaction_filter_skip_until_.rep());
  if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
    if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
                                  user_key) <= 0) {
      // Invalid skip_until returned from compaction filter.
      // Keep the key as per FilterV2 documentation.
      ret = CompactionFilter::Decision::kKeep;
    } else {
      compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
                                                       kValueTypeForSeek);
    }
  }
  total_filter_time_ += filter_timer_.ElapsedNanosSafe();
  return ret;
}

}  // namespace rocksdb