]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/ttl/db_ttl_impl.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / ttl / db_ttl_impl.cc
1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file. See the AUTHORS file for names of contributors.
5 #ifndef ROCKSDB_LITE
6
7 #include "utilities/ttl/db_ttl_impl.h"
8
9 #include "db/write_batch_internal.h"
10 #include "file/filename.h"
11 #include "rocksdb/convenience.h"
12 #include "rocksdb/env.h"
13 #include "rocksdb/iterator.h"
14 #include "rocksdb/utilities/db_ttl.h"
15 #include "util/coding.h"
16
17 namespace ROCKSDB_NAMESPACE {
18
19 void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
20 Env* env) {
21 if (options->compaction_filter) {
22 options->compaction_filter =
23 new TtlCompactionFilter(ttl, env, options->compaction_filter);
24 } else {
25 options->compaction_filter_factory =
26 std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
27 ttl, env, options->compaction_filter_factory));
28 }
29
30 if (options->merge_operator) {
31 options->merge_operator.reset(
32 new TtlMergeOperator(options->merge_operator, env));
33 }
34 }
35
36 // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
37 DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
38
39 DBWithTTLImpl::~DBWithTTLImpl() {
40 if (!closed_) {
41 Close();
42 }
43 }
44
45 Status DBWithTTLImpl::Close() {
46 Status ret = Status::OK();
47 if (!closed_) {
48 Options default_options = GetOptions();
49 // Need to stop background compaction before getting rid of the filter
50 CancelAllBackgroundWork(db_, /* wait = */ true);
51 ret = db_->Close();
52 delete default_options.compaction_filter;
53 closed_ = true;
54 }
55 return ret;
56 }
57
58 Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
59 StackableDB** dbptr, int32_t ttl, bool read_only) {
60 DBWithTTL* db;
61 Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
62 if (s.ok()) {
63 *dbptr = db;
64 } else {
65 *dbptr = nullptr;
66 }
67 return s;
68 }
69
70 Status DBWithTTL::Open(const Options& options, const std::string& dbname,
71 DBWithTTL** dbptr, int32_t ttl, bool read_only) {
72
73 DBOptions db_options(options);
74 ColumnFamilyOptions cf_options(options);
75 std::vector<ColumnFamilyDescriptor> column_families;
76 column_families.push_back(
77 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
78 std::vector<ColumnFamilyHandle*> handles;
79 Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
80 dbptr, {ttl}, read_only);
81 if (s.ok()) {
82 assert(handles.size() == 1);
83 // i can delete the handle since DBImpl is always holding a reference to
84 // default column family
85 delete handles[0];
86 }
87 return s;
88 }
89
90 Status DBWithTTL::Open(
91 const DBOptions& db_options, const std::string& dbname,
92 const std::vector<ColumnFamilyDescriptor>& column_families,
93 std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
94 std::vector<int32_t> ttls, bool read_only) {
95
96 if (ttls.size() != column_families.size()) {
97 return Status::InvalidArgument(
98 "ttls size has to be the same as number of column families");
99 }
100
101 std::vector<ColumnFamilyDescriptor> column_families_sanitized =
102 column_families;
103 for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
104 DBWithTTLImpl::SanitizeOptions(
105 ttls[i], &column_families_sanitized[i].options,
106 db_options.env == nullptr ? Env::Default() : db_options.env);
107 }
108 DB* db;
109
110 Status st;
111 if (read_only) {
112 st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
113 handles, &db);
114 } else {
115 st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
116 }
117 if (st.ok()) {
118 *dbptr = new DBWithTTLImpl(db);
119 } else {
120 *dbptr = nullptr;
121 }
122 return st;
123 }
124
125 Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
126 const ColumnFamilyOptions& options, const std::string& column_family_name,
127 ColumnFamilyHandle** handle, int ttl) {
128 ColumnFamilyOptions sanitized_options = options;
129 DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv());
130
131 return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
132 handle);
133 }
134
135 Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
136 const std::string& column_family_name,
137 ColumnFamilyHandle** handle) {
138 return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
139 }
140
141 // Appends the current timestamp to the string.
142 // Returns false if could not get the current_time, true if append succeeds
143 Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
144 Env* env) {
145 val_with_ts->reserve(kTSLength + val.size());
146 char ts_string[kTSLength];
147 int64_t curtime;
148 Status st = env->GetCurrentTime(&curtime);
149 if (!st.ok()) {
150 return st;
151 }
152 EncodeFixed32(ts_string, (int32_t)curtime);
153 val_with_ts->append(val.data(), val.size());
154 val_with_ts->append(ts_string, kTSLength);
155 return st;
156 }
157
158 // Returns corruption if the length of the string is lesser than timestamp, or
159 // timestamp refers to a time lesser than ttl-feature release time
160 Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
161 if (str.size() < kTSLength) {
162 return Status::Corruption("Error: value's length less than timestamp's\n");
163 }
164 // Checks that TS is not lesser than kMinTimestamp
165 // Gaurds against corruption & normal database opened incorrectly in ttl mode
166 int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
167 if (timestamp_value < kMinTimestamp) {
168 return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
169 }
170 return Status::OK();
171 }
172
173 // Checks if the string is stale or not according to TTl provided
174 bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
175 if (ttl <= 0) { // Data is fresh if TTL is non-positive
176 return false;
177 }
178 int64_t curtime;
179 if (!env->GetCurrentTime(&curtime).ok()) {
180 return false; // Treat the data as fresh if could not get current time
181 }
182 int32_t timestamp_value =
183 DecodeFixed32(value.data() + value.size() - kTSLength);
184 return (timestamp_value + ttl) < curtime;
185 }
186
187 // Strips the TS from the end of the slice
188 Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
189 Status st;
190 if (pinnable_val->size() < kTSLength) {
191 return Status::Corruption("Bad timestamp in key-value");
192 }
193 // Erasing characters which hold the TS
194 pinnable_val->remove_suffix(kTSLength);
195 return st;
196 }
197
198 // Strips the TS from the end of the string
199 Status DBWithTTLImpl::StripTS(std::string* str) {
200 Status st;
201 if (str->length() < kTSLength) {
202 return Status::Corruption("Bad timestamp in key-value");
203 }
204 // Erasing characters which hold the TS
205 str->erase(str->length() - kTSLength, kTSLength);
206 return st;
207 }
208
209 Status DBWithTTLImpl::Put(const WriteOptions& options,
210 ColumnFamilyHandle* column_family, const Slice& key,
211 const Slice& val) {
212 WriteBatch batch;
213 batch.Put(column_family, key, val);
214 return Write(options, &batch);
215 }
216
217 Status DBWithTTLImpl::Get(const ReadOptions& options,
218 ColumnFamilyHandle* column_family, const Slice& key,
219 PinnableSlice* value) {
220 Status st = db_->Get(options, column_family, key, value);
221 if (!st.ok()) {
222 return st;
223 }
224 st = SanityCheckTimestamp(*value);
225 if (!st.ok()) {
226 return st;
227 }
228 return StripTS(value);
229 }
230
231 std::vector<Status> DBWithTTLImpl::MultiGet(
232 const ReadOptions& options,
233 const std::vector<ColumnFamilyHandle*>& column_family,
234 const std::vector<Slice>& keys, std::vector<std::string>* values) {
235 auto statuses = db_->MultiGet(options, column_family, keys, values);
236 for (size_t i = 0; i < keys.size(); ++i) {
237 if (!statuses[i].ok()) {
238 continue;
239 }
240 statuses[i] = SanityCheckTimestamp((*values)[i]);
241 if (!statuses[i].ok()) {
242 continue;
243 }
244 statuses[i] = StripTS(&(*values)[i]);
245 }
246 return statuses;
247 }
248
249 bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
250 ColumnFamilyHandle* column_family,
251 const Slice& key, std::string* value,
252 bool* value_found) {
253 bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
254 if (ret && value != nullptr && value_found != nullptr && *value_found) {
255 if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
256 return false;
257 }
258 }
259 return ret;
260 }
261
262 Status DBWithTTLImpl::Merge(const WriteOptions& options,
263 ColumnFamilyHandle* column_family, const Slice& key,
264 const Slice& value) {
265 WriteBatch batch;
266 batch.Merge(column_family, key, value);
267 return Write(options, &batch);
268 }
269
270 Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
271 class Handler : public WriteBatch::Handler {
272 public:
273 explicit Handler(Env* env) : env_(env) {}
274 WriteBatch updates_ttl;
275 Status batch_rewrite_status;
276 Status PutCF(uint32_t column_family_id, const Slice& key,
277 const Slice& value) override {
278 std::string value_with_ts;
279 Status st = AppendTS(value, &value_with_ts, env_);
280 if (!st.ok()) {
281 batch_rewrite_status = st;
282 } else {
283 WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
284 value_with_ts);
285 }
286 return Status::OK();
287 }
288 Status MergeCF(uint32_t column_family_id, const Slice& key,
289 const Slice& value) override {
290 std::string value_with_ts;
291 Status st = AppendTS(value, &value_with_ts, env_);
292 if (!st.ok()) {
293 batch_rewrite_status = st;
294 } else {
295 WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
296 value_with_ts);
297 }
298 return Status::OK();
299 }
300 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
301 WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
302 return Status::OK();
303 }
304 void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
305
306 private:
307 Env* env_;
308 };
309 Handler handler(GetEnv());
310 updates->Iterate(&handler);
311 if (!handler.batch_rewrite_status.ok()) {
312 return handler.batch_rewrite_status;
313 } else {
314 return db_->Write(opts, &(handler.updates_ttl));
315 }
316 }
317
318 Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
319 ColumnFamilyHandle* column_family) {
320 return new TtlIterator(db_->NewIterator(opts, column_family));
321 }
322
323 void DBWithTTLImpl::SetTtl(ColumnFamilyHandle *h, int32_t ttl) {
324 std::shared_ptr<TtlCompactionFilterFactory> filter;
325 Options opts;
326 opts = GetOptions(h);
327 filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
328 opts.compaction_filter_factory);
329 if (!filter)
330 return;
331 filter->SetTtl(ttl);
332 }
333
334 } // namespace ROCKSDB_NAMESPACE
335 #endif // ROCKSDB_LITE