]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style license that can be | |
3 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
4 | ||
5 | #pragma once | |
6 | ||
7 | #ifndef ROCKSDB_LITE | |
8 | #include <deque> | |
9 | #include <string> | |
10 | #include <vector> | |
11 | ||
12 | #include "rocksdb/db.h" | |
13 | #include "rocksdb/env.h" | |
14 | #include "rocksdb/compaction_filter.h" | |
15 | #include "rocksdb/merge_operator.h" | |
16 | #include "rocksdb/utilities/utility_db.h" | |
17 | #include "rocksdb/utilities/db_ttl.h" | |
18 | #include "db/db_impl.h" | |
19 | ||
20 | #ifdef _WIN32 | |
21 | // Windows API macro interference | |
22 | #undef GetCurrentTime | |
23 | #endif | |
24 | ||
25 | ||
26 | namespace rocksdb { | |
27 | ||
28 | class DBWithTTLImpl : public DBWithTTL { | |
29 | public: | |
30 | static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, | |
31 | Env* env); | |
32 | ||
33 | explicit DBWithTTLImpl(DB* db); | |
34 | ||
35 | virtual ~DBWithTTLImpl(); | |
36 | ||
37 | Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options, | |
38 | const std::string& column_family_name, | |
39 | ColumnFamilyHandle** handle, | |
40 | int ttl) override; | |
41 | ||
42 | Status CreateColumnFamily(const ColumnFamilyOptions& options, | |
43 | const std::string& column_family_name, | |
44 | ColumnFamilyHandle** handle) override; | |
45 | ||
46 | using StackableDB::Put; | |
47 | virtual Status Put(const WriteOptions& options, | |
48 | ColumnFamilyHandle* column_family, const Slice& key, | |
49 | const Slice& val) override; | |
50 | ||
51 | using StackableDB::Get; | |
52 | virtual Status Get(const ReadOptions& options, | |
53 | ColumnFamilyHandle* column_family, const Slice& key, | |
54 | PinnableSlice* value) override; | |
55 | ||
56 | using StackableDB::MultiGet; | |
57 | virtual std::vector<Status> MultiGet( | |
58 | const ReadOptions& options, | |
59 | const std::vector<ColumnFamilyHandle*>& column_family, | |
60 | const std::vector<Slice>& keys, | |
61 | std::vector<std::string>* values) override; | |
62 | ||
63 | using StackableDB::KeyMayExist; | |
64 | virtual bool KeyMayExist(const ReadOptions& options, | |
65 | ColumnFamilyHandle* column_family, const Slice& key, | |
66 | std::string* value, | |
67 | bool* value_found = nullptr) override; | |
68 | ||
69 | using StackableDB::Merge; | |
70 | virtual Status Merge(const WriteOptions& options, | |
71 | ColumnFamilyHandle* column_family, const Slice& key, | |
72 | const Slice& value) override; | |
73 | ||
74 | virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; | |
75 | ||
76 | using StackableDB::NewIterator; | |
77 | virtual Iterator* NewIterator(const ReadOptions& opts, | |
78 | ColumnFamilyHandle* column_family) override; | |
79 | ||
80 | virtual DB* GetBaseDB() override { return db_; } | |
81 | ||
82 | static bool IsStale(const Slice& value, int32_t ttl, Env* env); | |
83 | ||
84 | static Status AppendTS(const Slice& val, std::string* val_with_ts, Env* env); | |
85 | ||
86 | static Status SanityCheckTimestamp(const Slice& str); | |
87 | ||
88 | static Status StripTS(std::string* str); | |
89 | ||
90 | static Status StripTS(PinnableSlice* str); | |
91 | ||
92 | static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp | |
93 | ||
94 | static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 | |
95 | ||
96 | static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8 | |
97 | }; | |
98 | ||
99 | class TtlIterator : public Iterator { | |
100 | ||
101 | public: | |
102 | explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); } | |
103 | ||
104 | ~TtlIterator() { delete iter_; } | |
105 | ||
106 | bool Valid() const override { return iter_->Valid(); } | |
107 | ||
108 | void SeekToFirst() override { iter_->SeekToFirst(); } | |
109 | ||
110 | void SeekToLast() override { iter_->SeekToLast(); } | |
111 | ||
112 | void Seek(const Slice& target) override { iter_->Seek(target); } | |
113 | ||
114 | void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); } | |
115 | ||
116 | void Next() override { iter_->Next(); } | |
117 | ||
118 | void Prev() override { iter_->Prev(); } | |
119 | ||
120 | Slice key() const override { return iter_->key(); } | |
121 | ||
122 | int32_t timestamp() const { | |
123 | return DecodeFixed32(iter_->value().data() + iter_->value().size() - | |
124 | DBWithTTLImpl::kTSLength); | |
125 | } | |
126 | ||
127 | Slice value() const override { | |
128 | // TODO: handle timestamp corruption like in general iterator semantics | |
129 | assert(DBWithTTLImpl::SanityCheckTimestamp(iter_->value()).ok()); | |
130 | Slice trimmed_value = iter_->value(); | |
131 | trimmed_value.size_ -= DBWithTTLImpl::kTSLength; | |
132 | return trimmed_value; | |
133 | } | |
134 | ||
135 | Status status() const override { return iter_->status(); } | |
136 | ||
137 | private: | |
138 | Iterator* iter_; | |
139 | }; | |
140 | ||
141 | class TtlCompactionFilter : public CompactionFilter { | |
142 | public: | |
143 | TtlCompactionFilter( | |
144 | int32_t ttl, Env* env, const CompactionFilter* user_comp_filter, | |
145 | std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory = | |
146 | nullptr) | |
147 | : ttl_(ttl), | |
148 | env_(env), | |
149 | user_comp_filter_(user_comp_filter), | |
150 | user_comp_filter_from_factory_( | |
151 | std::move(user_comp_filter_from_factory)) { | |
152 | // Unlike the merge operator, compaction filter is necessary for TTL, hence | |
153 | // this would be called even if user doesn't specify any compaction-filter | |
154 | if (!user_comp_filter_) { | |
155 | user_comp_filter_ = user_comp_filter_from_factory_.get(); | |
156 | } | |
157 | } | |
158 | ||
159 | virtual bool Filter(int level, const Slice& key, const Slice& old_val, | |
160 | std::string* new_val, bool* value_changed) const | |
161 | override { | |
162 | if (DBWithTTLImpl::IsStale(old_val, ttl_, env_)) { | |
163 | return true; | |
164 | } | |
165 | if (user_comp_filter_ == nullptr) { | |
166 | return false; | |
167 | } | |
168 | assert(old_val.size() >= DBWithTTLImpl::kTSLength); | |
169 | Slice old_val_without_ts(old_val.data(), | |
170 | old_val.size() - DBWithTTLImpl::kTSLength); | |
171 | if (user_comp_filter_->Filter(level, key, old_val_without_ts, new_val, | |
172 | value_changed)) { | |
173 | return true; | |
174 | } | |
175 | if (*value_changed) { | |
176 | new_val->append( | |
177 | old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength, | |
178 | DBWithTTLImpl::kTSLength); | |
179 | } | |
180 | return false; | |
181 | } | |
182 | ||
183 | virtual const char* Name() const override { return "Delete By TTL"; } | |
184 | ||
185 | private: | |
186 | int32_t ttl_; | |
187 | Env* env_; | |
188 | const CompactionFilter* user_comp_filter_; | |
189 | std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory_; | |
190 | }; | |
191 | ||
192 | class TtlCompactionFilterFactory : public CompactionFilterFactory { | |
193 | public: | |
194 | TtlCompactionFilterFactory( | |
195 | int32_t ttl, Env* env, | |
196 | std::shared_ptr<CompactionFilterFactory> comp_filter_factory) | |
197 | : ttl_(ttl), env_(env), user_comp_filter_factory_(comp_filter_factory) {} | |
198 | ||
199 | virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( | |
200 | const CompactionFilter::Context& context) override { | |
201 | std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory = | |
202 | nullptr; | |
203 | if (user_comp_filter_factory_) { | |
204 | user_comp_filter_from_factory = | |
205 | user_comp_filter_factory_->CreateCompactionFilter(context); | |
206 | } | |
207 | ||
208 | return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter( | |
209 | ttl_, env_, nullptr, std::move(user_comp_filter_from_factory))); | |
210 | } | |
211 | ||
212 | virtual const char* Name() const override { | |
213 | return "TtlCompactionFilterFactory"; | |
214 | } | |
215 | ||
216 | private: | |
217 | int32_t ttl_; | |
218 | Env* env_; | |
219 | std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_; | |
220 | }; | |
221 | ||
222 | class TtlMergeOperator : public MergeOperator { | |
223 | ||
224 | public: | |
225 | explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op, | |
226 | Env* env) | |
227 | : user_merge_op_(merge_op), env_(env) { | |
228 | assert(merge_op); | |
229 | assert(env); | |
230 | } | |
231 | ||
232 | virtual bool FullMergeV2(const MergeOperationInput& merge_in, | |
233 | MergeOperationOutput* merge_out) const override { | |
234 | const uint32_t ts_len = DBWithTTLImpl::kTSLength; | |
235 | if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) { | |
236 | ROCKS_LOG_ERROR(merge_in.logger, | |
237 | "Error: Could not remove timestamp from existing value."); | |
238 | return false; | |
239 | } | |
240 | ||
241 | // Extract time-stamp from each operand to be passed to user_merge_op_ | |
242 | std::vector<Slice> operands_without_ts; | |
243 | for (const auto& operand : merge_in.operand_list) { | |
244 | if (operand.size() < ts_len) { | |
245 | ROCKS_LOG_ERROR( | |
246 | merge_in.logger, | |
247 | "Error: Could not remove timestamp from operand value."); | |
248 | return false; | |
249 | } | |
250 | operands_without_ts.push_back(operand); | |
251 | operands_without_ts.back().remove_suffix(ts_len); | |
252 | } | |
253 | ||
254 | // Apply the user merge operator (store result in *new_value) | |
255 | bool good = true; | |
256 | MergeOperationOutput user_merge_out(merge_out->new_value, | |
257 | merge_out->existing_operand); | |
258 | if (merge_in.existing_value) { | |
259 | Slice existing_value_without_ts(merge_in.existing_value->data(), | |
260 | merge_in.existing_value->size() - ts_len); | |
261 | good = user_merge_op_->FullMergeV2( | |
262 | MergeOperationInput(merge_in.key, &existing_value_without_ts, | |
263 | operands_without_ts, merge_in.logger), | |
264 | &user_merge_out); | |
265 | } else { | |
266 | good = user_merge_op_->FullMergeV2( | |
267 | MergeOperationInput(merge_in.key, nullptr, operands_without_ts, | |
268 | merge_in.logger), | |
269 | &user_merge_out); | |
270 | } | |
271 | ||
272 | // Return false if the user merge operator returned false | |
273 | if (!good) { | |
274 | return false; | |
275 | } | |
276 | ||
277 | if (merge_out->existing_operand.data()) { | |
278 | merge_out->new_value.assign(merge_out->existing_operand.data(), | |
279 | merge_out->existing_operand.size()); | |
280 | merge_out->existing_operand = Slice(nullptr, 0); | |
281 | } | |
282 | ||
283 | // Augment the *new_value with the ttl time-stamp | |
284 | int64_t curtime; | |
285 | if (!env_->GetCurrentTime(&curtime).ok()) { | |
286 | ROCKS_LOG_ERROR( | |
287 | merge_in.logger, | |
288 | "Error: Could not get current time to be attached internally " | |
289 | "to the new value."); | |
290 | return false; | |
291 | } else { | |
292 | char ts_string[ts_len]; | |
293 | EncodeFixed32(ts_string, (int32_t)curtime); | |
294 | merge_out->new_value.append(ts_string, ts_len); | |
295 | return true; | |
296 | } | |
297 | } | |
298 | ||
299 | virtual bool PartialMergeMulti(const Slice& key, | |
300 | const std::deque<Slice>& operand_list, | |
301 | std::string* new_value, Logger* logger) const | |
302 | override { | |
303 | const uint32_t ts_len = DBWithTTLImpl::kTSLength; | |
304 | std::deque<Slice> operands_without_ts; | |
305 | ||
306 | for (const auto& operand : operand_list) { | |
307 | if (operand.size() < ts_len) { | |
308 | ROCKS_LOG_ERROR(logger, | |
309 | "Error: Could not remove timestamp from value."); | |
310 | return false; | |
311 | } | |
312 | ||
313 | operands_without_ts.push_back( | |
314 | Slice(operand.data(), operand.size() - ts_len)); | |
315 | } | |
316 | ||
317 | // Apply the user partial-merge operator (store result in *new_value) | |
318 | assert(new_value); | |
319 | if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value, | |
320 | logger)) { | |
321 | return false; | |
322 | } | |
323 | ||
324 | // Augment the *new_value with the ttl time-stamp | |
325 | int64_t curtime; | |
326 | if (!env_->GetCurrentTime(&curtime).ok()) { | |
327 | ROCKS_LOG_ERROR( | |
328 | logger, | |
329 | "Error: Could not get current time to be attached internally " | |
330 | "to the new value."); | |
331 | return false; | |
332 | } else { | |
333 | char ts_string[ts_len]; | |
334 | EncodeFixed32(ts_string, (int32_t)curtime); | |
335 | new_value->append(ts_string, ts_len); | |
336 | return true; | |
337 | } | |
338 | } | |
339 | ||
340 | virtual const char* Name() const override { return "Merge By TTL"; } | |
341 | ||
342 | private: | |
343 | std::shared_ptr<MergeOperator> user_merge_op_; | |
344 | Env* env_; | |
345 | }; | |
346 | } | |
347 | #endif // ROCKSDB_LITE |