]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/merge_helper.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / merge_helper.h
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae 5//
11fdf7f2 6#pragma once
7c673cae
FG
7
#include <atomic>
#include <deque>
#include <string>
#include <vector>
11
7c673cae
FG
12#include "db/merge_context.h"
13#include "db/range_del_aggregator.h"
11fdf7f2 14#include "db/snapshot_checker.h"
7c673cae
FG
15#include "rocksdb/compaction_filter.h"
16#include "rocksdb/env.h"
1e59de90 17#include "rocksdb/merge_operator.h"
7c673cae 18#include "rocksdb/slice.h"
1e59de90 19#include "rocksdb/wide_columns.h"
7c673cae
FG
20#include "util/stop_watch.h"
21
f67539c2 22namespace ROCKSDB_NAMESPACE {
7c673cae
FG
23
24class Comparator;
25class Iterator;
26class Logger;
27class MergeOperator;
28class Statistics;
1e59de90
TL
29class SystemClock;
30class BlobFetcher;
31class PrefetchBufferCollection;
32struct CompactionIterationStats;
7c673cae
FG
33
34class MergeHelper {
35 public:
36 MergeHelper(Env* env, const Comparator* user_comparator,
37 const MergeOperator* user_merge_operator,
38 const CompactionFilter* compaction_filter, Logger* logger,
39 bool assert_valid_internal_key, SequenceNumber latest_snapshot,
11fdf7f2
TL
40 const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
41 Statistics* stats = nullptr,
42 const std::atomic<bool>* shutting_down = nullptr);
7c673cae
FG
43
44 // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
45 // Result of merge will be written to result if status returned is OK.
46 // If operands is empty, the value will simply be copied to result.
11fdf7f2
TL
47 // Set `update_num_ops_stats` to true if it is from a user read, so that
48 // the latency is sensitive.
7c673cae
FG
49 // Returns one of the following statuses:
50 // - OK: Entries were successfully merged.
51 // - Corruption: Merge operator reported unsuccessful merge.
52 static Status TimedFullMerge(const MergeOperator* merge_operator,
53 const Slice& key, const Slice* value,
54 const std::vector<Slice>& operands,
55 std::string* result, Logger* logger,
1e59de90
TL
56 Statistics* statistics, SystemClock* clock,
57 Slice* result_operand,
58 bool update_num_ops_stats);
7c673cae 59
1e59de90
TL
60 static Status TimedFullMergeWithEntity(
61 const MergeOperator* merge_operator, const Slice& key, Slice base_entity,
62 const std::vector<Slice>& operands, std::string* result, Logger* logger,
63 Statistics* statistics, SystemClock* clock, bool update_num_ops_stats);
64
65 // During compaction, merge entries until we hit
7c673cae
FG
66 // - a corrupted key
67 // - a Put/Delete,
68 // - a different user key,
69 // - a specific sequence number (snapshot boundary),
70 // - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
71 // or - the end of iteration
72 // iter: (IN) points to the first merge type entry
73 // (OUT) points to the first entry not included in the merge process
74 // range_del_agg: (IN) filters merge operands covered by range tombstones.
75 // stop_before: (IN) a sequence number that merge should not cross.
76 // 0 means no restriction
77 // at_bottom: (IN) true if the iterator covers the bottem level, which means
78 // we could reach the start of the history of this user key.
20effc67
TL
79 // allow_data_in_errors: (IN) if true, data details will be displayed in
80 // error/log messages.
1e59de90
TL
81 // blob_fetcher: (IN) blob fetcher object for the compaction's input version.
82 // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers
83 // used for compaction readahead.
84 // c_iter_stats: (OUT) compaction iteration statistics.
7c673cae
FG
85 //
86 // Returns one of the following statuses:
87 // - OK: Entries were successfully merged.
88 // - MergeInProgress: Put/Delete not encountered, and didn't reach the start
89 // of key's history. Output consists of merge operands only.
90 // - Corruption: Merge operator reported unsuccessful merge or a corrupted
91 // key has been encountered and not expected (applies only when compiling
92 // with asserts removed).
93 // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
94 //
95 // REQUIRED: The first key in the input is not corrupted.
96 Status MergeUntil(InternalIterator* iter,
1e59de90
TL
97 CompactionRangeDelAggregator* range_del_agg,
98 const SequenceNumber stop_before, const bool at_bottom,
99 const bool allow_data_in_errors,
100 const BlobFetcher* blob_fetcher,
101 const std::string* const full_history_ts_low,
102 PrefetchBufferCollection* prefetch_buffers,
103 CompactionIterationStats* c_iter_stats);
7c673cae
FG
104
105 // Filters a merge operand using the compaction filter specified
106 // in the constructor. Returns the decision that the filter made.
107 // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
108 // optional outputs of compaction filter.
1e59de90 109 // user_key includes timestamp if user-defined timestamp is enabled.
7c673cae
FG
110 CompactionFilter::Decision FilterMerge(const Slice& user_key,
111 const Slice& value_slice);
112
113 // Query the merge result
114 // These are valid until the next MergeUntil call
115 // If the merging was successful:
116 // - keys() contains a single element with the latest sequence number of
117 // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
118 // - values() contains a single element with the result of merging all the
119 // operands together
120 //
121 // IMPORTANT 1: the key type could change after the MergeUntil call.
122 // Put/Delete + Merge + ... + Merge => Put
123 // Merge + ... + Merge => Merge
124 //
125 // If the merge operator is not associative, and if a Put/Delete is not found
126 // then the merging will be unsuccessful. In this case:
127 // - keys() contains the list of internal keys seen in order of iteration.
128 // - values() contains the list of values (merges) seen in the same order.
129 // values() is parallel to keys() so that the first entry in
130 // keys() is the key associated with the first entry in values()
131 // and so on. These lists will be the same length.
132 // All of these pairs will be merges over the same user key.
133 // See IMPORTANT 2 note below.
134 //
135 // IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
136 // So keys().back() was the first key seen by iterator.
137 // TODO: Re-style this comment to be like the first one
138 const std::deque<std::string>& keys() const { return keys_; }
139 const std::vector<Slice>& values() const {
140 return merge_context_.GetOperands();
141 }
142 uint64_t TotalFilterTime() const { return total_filter_time_; }
143 bool HasOperator() const { return user_merge_operator_ != nullptr; }
144
145 // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
146 // return true and fill *until with the key to which we should skip.
147 // If true, keys() and values() are empty.
148 bool FilteredUntil(Slice* skip_until) const {
149 if (!has_compaction_filter_skip_until_) {
150 return false;
151 }
152 assert(compaction_filter_ != nullptr);
153 assert(skip_until != nullptr);
154 assert(compaction_filter_skip_until_.Valid());
155 *skip_until = compaction_filter_skip_until_.Encode();
156 return true;
157 }
158
159 private:
160 Env* env_;
1e59de90 161 SystemClock* clock_;
7c673cae
FG
162 const Comparator* user_comparator_;
163 const MergeOperator* user_merge_operator_;
164 const CompactionFilter* compaction_filter_;
165 const std::atomic<bool>* shutting_down_;
166 Logger* logger_;
1e59de90 167 bool assert_valid_internal_key_; // enforce no internal key corruption?
11fdf7f2 168 bool allow_single_operand_;
7c673cae 169 SequenceNumber latest_snapshot_;
11fdf7f2 170 const SnapshotChecker* const snapshot_checker_;
7c673cae
FG
171 int level_;
172
173 // the scratch area that holds the result of MergeUntil
174 // valid up to the next MergeUntil call
175
176 // Keeps track of the sequence of keys seen
177 std::deque<std::string> keys_;
178 // Parallel with keys_; stores the operands
179 mutable MergeContext merge_context_;
180
181 StopWatchNano filter_timer_;
182 uint64_t total_filter_time_;
183 Statistics* stats_;
184
185 bool has_compaction_filter_skip_until_ = false;
186 std::string compaction_filter_value_;
187 InternalKey compaction_filter_skip_until_;
188
189 bool IsShuttingDown() {
190 // This is a best-effort facility, so memory_order_relaxed is sufficient.
191 return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
192 }
193};
194
195// MergeOutputIterator can be used to iterate over the result of a merge.
196class MergeOutputIterator {
197 public:
198 // The MergeOutputIterator is bound to a MergeHelper instance.
199 explicit MergeOutputIterator(const MergeHelper* merge_helper);
200
201 // Seeks to the first record in the output.
202 void SeekToFirst();
203 // Advances to the next record in the output.
204 void Next();
205
206 Slice key() { return Slice(*it_keys_); }
207 Slice value() { return Slice(*it_values_); }
208 bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
209
210 private:
211 const MergeHelper* merge_helper_;
212 std::deque<std::string>::const_reverse_iterator it_keys_;
213 std::vector<Slice>::const_reverse_iterator it_values_;
214};
215
f67539c2 216} // namespace ROCKSDB_NAMESPACE