]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae | 5 | // |
11fdf7f2 | 6 | #pragma once |
7c673cae FG |
7 | |
8 | #include <deque> | |
9 | #include <string> | |
10 | #include <vector> | |
11 | ||
7c673cae FG |
12 | #include "db/merge_context.h" |
13 | #include "db/range_del_aggregator.h" | |
11fdf7f2 | 14 | #include "db/snapshot_checker.h" |
7c673cae FG |
15 | #include "rocksdb/compaction_filter.h" |
16 | #include "rocksdb/env.h" | |
1e59de90 | 17 | #include "rocksdb/merge_operator.h" |
7c673cae | 18 | #include "rocksdb/slice.h" |
1e59de90 | 19 | #include "rocksdb/wide_columns.h" |
7c673cae FG |
20 | #include "util/stop_watch.h" |
21 | ||
f67539c2 | 22 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
23 | |
// Forward declarations. The declarations that follow only handle these types
// through pointers, so their complete definitions are not required here.
// NOTE(review): `Iterator` is not referenced anywhere in this header; it is
// kept only so that any code relying on this declaration keeps compiling.
class BlobFetcher;
class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class PrefetchBufferCollection;
class Statistics;
class SystemClock;

struct CompactionIterationStats;
7c673cae FG |
33 | |
34 | class MergeHelper { | |
35 | public: | |
36 | MergeHelper(Env* env, const Comparator* user_comparator, | |
37 | const MergeOperator* user_merge_operator, | |
38 | const CompactionFilter* compaction_filter, Logger* logger, | |
39 | bool assert_valid_internal_key, SequenceNumber latest_snapshot, | |
11fdf7f2 TL |
40 | const SnapshotChecker* snapshot_checker = nullptr, int level = 0, |
41 | Statistics* stats = nullptr, | |
42 | const std::atomic<bool>* shutting_down = nullptr); | |
7c673cae FG |
43 | |
44 | // Wrapper around MergeOperator::FullMergeV2() that records perf statistics. | |
45 | // Result of merge will be written to result if status returned is OK. | |
46 | // If operands is empty, the value will simply be copied to result. | |
11fdf7f2 TL |
47 | // Set `update_num_ops_stats` to true if it is from a user read, so that |
48 | // the latency is sensitive. | |
7c673cae FG |
49 | // Returns one of the following statuses: |
50 | // - OK: Entries were successfully merged. | |
51 | // - Corruption: Merge operator reported unsuccessful merge. | |
52 | static Status TimedFullMerge(const MergeOperator* merge_operator, | |
53 | const Slice& key, const Slice* value, | |
54 | const std::vector<Slice>& operands, | |
55 | std::string* result, Logger* logger, | |
1e59de90 TL |
56 | Statistics* statistics, SystemClock* clock, |
57 | Slice* result_operand, | |
58 | bool update_num_ops_stats); | |
7c673cae | 59 | |
1e59de90 TL |
60 | static Status TimedFullMergeWithEntity( |
61 | const MergeOperator* merge_operator, const Slice& key, Slice base_entity, | |
62 | const std::vector<Slice>& operands, std::string* result, Logger* logger, | |
63 | Statistics* statistics, SystemClock* clock, bool update_num_ops_stats); | |
64 | ||
65 | // During compaction, merge entries until we hit | |
7c673cae FG |
66 | // - a corrupted key |
67 | // - a Put/Delete, | |
68 | // - a different user key, | |
69 | // - a specific sequence number (snapshot boundary), | |
70 | // - REMOVE_AND_SKIP_UNTIL returned from compaction filter, | |
71 | // or - the end of iteration | |
72 | // iter: (IN) points to the first merge type entry | |
73 | // (OUT) points to the first entry not included in the merge process | |
74 | // range_del_agg: (IN) filters merge operands covered by range tombstones. | |
75 | // stop_before: (IN) a sequence number that merge should not cross. | |
76 | // 0 means no restriction | |
77 | // at_bottom: (IN) true if the iterator covers the bottem level, which means | |
78 | // we could reach the start of the history of this user key. | |
20effc67 TL |
79 | // allow_data_in_errors: (IN) if true, data details will be displayed in |
80 | // error/log messages. | |
1e59de90 TL |
81 | // blob_fetcher: (IN) blob fetcher object for the compaction's input version. |
82 | // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers | |
83 | // used for compaction readahead. | |
84 | // c_iter_stats: (OUT) compaction iteration statistics. | |
7c673cae FG |
85 | // |
86 | // Returns one of the following statuses: | |
87 | // - OK: Entries were successfully merged. | |
88 | // - MergeInProgress: Put/Delete not encountered, and didn't reach the start | |
89 | // of key's history. Output consists of merge operands only. | |
90 | // - Corruption: Merge operator reported unsuccessful merge or a corrupted | |
91 | // key has been encountered and not expected (applies only when compiling | |
92 | // with asserts removed). | |
93 | // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true). | |
94 | // | |
95 | // REQUIRED: The first key in the input is not corrupted. | |
96 | Status MergeUntil(InternalIterator* iter, | |
1e59de90 TL |
97 | CompactionRangeDelAggregator* range_del_agg, |
98 | const SequenceNumber stop_before, const bool at_bottom, | |
99 | const bool allow_data_in_errors, | |
100 | const BlobFetcher* blob_fetcher, | |
101 | const std::string* const full_history_ts_low, | |
102 | PrefetchBufferCollection* prefetch_buffers, | |
103 | CompactionIterationStats* c_iter_stats); | |
7c673cae FG |
104 | |
105 | // Filters a merge operand using the compaction filter specified | |
106 | // in the constructor. Returns the decision that the filter made. | |
107 | // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the | |
108 | // optional outputs of compaction filter. | |
1e59de90 | 109 | // user_key includes timestamp if user-defined timestamp is enabled. |
7c673cae FG |
110 | CompactionFilter::Decision FilterMerge(const Slice& user_key, |
111 | const Slice& value_slice); | |
112 | ||
113 | // Query the merge result | |
114 | // These are valid until the next MergeUntil call | |
115 | // If the merging was successful: | |
116 | // - keys() contains a single element with the latest sequence number of | |
117 | // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below. | |
118 | // - values() contains a single element with the result of merging all the | |
119 | // operands together | |
120 | // | |
121 | // IMPORTANT 1: the key type could change after the MergeUntil call. | |
122 | // Put/Delete + Merge + ... + Merge => Put | |
123 | // Merge + ... + Merge => Merge | |
124 | // | |
125 | // If the merge operator is not associative, and if a Put/Delete is not found | |
126 | // then the merging will be unsuccessful. In this case: | |
127 | // - keys() contains the list of internal keys seen in order of iteration. | |
128 | // - values() contains the list of values (merges) seen in the same order. | |
129 | // values() is parallel to keys() so that the first entry in | |
130 | // keys() is the key associated with the first entry in values() | |
131 | // and so on. These lists will be the same length. | |
132 | // All of these pairs will be merges over the same user key. | |
133 | // See IMPORTANT 2 note below. | |
134 | // | |
135 | // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. | |
136 | // So keys().back() was the first key seen by iterator. | |
137 | // TODO: Re-style this comment to be like the first one | |
138 | const std::deque<std::string>& keys() const { return keys_; } | |
139 | const std::vector<Slice>& values() const { | |
140 | return merge_context_.GetOperands(); | |
141 | } | |
142 | uint64_t TotalFilterTime() const { return total_filter_time_; } | |
143 | bool HasOperator() const { return user_merge_operator_ != nullptr; } | |
144 | ||
145 | // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will | |
146 | // return true and fill *until with the key to which we should skip. | |
147 | // If true, keys() and values() are empty. | |
148 | bool FilteredUntil(Slice* skip_until) const { | |
149 | if (!has_compaction_filter_skip_until_) { | |
150 | return false; | |
151 | } | |
152 | assert(compaction_filter_ != nullptr); | |
153 | assert(skip_until != nullptr); | |
154 | assert(compaction_filter_skip_until_.Valid()); | |
155 | *skip_until = compaction_filter_skip_until_.Encode(); | |
156 | return true; | |
157 | } | |
158 | ||
159 | private: | |
160 | Env* env_; | |
1e59de90 | 161 | SystemClock* clock_; |
7c673cae FG |
162 | const Comparator* user_comparator_; |
163 | const MergeOperator* user_merge_operator_; | |
164 | const CompactionFilter* compaction_filter_; | |
165 | const std::atomic<bool>* shutting_down_; | |
166 | Logger* logger_; | |
1e59de90 | 167 | bool assert_valid_internal_key_; // enforce no internal key corruption? |
11fdf7f2 | 168 | bool allow_single_operand_; |
7c673cae | 169 | SequenceNumber latest_snapshot_; |
11fdf7f2 | 170 | const SnapshotChecker* const snapshot_checker_; |
7c673cae FG |
171 | int level_; |
172 | ||
173 | // the scratch area that holds the result of MergeUntil | |
174 | // valid up to the next MergeUntil call | |
175 | ||
176 | // Keeps track of the sequence of keys seen | |
177 | std::deque<std::string> keys_; | |
178 | // Parallel with keys_; stores the operands | |
179 | mutable MergeContext merge_context_; | |
180 | ||
181 | StopWatchNano filter_timer_; | |
182 | uint64_t total_filter_time_; | |
183 | Statistics* stats_; | |
184 | ||
185 | bool has_compaction_filter_skip_until_ = false; | |
186 | std::string compaction_filter_value_; | |
187 | InternalKey compaction_filter_skip_until_; | |
188 | ||
189 | bool IsShuttingDown() { | |
190 | // This is a best-effort facility, so memory_order_relaxed is sufficient. | |
191 | return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); | |
192 | } | |
193 | }; | |
194 | ||
195 | // MergeOutputIterator can be used to iterate over the result of a merge. | |
196 | class MergeOutputIterator { | |
197 | public: | |
198 | // The MergeOutputIterator is bound to a MergeHelper instance. | |
199 | explicit MergeOutputIterator(const MergeHelper* merge_helper); | |
200 | ||
201 | // Seeks to the first record in the output. | |
202 | void SeekToFirst(); | |
203 | // Advances to the next record in the output. | |
204 | void Next(); | |
205 | ||
206 | Slice key() { return Slice(*it_keys_); } | |
207 | Slice value() { return Slice(*it_values_); } | |
208 | bool Valid() { return it_keys_ != merge_helper_->keys().rend(); } | |
209 | ||
210 | private: | |
211 | const MergeHelper* merge_helper_; | |
212 | std::deque<std::string>::const_reverse_iterator it_keys_; | |
213 | std::vector<Slice>::const_reverse_iterator it_values_; | |
214 | }; | |
215 | ||
f67539c2 | 216 | } // namespace ROCKSDB_NAMESPACE |