]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/ttl/db_ttl_impl.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / utilities / ttl / db_ttl_impl.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 #ifndef ROCKSDB_LITE
5
6 #include "utilities/ttl/db_ttl_impl.h"
7
8 #include "db/write_batch_internal.h"
9 #include "rocksdb/convenience.h"
10 #include "rocksdb/env.h"
11 #include "rocksdb/iterator.h"
12 #include "rocksdb/utilities/db_ttl.h"
13 #include "util/coding.h"
14 #include "util/filename.h"
15
16 namespace rocksdb {
17
18 void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
19 Env* env) {
20 if (options->compaction_filter) {
21 options->compaction_filter =
22 new TtlCompactionFilter(ttl, env, options->compaction_filter);
23 } else {
24 options->compaction_filter_factory =
25 std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
26 ttl, env, options->compaction_filter_factory));
27 }
28
29 if (options->merge_operator) {
30 options->merge_operator.reset(
31 new TtlMergeOperator(options->merge_operator, env));
32 }
33 }
34
35 // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
36 DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {}
37
38 DBWithTTLImpl::~DBWithTTLImpl() {
39 // Need to stop background compaction before getting rid of the filter
40 CancelAllBackgroundWork(db_, /* wait = */ true);
41 delete GetOptions().compaction_filter;
42 }
43
44 Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
45 StackableDB** dbptr, int32_t ttl, bool read_only) {
46 DBWithTTL* db;
47 Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
48 if (s.ok()) {
49 *dbptr = db;
50 } else {
51 *dbptr = nullptr;
52 }
53 return s;
54 }
55
56 Status DBWithTTL::Open(const Options& options, const std::string& dbname,
57 DBWithTTL** dbptr, int32_t ttl, bool read_only) {
58
59 DBOptions db_options(options);
60 ColumnFamilyOptions cf_options(options);
61 std::vector<ColumnFamilyDescriptor> column_families;
62 column_families.push_back(
63 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
64 std::vector<ColumnFamilyHandle*> handles;
65 Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
66 dbptr, {ttl}, read_only);
67 if (s.ok()) {
68 assert(handles.size() == 1);
69 // i can delete the handle since DBImpl is always holding a reference to
70 // default column family
71 delete handles[0];
72 }
73 return s;
74 }
75
76 Status DBWithTTL::Open(
77 const DBOptions& db_options, const std::string& dbname,
78 const std::vector<ColumnFamilyDescriptor>& column_families,
79 std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
80 std::vector<int32_t> ttls, bool read_only) {
81
82 if (ttls.size() != column_families.size()) {
83 return Status::InvalidArgument(
84 "ttls size has to be the same as number of column families");
85 }
86
87 std::vector<ColumnFamilyDescriptor> column_families_sanitized =
88 column_families;
89 for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
90 DBWithTTLImpl::SanitizeOptions(
91 ttls[i], &column_families_sanitized[i].options,
92 db_options.env == nullptr ? Env::Default() : db_options.env);
93 }
94 DB* db;
95
96 Status st;
97 if (read_only) {
98 st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
99 handles, &db);
100 } else {
101 st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
102 }
103 if (st.ok()) {
104 *dbptr = new DBWithTTLImpl(db);
105 } else {
106 *dbptr = nullptr;
107 }
108 return st;
109 }
110
111 Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
112 const ColumnFamilyOptions& options, const std::string& column_family_name,
113 ColumnFamilyHandle** handle, int ttl) {
114 ColumnFamilyOptions sanitized_options = options;
115 DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv());
116
117 return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
118 handle);
119 }
120
121 Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
122 const std::string& column_family_name,
123 ColumnFamilyHandle** handle) {
124 return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
125 }
126
127 // Appends the current timestamp to the string.
128 // Returns false if could not get the current_time, true if append succeeds
129 Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
130 Env* env) {
131 val_with_ts->reserve(kTSLength + val.size());
132 char ts_string[kTSLength];
133 int64_t curtime;
134 Status st = env->GetCurrentTime(&curtime);
135 if (!st.ok()) {
136 return st;
137 }
138 EncodeFixed32(ts_string, (int32_t)curtime);
139 val_with_ts->append(val.data(), val.size());
140 val_with_ts->append(ts_string, kTSLength);
141 return st;
142 }
143
144 // Returns corruption if the length of the string is lesser than timestamp, or
145 // timestamp refers to a time lesser than ttl-feature release time
146 Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
147 if (str.size() < kTSLength) {
148 return Status::Corruption("Error: value's length less than timestamp's\n");
149 }
150 // Checks that TS is not lesser than kMinTimestamp
151 // Gaurds against corruption & normal database opened incorrectly in ttl mode
152 int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
153 if (timestamp_value < kMinTimestamp) {
154 return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
155 }
156 return Status::OK();
157 }
158
159 // Checks if the string is stale or not according to TTl provided
160 bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
161 if (ttl <= 0) { // Data is fresh if TTL is non-positive
162 return false;
163 }
164 int64_t curtime;
165 if (!env->GetCurrentTime(&curtime).ok()) {
166 return false; // Treat the data as fresh if could not get current time
167 }
168 int32_t timestamp_value =
169 DecodeFixed32(value.data() + value.size() - kTSLength);
170 return (timestamp_value + ttl) < curtime;
171 }
172
173 // Strips the TS from the end of the slice
174 Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
175 Status st;
176 if (pinnable_val->size() < kTSLength) {
177 return Status::Corruption("Bad timestamp in key-value");
178 }
179 // Erasing characters which hold the TS
180 pinnable_val->remove_suffix(kTSLength);
181 return st;
182 }
183
184 // Strips the TS from the end of the string
185 Status DBWithTTLImpl::StripTS(std::string* str) {
186 Status st;
187 if (str->length() < kTSLength) {
188 return Status::Corruption("Bad timestamp in key-value");
189 }
190 // Erasing characters which hold the TS
191 str->erase(str->length() - kTSLength, kTSLength);
192 return st;
193 }
194
195 Status DBWithTTLImpl::Put(const WriteOptions& options,
196 ColumnFamilyHandle* column_family, const Slice& key,
197 const Slice& val) {
198 WriteBatch batch;
199 batch.Put(column_family, key, val);
200 return Write(options, &batch);
201 }
202
203 Status DBWithTTLImpl::Get(const ReadOptions& options,
204 ColumnFamilyHandle* column_family, const Slice& key,
205 PinnableSlice* value) {
206 Status st = db_->Get(options, column_family, key, value);
207 if (!st.ok()) {
208 return st;
209 }
210 st = SanityCheckTimestamp(*value);
211 if (!st.ok()) {
212 return st;
213 }
214 return StripTS(value);
215 }
216
217 std::vector<Status> DBWithTTLImpl::MultiGet(
218 const ReadOptions& options,
219 const std::vector<ColumnFamilyHandle*>& column_family,
220 const std::vector<Slice>& keys, std::vector<std::string>* values) {
221 auto statuses = db_->MultiGet(options, column_family, keys, values);
222 for (size_t i = 0; i < keys.size(); ++i) {
223 if (!statuses[i].ok()) {
224 continue;
225 }
226 statuses[i] = SanityCheckTimestamp((*values)[i]);
227 if (!statuses[i].ok()) {
228 continue;
229 }
230 statuses[i] = StripTS(&(*values)[i]);
231 }
232 return statuses;
233 }
234
235 bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
236 ColumnFamilyHandle* column_family,
237 const Slice& key, std::string* value,
238 bool* value_found) {
239 bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
240 if (ret && value != nullptr && value_found != nullptr && *value_found) {
241 if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
242 return false;
243 }
244 }
245 return ret;
246 }
247
248 Status DBWithTTLImpl::Merge(const WriteOptions& options,
249 ColumnFamilyHandle* column_family, const Slice& key,
250 const Slice& value) {
251 WriteBatch batch;
252 batch.Merge(column_family, key, value);
253 return Write(options, &batch);
254 }
255
256 Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
257 class Handler : public WriteBatch::Handler {
258 public:
259 explicit Handler(Env* env) : env_(env) {}
260 WriteBatch updates_ttl;
261 Status batch_rewrite_status;
262 virtual Status PutCF(uint32_t column_family_id, const Slice& key,
263 const Slice& value) override {
264 std::string value_with_ts;
265 Status st = AppendTS(value, &value_with_ts, env_);
266 if (!st.ok()) {
267 batch_rewrite_status = st;
268 } else {
269 WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
270 value_with_ts);
271 }
272 return Status::OK();
273 }
274 virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
275 const Slice& value) override {
276 std::string value_with_ts;
277 Status st = AppendTS(value, &value_with_ts, env_);
278 if (!st.ok()) {
279 batch_rewrite_status = st;
280 } else {
281 WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
282 value_with_ts);
283 }
284 return Status::OK();
285 }
286 virtual Status DeleteCF(uint32_t column_family_id,
287 const Slice& key) override {
288 WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
289 return Status::OK();
290 }
291 virtual void LogData(const Slice& blob) override {
292 updates_ttl.PutLogData(blob);
293 }
294
295 private:
296 Env* env_;
297 };
298 Handler handler(GetEnv());
299 updates->Iterate(&handler);
300 if (!handler.batch_rewrite_status.ok()) {
301 return handler.batch_rewrite_status;
302 } else {
303 return db_->Write(opts, &(handler.updates_ttl));
304 }
305 }
306
307 Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
308 ColumnFamilyHandle* column_family) {
309 return new TtlIterator(db_->NewIterator(opts, column_family));
310 }
311
312 } // namespace rocksdb
313 #endif // ROCKSDB_LITE