]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/ttl/db_ttl_impl.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / utilities / ttl / db_ttl_impl.h
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #pragma once
6
7 #ifndef ROCKSDB_LITE
8 #include <deque>
9 #include <string>
10 #include <vector>
11
12 #include "rocksdb/db.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/compaction_filter.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/utilities/utility_db.h"
17 #include "rocksdb/utilities/db_ttl.h"
18 #include "db/db_impl.h"
19
20 #ifdef _WIN32
21 // Windows API macro interference
22 #undef GetCurrentTime
23 #endif
24
25
26 namespace rocksdb {
27
28 class DBWithTTLImpl : public DBWithTTL {
29 public:
30 static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
31 Env* env);
32
33 explicit DBWithTTLImpl(DB* db);
34
35 virtual ~DBWithTTLImpl();
36
37 Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options,
38 const std::string& column_family_name,
39 ColumnFamilyHandle** handle,
40 int ttl) override;
41
42 Status CreateColumnFamily(const ColumnFamilyOptions& options,
43 const std::string& column_family_name,
44 ColumnFamilyHandle** handle) override;
45
46 using StackableDB::Put;
47 virtual Status Put(const WriteOptions& options,
48 ColumnFamilyHandle* column_family, const Slice& key,
49 const Slice& val) override;
50
51 using StackableDB::Get;
52 virtual Status Get(const ReadOptions& options,
53 ColumnFamilyHandle* column_family, const Slice& key,
54 PinnableSlice* value) override;
55
56 using StackableDB::MultiGet;
57 virtual std::vector<Status> MultiGet(
58 const ReadOptions& options,
59 const std::vector<ColumnFamilyHandle*>& column_family,
60 const std::vector<Slice>& keys,
61 std::vector<std::string>* values) override;
62
63 using StackableDB::KeyMayExist;
64 virtual bool KeyMayExist(const ReadOptions& options,
65 ColumnFamilyHandle* column_family, const Slice& key,
66 std::string* value,
67 bool* value_found = nullptr) override;
68
69 using StackableDB::Merge;
70 virtual Status Merge(const WriteOptions& options,
71 ColumnFamilyHandle* column_family, const Slice& key,
72 const Slice& value) override;
73
74 virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
75
76 using StackableDB::NewIterator;
77 virtual Iterator* NewIterator(const ReadOptions& opts,
78 ColumnFamilyHandle* column_family) override;
79
80 virtual DB* GetBaseDB() override { return db_; }
81
82 static bool IsStale(const Slice& value, int32_t ttl, Env* env);
83
84 static Status AppendTS(const Slice& val, std::string* val_with_ts, Env* env);
85
86 static Status SanityCheckTimestamp(const Slice& str);
87
88 static Status StripTS(std::string* str);
89
90 static Status StripTS(PinnableSlice* str);
91
92 static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
93
94 static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
95
96 static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
97 };
98
99 class TtlIterator : public Iterator {
100
101 public:
102 explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); }
103
104 ~TtlIterator() { delete iter_; }
105
106 bool Valid() const override { return iter_->Valid(); }
107
108 void SeekToFirst() override { iter_->SeekToFirst(); }
109
110 void SeekToLast() override { iter_->SeekToLast(); }
111
112 void Seek(const Slice& target) override { iter_->Seek(target); }
113
114 void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); }
115
116 void Next() override { iter_->Next(); }
117
118 void Prev() override { iter_->Prev(); }
119
120 Slice key() const override { return iter_->key(); }
121
122 int32_t timestamp() const {
123 return DecodeFixed32(iter_->value().data() + iter_->value().size() -
124 DBWithTTLImpl::kTSLength);
125 }
126
127 Slice value() const override {
128 // TODO: handle timestamp corruption like in general iterator semantics
129 assert(DBWithTTLImpl::SanityCheckTimestamp(iter_->value()).ok());
130 Slice trimmed_value = iter_->value();
131 trimmed_value.size_ -= DBWithTTLImpl::kTSLength;
132 return trimmed_value;
133 }
134
135 Status status() const override { return iter_->status(); }
136
137 private:
138 Iterator* iter_;
139 };
140
141 class TtlCompactionFilter : public CompactionFilter {
142 public:
143 TtlCompactionFilter(
144 int32_t ttl, Env* env, const CompactionFilter* user_comp_filter,
145 std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
146 nullptr)
147 : ttl_(ttl),
148 env_(env),
149 user_comp_filter_(user_comp_filter),
150 user_comp_filter_from_factory_(
151 std::move(user_comp_filter_from_factory)) {
152 // Unlike the merge operator, compaction filter is necessary for TTL, hence
153 // this would be called even if user doesn't specify any compaction-filter
154 if (!user_comp_filter_) {
155 user_comp_filter_ = user_comp_filter_from_factory_.get();
156 }
157 }
158
159 virtual bool Filter(int level, const Slice& key, const Slice& old_val,
160 std::string* new_val, bool* value_changed) const
161 override {
162 if (DBWithTTLImpl::IsStale(old_val, ttl_, env_)) {
163 return true;
164 }
165 if (user_comp_filter_ == nullptr) {
166 return false;
167 }
168 assert(old_val.size() >= DBWithTTLImpl::kTSLength);
169 Slice old_val_without_ts(old_val.data(),
170 old_val.size() - DBWithTTLImpl::kTSLength);
171 if (user_comp_filter_->Filter(level, key, old_val_without_ts, new_val,
172 value_changed)) {
173 return true;
174 }
175 if (*value_changed) {
176 new_val->append(
177 old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
178 DBWithTTLImpl::kTSLength);
179 }
180 return false;
181 }
182
183 virtual const char* Name() const override { return "Delete By TTL"; }
184
185 private:
186 int32_t ttl_;
187 Env* env_;
188 const CompactionFilter* user_comp_filter_;
189 std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory_;
190 };
191
192 class TtlCompactionFilterFactory : public CompactionFilterFactory {
193 public:
194 TtlCompactionFilterFactory(
195 int32_t ttl, Env* env,
196 std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
197 : ttl_(ttl), env_(env), user_comp_filter_factory_(comp_filter_factory) {}
198
199 virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
200 const CompactionFilter::Context& context) override {
201 std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
202 nullptr;
203 if (user_comp_filter_factory_) {
204 user_comp_filter_from_factory =
205 user_comp_filter_factory_->CreateCompactionFilter(context);
206 }
207
208 return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
209 ttl_, env_, nullptr, std::move(user_comp_filter_from_factory)));
210 }
211
212 virtual const char* Name() const override {
213 return "TtlCompactionFilterFactory";
214 }
215
216 private:
217 int32_t ttl_;
218 Env* env_;
219 std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
220 };
221
222 class TtlMergeOperator : public MergeOperator {
223
224 public:
225 explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
226 Env* env)
227 : user_merge_op_(merge_op), env_(env) {
228 assert(merge_op);
229 assert(env);
230 }
231
232 virtual bool FullMergeV2(const MergeOperationInput& merge_in,
233 MergeOperationOutput* merge_out) const override {
234 const uint32_t ts_len = DBWithTTLImpl::kTSLength;
235 if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
236 ROCKS_LOG_ERROR(merge_in.logger,
237 "Error: Could not remove timestamp from existing value.");
238 return false;
239 }
240
241 // Extract time-stamp from each operand to be passed to user_merge_op_
242 std::vector<Slice> operands_without_ts;
243 for (const auto& operand : merge_in.operand_list) {
244 if (operand.size() < ts_len) {
245 ROCKS_LOG_ERROR(
246 merge_in.logger,
247 "Error: Could not remove timestamp from operand value.");
248 return false;
249 }
250 operands_without_ts.push_back(operand);
251 operands_without_ts.back().remove_suffix(ts_len);
252 }
253
254 // Apply the user merge operator (store result in *new_value)
255 bool good = true;
256 MergeOperationOutput user_merge_out(merge_out->new_value,
257 merge_out->existing_operand);
258 if (merge_in.existing_value) {
259 Slice existing_value_without_ts(merge_in.existing_value->data(),
260 merge_in.existing_value->size() - ts_len);
261 good = user_merge_op_->FullMergeV2(
262 MergeOperationInput(merge_in.key, &existing_value_without_ts,
263 operands_without_ts, merge_in.logger),
264 &user_merge_out);
265 } else {
266 good = user_merge_op_->FullMergeV2(
267 MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
268 merge_in.logger),
269 &user_merge_out);
270 }
271
272 // Return false if the user merge operator returned false
273 if (!good) {
274 return false;
275 }
276
277 if (merge_out->existing_operand.data()) {
278 merge_out->new_value.assign(merge_out->existing_operand.data(),
279 merge_out->existing_operand.size());
280 merge_out->existing_operand = Slice(nullptr, 0);
281 }
282
283 // Augment the *new_value with the ttl time-stamp
284 int64_t curtime;
285 if (!env_->GetCurrentTime(&curtime).ok()) {
286 ROCKS_LOG_ERROR(
287 merge_in.logger,
288 "Error: Could not get current time to be attached internally "
289 "to the new value.");
290 return false;
291 } else {
292 char ts_string[ts_len];
293 EncodeFixed32(ts_string, (int32_t)curtime);
294 merge_out->new_value.append(ts_string, ts_len);
295 return true;
296 }
297 }
298
299 virtual bool PartialMergeMulti(const Slice& key,
300 const std::deque<Slice>& operand_list,
301 std::string* new_value, Logger* logger) const
302 override {
303 const uint32_t ts_len = DBWithTTLImpl::kTSLength;
304 std::deque<Slice> operands_without_ts;
305
306 for (const auto& operand : operand_list) {
307 if (operand.size() < ts_len) {
308 ROCKS_LOG_ERROR(logger,
309 "Error: Could not remove timestamp from value.");
310 return false;
311 }
312
313 operands_without_ts.push_back(
314 Slice(operand.data(), operand.size() - ts_len));
315 }
316
317 // Apply the user partial-merge operator (store result in *new_value)
318 assert(new_value);
319 if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
320 logger)) {
321 return false;
322 }
323
324 // Augment the *new_value with the ttl time-stamp
325 int64_t curtime;
326 if (!env_->GetCurrentTime(&curtime).ok()) {
327 ROCKS_LOG_ERROR(
328 logger,
329 "Error: Could not get current time to be attached internally "
330 "to the new value.");
331 return false;
332 } else {
333 char ts_string[ts_len];
334 EncodeFixed32(ts_string, (int32_t)curtime);
335 new_value->append(ts_string, ts_len);
336 return true;
337 }
338 }
339
340 virtual const char* Name() const override { return "Merge By TTL"; }
341
342 private:
343 std::shared_ptr<MergeOperator> user_merge_op_;
344 Env* env_;
345 };
346 }
347 #endif // ROCKSDB_LITE