]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style license that can be | |
3 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
4 | #ifndef ROCKSDB_LITE | |
5 | ||
6 | #include "utilities/ttl/db_ttl_impl.h" | |
7 | ||
8 | #include "db/write_batch_internal.h" | |
9 | #include "rocksdb/convenience.h" | |
10 | #include "rocksdb/env.h" | |
11 | #include "rocksdb/iterator.h" | |
12 | #include "rocksdb/utilities/db_ttl.h" | |
13 | #include "util/coding.h" | |
14 | #include "util/filename.h" | |
15 | ||
16 | namespace rocksdb { | |
17 | ||
18 | void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, | |
19 | Env* env) { | |
20 | if (options->compaction_filter) { | |
21 | options->compaction_filter = | |
22 | new TtlCompactionFilter(ttl, env, options->compaction_filter); | |
23 | } else { | |
24 | options->compaction_filter_factory = | |
25 | std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory( | |
26 | ttl, env, options->compaction_filter_factory)); | |
27 | } | |
28 | ||
29 | if (options->merge_operator) { | |
30 | options->merge_operator.reset( | |
31 | new TtlMergeOperator(options->merge_operator, env)); | |
32 | } | |
33 | } | |
34 | ||
35 | // Open the db inside DBWithTTLImpl because options needs pointer to its ttl | |
36 | DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {} | |
37 | ||
38 | DBWithTTLImpl::~DBWithTTLImpl() { | |
39 | // Need to stop background compaction before getting rid of the filter | |
40 | CancelAllBackgroundWork(db_, /* wait = */ true); | |
41 | delete GetOptions().compaction_filter; | |
42 | } | |
43 | ||
44 | Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname, | |
45 | StackableDB** dbptr, int32_t ttl, bool read_only) { | |
46 | DBWithTTL* db; | |
47 | Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only); | |
48 | if (s.ok()) { | |
49 | *dbptr = db; | |
50 | } else { | |
51 | *dbptr = nullptr; | |
52 | } | |
53 | return s; | |
54 | } | |
55 | ||
56 | Status DBWithTTL::Open(const Options& options, const std::string& dbname, | |
57 | DBWithTTL** dbptr, int32_t ttl, bool read_only) { | |
58 | ||
59 | DBOptions db_options(options); | |
60 | ColumnFamilyOptions cf_options(options); | |
61 | std::vector<ColumnFamilyDescriptor> column_families; | |
62 | column_families.push_back( | |
63 | ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); | |
64 | std::vector<ColumnFamilyHandle*> handles; | |
65 | Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles, | |
66 | dbptr, {ttl}, read_only); | |
67 | if (s.ok()) { | |
68 | assert(handles.size() == 1); | |
69 | // i can delete the handle since DBImpl is always holding a reference to | |
70 | // default column family | |
71 | delete handles[0]; | |
72 | } | |
73 | return s; | |
74 | } | |
75 | ||
76 | Status DBWithTTL::Open( | |
77 | const DBOptions& db_options, const std::string& dbname, | |
78 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
79 | std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr, | |
80 | std::vector<int32_t> ttls, bool read_only) { | |
81 | ||
82 | if (ttls.size() != column_families.size()) { | |
83 | return Status::InvalidArgument( | |
84 | "ttls size has to be the same as number of column families"); | |
85 | } | |
86 | ||
87 | std::vector<ColumnFamilyDescriptor> column_families_sanitized = | |
88 | column_families; | |
89 | for (size_t i = 0; i < column_families_sanitized.size(); ++i) { | |
90 | DBWithTTLImpl::SanitizeOptions( | |
91 | ttls[i], &column_families_sanitized[i].options, | |
92 | db_options.env == nullptr ? Env::Default() : db_options.env); | |
93 | } | |
94 | DB* db; | |
95 | ||
96 | Status st; | |
97 | if (read_only) { | |
98 | st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized, | |
99 | handles, &db); | |
100 | } else { | |
101 | st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db); | |
102 | } | |
103 | if (st.ok()) { | |
104 | *dbptr = new DBWithTTLImpl(db); | |
105 | } else { | |
106 | *dbptr = nullptr; | |
107 | } | |
108 | return st; | |
109 | } | |
110 | ||
111 | Status DBWithTTLImpl::CreateColumnFamilyWithTtl( | |
112 | const ColumnFamilyOptions& options, const std::string& column_family_name, | |
113 | ColumnFamilyHandle** handle, int ttl) { | |
114 | ColumnFamilyOptions sanitized_options = options; | |
115 | DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv()); | |
116 | ||
117 | return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name, | |
118 | handle); | |
119 | } | |
120 | ||
121 | Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options, | |
122 | const std::string& column_family_name, | |
123 | ColumnFamilyHandle** handle) { | |
124 | return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0); | |
125 | } | |
126 | ||
127 | // Appends the current timestamp to the string. | |
128 | // Returns false if could not get the current_time, true if append succeeds | |
129 | Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts, | |
130 | Env* env) { | |
131 | val_with_ts->reserve(kTSLength + val.size()); | |
132 | char ts_string[kTSLength]; | |
133 | int64_t curtime; | |
134 | Status st = env->GetCurrentTime(&curtime); | |
135 | if (!st.ok()) { | |
136 | return st; | |
137 | } | |
138 | EncodeFixed32(ts_string, (int32_t)curtime); | |
139 | val_with_ts->append(val.data(), val.size()); | |
140 | val_with_ts->append(ts_string, kTSLength); | |
141 | return st; | |
142 | } | |
143 | ||
144 | // Returns corruption if the length of the string is lesser than timestamp, or | |
145 | // timestamp refers to a time lesser than ttl-feature release time | |
146 | Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) { | |
147 | if (str.size() < kTSLength) { | |
148 | return Status::Corruption("Error: value's length less than timestamp's\n"); | |
149 | } | |
150 | // Checks that TS is not lesser than kMinTimestamp | |
151 | // Gaurds against corruption & normal database opened incorrectly in ttl mode | |
152 | int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength); | |
153 | if (timestamp_value < kMinTimestamp) { | |
154 | return Status::Corruption("Error: Timestamp < ttl feature release time!\n"); | |
155 | } | |
156 | return Status::OK(); | |
157 | } | |
158 | ||
159 | // Checks if the string is stale or not according to TTl provided | |
160 | bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) { | |
161 | if (ttl <= 0) { // Data is fresh if TTL is non-positive | |
162 | return false; | |
163 | } | |
164 | int64_t curtime; | |
165 | if (!env->GetCurrentTime(&curtime).ok()) { | |
166 | return false; // Treat the data as fresh if could not get current time | |
167 | } | |
168 | int32_t timestamp_value = | |
169 | DecodeFixed32(value.data() + value.size() - kTSLength); | |
170 | return (timestamp_value + ttl) < curtime; | |
171 | } | |
172 | ||
173 | // Strips the TS from the end of the slice | |
174 | Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) { | |
175 | Status st; | |
176 | if (pinnable_val->size() < kTSLength) { | |
177 | return Status::Corruption("Bad timestamp in key-value"); | |
178 | } | |
179 | // Erasing characters which hold the TS | |
180 | pinnable_val->remove_suffix(kTSLength); | |
181 | return st; | |
182 | } | |
183 | ||
184 | // Strips the TS from the end of the string | |
185 | Status DBWithTTLImpl::StripTS(std::string* str) { | |
186 | Status st; | |
187 | if (str->length() < kTSLength) { | |
188 | return Status::Corruption("Bad timestamp in key-value"); | |
189 | } | |
190 | // Erasing characters which hold the TS | |
191 | str->erase(str->length() - kTSLength, kTSLength); | |
192 | return st; | |
193 | } | |
194 | ||
195 | Status DBWithTTLImpl::Put(const WriteOptions& options, | |
196 | ColumnFamilyHandle* column_family, const Slice& key, | |
197 | const Slice& val) { | |
198 | WriteBatch batch; | |
199 | batch.Put(column_family, key, val); | |
200 | return Write(options, &batch); | |
201 | } | |
202 | ||
203 | Status DBWithTTLImpl::Get(const ReadOptions& options, | |
204 | ColumnFamilyHandle* column_family, const Slice& key, | |
205 | PinnableSlice* value) { | |
206 | Status st = db_->Get(options, column_family, key, value); | |
207 | if (!st.ok()) { | |
208 | return st; | |
209 | } | |
210 | st = SanityCheckTimestamp(*value); | |
211 | if (!st.ok()) { | |
212 | return st; | |
213 | } | |
214 | return StripTS(value); | |
215 | } | |
216 | ||
217 | std::vector<Status> DBWithTTLImpl::MultiGet( | |
218 | const ReadOptions& options, | |
219 | const std::vector<ColumnFamilyHandle*>& column_family, | |
220 | const std::vector<Slice>& keys, std::vector<std::string>* values) { | |
221 | auto statuses = db_->MultiGet(options, column_family, keys, values); | |
222 | for (size_t i = 0; i < keys.size(); ++i) { | |
223 | if (!statuses[i].ok()) { | |
224 | continue; | |
225 | } | |
226 | statuses[i] = SanityCheckTimestamp((*values)[i]); | |
227 | if (!statuses[i].ok()) { | |
228 | continue; | |
229 | } | |
230 | statuses[i] = StripTS(&(*values)[i]); | |
231 | } | |
232 | return statuses; | |
233 | } | |
234 | ||
235 | bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options, | |
236 | ColumnFamilyHandle* column_family, | |
237 | const Slice& key, std::string* value, | |
238 | bool* value_found) { | |
239 | bool ret = db_->KeyMayExist(options, column_family, key, value, value_found); | |
240 | if (ret && value != nullptr && value_found != nullptr && *value_found) { | |
241 | if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) { | |
242 | return false; | |
243 | } | |
244 | } | |
245 | return ret; | |
246 | } | |
247 | ||
248 | Status DBWithTTLImpl::Merge(const WriteOptions& options, | |
249 | ColumnFamilyHandle* column_family, const Slice& key, | |
250 | const Slice& value) { | |
251 | WriteBatch batch; | |
252 | batch.Merge(column_family, key, value); | |
253 | return Write(options, &batch); | |
254 | } | |
255 | ||
256 | Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) { | |
257 | class Handler : public WriteBatch::Handler { | |
258 | public: | |
259 | explicit Handler(Env* env) : env_(env) {} | |
260 | WriteBatch updates_ttl; | |
261 | Status batch_rewrite_status; | |
494da23a TL |
262 | Status PutCF(uint32_t column_family_id, const Slice& key, |
263 | const Slice& value) override { | |
7c673cae FG |
264 | std::string value_with_ts; |
265 | Status st = AppendTS(value, &value_with_ts, env_); | |
266 | if (!st.ok()) { | |
267 | batch_rewrite_status = st; | |
268 | } else { | |
269 | WriteBatchInternal::Put(&updates_ttl, column_family_id, key, | |
270 | value_with_ts); | |
271 | } | |
272 | return Status::OK(); | |
273 | } | |
494da23a TL |
274 | Status MergeCF(uint32_t column_family_id, const Slice& key, |
275 | const Slice& value) override { | |
7c673cae FG |
276 | std::string value_with_ts; |
277 | Status st = AppendTS(value, &value_with_ts, env_); | |
278 | if (!st.ok()) { | |
279 | batch_rewrite_status = st; | |
280 | } else { | |
281 | WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, | |
282 | value_with_ts); | |
283 | } | |
284 | return Status::OK(); | |
285 | } | |
494da23a | 286 | Status DeleteCF(uint32_t column_family_id, const Slice& key) override { |
7c673cae FG |
287 | WriteBatchInternal::Delete(&updates_ttl, column_family_id, key); |
288 | return Status::OK(); | |
289 | } | |
494da23a | 290 | void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); } |
7c673cae FG |
291 | |
292 | private: | |
293 | Env* env_; | |
294 | }; | |
295 | Handler handler(GetEnv()); | |
296 | updates->Iterate(&handler); | |
297 | if (!handler.batch_rewrite_status.ok()) { | |
298 | return handler.batch_rewrite_status; | |
299 | } else { | |
300 | return db_->Write(opts, &(handler.updates_ttl)); | |
301 | } | |
302 | } | |
303 | ||
304 | Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts, | |
305 | ColumnFamilyHandle* column_family) { | |
306 | return new TtlIterator(db_->NewIterator(opts, column_family)); | |
307 | } | |
308 | ||
11fdf7f2 TL |
309 | void DBWithTTLImpl::SetTtl(ColumnFamilyHandle *h, int32_t ttl) { |
310 | std::shared_ptr<TtlCompactionFilterFactory> filter; | |
311 | Options opts; | |
312 | opts = GetOptions(h); | |
313 | filter = std::static_pointer_cast<TtlCompactionFilterFactory>( | |
314 | opts.compaction_filter_factory); | |
315 | if (!filter) | |
316 | return; | |
317 | filter->SetTtl(ttl); | |
318 | } | |
319 | ||
7c673cae FG |
320 | } // namespace rocksdb |
321 | #endif // ROCKSDB_LITE |