]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae | 5 | // |
11fdf7f2 | 6 | #pragma once |
7c673cae FG |
7 | |
8 | #include <deque> | |
9 | #include <string> | |
10 | #include <vector> | |
11 | ||
12 | #include "db/dbformat.h" | |
13 | #include "db/merge_context.h" | |
14 | #include "db/range_del_aggregator.h" | |
11fdf7f2 | 15 | #include "db/snapshot_checker.h" |
7c673cae FG |
16 | #include "rocksdb/compaction_filter.h" |
17 | #include "rocksdb/env.h" | |
18 | #include "rocksdb/slice.h" | |
19 | #include "util/stop_watch.h" | |
20 | ||
21 | namespace rocksdb { | |
22 | ||
23 | class Comparator; | |
24 | class Iterator; | |
25 | class Logger; | |
26 | class MergeOperator; | |
27 | class Statistics; | |
7c673cae FG |
28 | |
29 | class MergeHelper { | |
30 | public: | |
31 | MergeHelper(Env* env, const Comparator* user_comparator, | |
32 | const MergeOperator* user_merge_operator, | |
33 | const CompactionFilter* compaction_filter, Logger* logger, | |
34 | bool assert_valid_internal_key, SequenceNumber latest_snapshot, | |
11fdf7f2 TL |
35 | const SnapshotChecker* snapshot_checker = nullptr, int level = 0, |
36 | Statistics* stats = nullptr, | |
37 | const std::atomic<bool>* shutting_down = nullptr); | |
7c673cae FG |
38 | |
39 | // Wrapper around MergeOperator::FullMergeV2() that records perf statistics. | |
40 | // Result of merge will be written to result if status returned is OK. | |
41 | // If operands is empty, the value will simply be copied to result. | |
11fdf7f2 TL |
42 | // Set `update_num_ops_stats` to true if the call comes from a user read, |
43 | // where latency is sensitive. | |
7c673cae FG |
44 | // Returns one of the following statuses: |
45 | // - OK: Entries were successfully merged. | |
46 | // - Corruption: Merge operator reported unsuccessful merge. | |
47 | static Status TimedFullMerge(const MergeOperator* merge_operator, | |
48 | const Slice& key, const Slice* value, | |
49 | const std::vector<Slice>& operands, | |
50 | std::string* result, Logger* logger, | |
51 | Statistics* statistics, Env* env, | |
11fdf7f2 TL |
52 | Slice* result_operand = nullptr, |
53 | bool update_num_ops_stats = false); | |
7c673cae FG |
54 | |
55 | // Merge entries until we hit | |
56 | // - a corrupted key | |
57 | // - a Put/Delete, | |
58 | // - a different user key, | |
59 | // - a specific sequence number (snapshot boundary), | |
60 | // - REMOVE_AND_SKIP_UNTIL returned from compaction filter, | |
61 | // or - the end of iteration | |
62 | // iter: (IN) points to the first merge type entry | |
63 | // (OUT) points to the first entry not included in the merge process | |
64 | // range_del_agg: (IN) filters merge operands covered by range tombstones. | |
65 | // stop_before: (IN) a sequence number that merge should not cross. | |
66 | // 0 means no restriction | |
67 | // at_bottom: (IN) true if the iterator covers the bottom level, which means | |
68 | // we could reach the start of the history of this user key. | |
69 | // | |
70 | // Returns one of the following statuses: | |
71 | // - OK: Entries were successfully merged. | |
72 | // - MergeInProgress: Put/Delete not encountered, and didn't reach the start | |
73 | // of key's history. Output consists of merge operands only. | |
74 | // - Corruption: Merge operator reported unsuccessful merge or a corrupted | |
75 | // key has been encountered and not expected (applies only when compiling | |
76 | // with asserts removed). | |
77 | // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true). | |
78 | // | |
79 | // REQUIRED: The first key in the input is not corrupted. | |
80 | Status MergeUntil(InternalIterator* iter, | |
494da23a | 81 | CompactionRangeDelAggregator* range_del_agg = nullptr, |
7c673cae FG |
82 | const SequenceNumber stop_before = 0, |
83 | const bool at_bottom = false); | |
84 | ||
85 | // Filters a merge operand using the compaction filter specified | |
86 | // in the constructor. Returns the decision that the filter made. | |
87 | // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the | |
88 | // optional outputs of the compaction filter. | |
89 | CompactionFilter::Decision FilterMerge(const Slice& user_key, | |
90 | const Slice& value_slice); | |
91 | ||
92 | // Query the merge result | |
93 | // These are valid until the next MergeUntil call | |
94 | // If the merging was successful: | |
95 | // - keys() contains a single element with the latest sequence number of | |
96 | // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below. | |
97 | // - values() contains a single element with the result of merging all the | |
98 | // operands together | |
99 | // | |
100 | // IMPORTANT 1: the key type could change after the MergeUntil call. | |
101 | // Put/Delete + Merge + ... + Merge => Put | |
102 | // Merge + ... + Merge => Merge | |
103 | // | |
104 | // If the merge operator is not associative, and if a Put/Delete is not found | |
105 | // then the merging will be unsuccessful. In this case: | |
106 | // - keys() contains the list of internal keys seen in order of iteration. | |
107 | // - values() contains the list of values (merges) seen in the same order. | |
108 | // values() is parallel to keys() so that the first entry in | |
109 | // keys() is the key associated with the first entry in values() | |
110 | // and so on. These lists will be the same length. | |
111 | // All of these pairs will be merges over the same user key. | |
112 | // See IMPORTANT 2 note below. | |
113 | // | |
114 | // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. | |
115 | // So keys().back() was the first key seen by iterator. | |
116 | // TODO: Re-style this comment to be like the first one | |
117 | const std::deque<std::string>& keys() const { return keys_; } | |
118 | const std::vector<Slice>& values() const { | |
119 | return merge_context_.GetOperands(); | |
120 | } | |
121 | uint64_t TotalFilterTime() const { return total_filter_time_; } | |
122 | bool HasOperator() const { return user_merge_operator_ != nullptr; } | |
123 | ||
124 | // If the compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will | |
125 | // return true and fill *skip_until with the key to which we should skip. | |
126 | // If true, keys() and values() are empty. | |
127 | bool FilteredUntil(Slice* skip_until) const { | |
128 | if (!has_compaction_filter_skip_until_) { | |
129 | return false; | |
130 | } | |
131 | assert(compaction_filter_ != nullptr); | |
132 | assert(skip_until != nullptr); | |
133 | assert(compaction_filter_skip_until_.Valid()); | |
134 | *skip_until = compaction_filter_skip_until_.Encode(); | |
135 | return true; | |
136 | } | |
137 | ||
138 | private: | |
139 | Env* env_; | |
140 | const Comparator* user_comparator_; | |
141 | const MergeOperator* user_merge_operator_; | |
142 | const CompactionFilter* compaction_filter_; | |
143 | const std::atomic<bool>* shutting_down_; | |
144 | Logger* logger_; | |
145 | bool assert_valid_internal_key_; // enforce no internal key corruption? | |
11fdf7f2 | 146 | bool allow_single_operand_; |
7c673cae | 147 | SequenceNumber latest_snapshot_; |
11fdf7f2 | 148 | const SnapshotChecker* const snapshot_checker_; |
7c673cae FG |
149 | int level_; |
150 | ||
151 | // The scratch area that holds the result of MergeUntil; | |
152 | // valid up to the next MergeUntil call. | |
153 | ||
154 | // Keeps track of the sequence of keys seen | |
155 | std::deque<std::string> keys_; | |
156 | // Parallel with keys_; stores the operands | |
157 | mutable MergeContext merge_context_; | |
158 | ||
159 | StopWatchNano filter_timer_; | |
160 | uint64_t total_filter_time_; | |
161 | Statistics* stats_; | |
162 | ||
163 | bool has_compaction_filter_skip_until_ = false; | |
164 | std::string compaction_filter_value_; | |
165 | InternalKey compaction_filter_skip_until_; | |
166 | ||
167 | bool IsShuttingDown() { | |
168 | // This is a best-effort facility, so memory_order_relaxed is sufficient. | |
169 | return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); | |
170 | } | |
171 | }; | |
172 | ||
173 | // MergeOutputIterator can be used to iterate over the result of a merge; keys are walked with a reverse iterator (Valid() checks against keys().rend()). | |
174 | class MergeOutputIterator { | |
175 | public: | |
176 | // The MergeOutputIterator is bound to a MergeHelper instance (held as a const pointer). | |
177 | explicit MergeOutputIterator(const MergeHelper* merge_helper); | |
178 | ||
179 | // Seeks to the first record in the output. | |
180 | void SeekToFirst(); | |
181 | // Advances to the next record in the output. | |
182 | void Next(); | |
183 | ||
184 | Slice key() { return Slice(*it_keys_); } | |
185 | Slice value() { return Slice(*it_values_); } | |
186 | bool Valid() { return it_keys_ != merge_helper_->keys().rend(); } | |
187 | ||
188 | private: | |
189 | const MergeHelper* merge_helper_; | |
190 | std::deque<std::string>::const_reverse_iterator it_keys_; | |
191 | std::vector<Slice>::const_reverse_iterator it_values_; | |
192 | }; | |
193 | ||
194 | } // namespace rocksdb |