1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file. See the AUTHORS file for names of contributors.
7 #include "utilities/ttl/db_ttl_impl.h"
9 #include "db/write_batch_internal.h"
10 #include "file/filename.h"
11 #include "rocksdb/convenience.h"
12 #include "rocksdb/env.h"
13 #include "rocksdb/iterator.h"
14 #include "rocksdb/utilities/db_ttl.h"
15 #include "util/coding.h"
17 namespace ROCKSDB_NAMESPACE
{
// Rewrites the column-family `options` in place so that they honour `ttl`:
//  - an existing compaction_filter is replaced by a new TtlCompactionFilter
//    that wraps it (heap-allocated with `new`; DBWithTTLImpl::Close() later
//    deletes the compaction_filter of the options returned by GetOptions());
//  - compaction_filter_factory is replaced by a TtlCompactionFilterFactory
//    wrapping the previous factory;
//  - an existing merge_operator is wrapped in a TtlMergeOperator.
// `env` is forwarded to every wrapper.
// NOTE(review): this listing omits some lines of the function (including the
// declaration of `env` and whatever separates the two compaction-filter
// assignments); presumably the factory is only replaced when no
// compaction_filter was set — confirm against the full source.
19 void DBWithTTLImpl::SanitizeOptions(int32_t ttl
, ColumnFamilyOptions
* options
,
21 if (options
->compaction_filter
) {
22 options
->compaction_filter
=
23 new TtlCompactionFilter(ttl
, env
, options
->compaction_filter
);
25 options
->compaction_filter_factory
=
26 std::shared_ptr
<CompactionFilterFactory
>(new TtlCompactionFilterFactory(
27 ttl
, env
, options
->compaction_filter_factory
));
// Merge results also need a timestamp, so wrap any user merge operator.
30 if (options
->merge_operator
) {
31 options
->merge_operator
.reset(
32 new TtlMergeOperator(options
->merge_operator
, env
));
36 // Open the db inside DBWithTTLImpl because the options need a pointer to its ttl
// The constructor itself only wraps the given `db` (via the DBWithTTL base)
// and initialises closed_ to false.
37 DBWithTTLImpl::DBWithTTLImpl(DB
* db
) : DBWithTTL(db
), closed_(false) {}
// Destructor.
// NOTE(review): the body of this destructor is not visible in this listing,
// so what it releases cannot be documented from here.
39 DBWithTTLImpl::~DBWithTTLImpl() {
// Shuts down the TTL wrapper. Visible steps:
//  1. fetch the options returned by GetOptions();
//  2. cancel background work on db_ and wait for it to finish, so that no
//     compaction is still using the filter;
//  3. delete that options object's compaction_filter.
// `ret` is initialised to Status::OK().
// NOTE(review): the lines that update and return `ret` are not visible in
// this listing.
45 Status
DBWithTTLImpl::Close() {
46 Status ret
= Status::OK();
48 Options default_options
= GetOptions();
49 // Need to stop background compaction before getting rid of the filter
50 CancelAllBackgroundWork(db_
, /* wait = */ true);
52 delete default_options
.compaction_filter
;
// UtilityDB entry point for opening a TTL database. Delegates to
// DBWithTTL::Open(options, dbname, &db, ttl, read_only).
// NOTE(review): the declaration of `db` and the code that stores the result
// into `*dbptr` and returns `s` are not visible in this listing.
58 Status
UtilityDB::OpenTtlDB(const Options
& options
, const std::string
& dbname
,
59 StackableDB
** dbptr
, int32_t ttl
, bool read_only
) {
61 Status s
= DBWithTTL::Open(options
, dbname
, &db
, ttl
, read_only
);
// Single-column-family overload.
// Splits `options` into DBOptions and ColumnFamilyOptions, describes only the
// default column family, and delegates to the multi-column-family overload
// with a one-element TTL list ({ttl}).
// NOTE(review): the lines after the assert (deleting the handle and returning
// `s`) are not visible in this listing.
70 Status
DBWithTTL::Open(const Options
& options
, const std::string
& dbname
,
71 DBWithTTL
** dbptr
, int32_t ttl
, bool read_only
) {
73 DBOptions
db_options(options
);
74 ColumnFamilyOptions
cf_options(options
);
75 std::vector
<ColumnFamilyDescriptor
> column_families
;
76 column_families
.push_back(
77 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, cf_options
));
78 std::vector
<ColumnFamilyHandle
*> handles
;
79 Status s
= DBWithTTL::Open(db_options
, dbname
, column_families
, &handles
,
80 dbptr
, {ttl
}, read_only
);
// Exactly one column family was requested, so exactly one handle is expected.
82 assert(handles
.size() == 1);
83 // The handle can be deleted here since DBImpl always holds a reference to
84 // the default column family
// Multi-column-family overload.
// - Requires exactly one TTL per column family; otherwise it returns
//   Status::InvalidArgument.
// - Applies SanitizeOptions to each entry of column_families_sanitized with
//   the matching TTL. It uses db_options.env, or Env::Default() when that is
//   null.
// - Opens the underlying DB with either DB::OpenForReadOnly or DB::Open.
// - Stores a new DBWithTTLImpl wrapping that DB in *dbptr.
// NOTE(review): several lines are missing from this listing: the initializer
// of column_families_sanitized, the declarations of `db` and `st`, the branch
// conditions, and the return. Presumably OpenForReadOnly is chosen when
// read_only is true — confirm.
90 Status
DBWithTTL::Open(
91 const DBOptions
& db_options
, const std::string
& dbname
,
92 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
93 std::vector
<ColumnFamilyHandle
*>* handles
, DBWithTTL
** dbptr
,
94 std::vector
<int32_t> ttls
, bool read_only
) {
96 if (ttls
.size() != column_families
.size()) {
97 return Status::InvalidArgument(
98 "ttls size has to be the same as number of column families");
101 std::vector
<ColumnFamilyDescriptor
> column_families_sanitized
=
103 for (size_t i
= 0; i
< column_families_sanitized
.size(); ++i
) {
104 DBWithTTLImpl::SanitizeOptions(
105 ttls
[i
], &column_families_sanitized
[i
].options
,
106 db_options
.env
== nullptr ? Env::Default() : db_options
.env
);
112 st
= DB::OpenForReadOnly(db_options
, dbname
, column_families_sanitized
,
115 st
= DB::Open(db_options
, dbname
, column_families_sanitized
, handles
, &db
);
118 *dbptr
= new DBWithTTLImpl(db
);
// Creates a column family governed by `ttl`.
// Copies `options`, applies SanitizeOptions to the copy with `ttl` and this
// DB's Env (GetEnv()), then forwards to DBWithTTL::CreateColumnFamily.
// NOTE(review): `ttl` is declared `int` here, while SanitizeOptions takes
// int32_t. The remaining arguments of the forwarded call are not visible in
// this listing.
125 Status
DBWithTTLImpl::CreateColumnFamilyWithTtl(
126 const ColumnFamilyOptions
& options
, const std::string
& column_family_name
,
127 ColumnFamilyHandle
** handle
, int ttl
) {
128 ColumnFamilyOptions sanitized_options
= options
;
129 DBWithTTLImpl::SanitizeOptions(ttl
, &sanitized_options
, GetEnv());
131 return DBWithTTL::CreateColumnFamily(sanitized_options
, column_family_name
,
// Creates a column family through CreateColumnFamilyWithTtl with a TTL of 0.
// IsStale() treats a non-positive TTL as "fresh". Presumably, therefore, data
// in this column family never expires via TTL — confirm against
// TtlCompactionFilter.
135 Status
DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions
& options
,
136 const std::string
& column_family_name
,
137 ColumnFamilyHandle
** handle
) {
138 return CreateColumnFamilyWithTtl(options
, column_family_name
, handle
, 0);
141 // Appends `val` followed by the current time (kTSLength bytes, EncodeFixed32)
142 // to *val_with_ts. Returns a Status (not a bool) obtained from env->GetCurrentTime().
// NOTE(review): the remaining parameter (callers pass an Env*), the
// declaration of `curtime`, and the handling of a non-OK `st` are not visible
// in this listing. Presumably a failed GetCurrentTime() is returned before
// anything is appended — confirm.
143 Status
DBWithTTLImpl::AppendTS(const Slice
& val
, std::string
* val_with_ts
,
// Reserve room for the value plus the trailing timestamp in one allocation.
145 val_with_ts
->reserve(kTSLength
+ val
.size());
146 char ts_string
[kTSLength
];
148 Status st
= env
->GetCurrentTime(&curtime
);
// NOTE(review): (int32_t)curtime truncates the clock value to 32 bits. If
// curtime is seconds since the epoch, values after early 2038 no longer fit,
// and the C-style cast hides the narrowing.
152 EncodeFixed32(ts_string
, (int32_t)curtime
);
// Layout: value bytes first, then the fixed-width timestamp as the suffix.
153 val_with_ts
->append(val
.data(), val
.size());
154 val_with_ts
->append(ts_string
, kTSLength
);
158 // Returns Corruption if the string is shorter than the timestamp, or if the
159 // timestamp refers to a time earlier than the TTL feature's release time
// The timestamp is read from the trailing kTSLength bytes of `str`.
// NOTE(review): the success return is not visible in this listing.
160 Status
DBWithTTLImpl::SanityCheckTimestamp(const Slice
& str
) {
161 if (str
.size() < kTSLength
) {
162 return Status::Corruption("Error: value's length less than timestamp's\n");
164 // Checks that the TS is not less than kMinTimestamp
165 // Guards against corruption and against a normal database opened in TTL mode by mistake
166 int32_t timestamp_value
= DecodeFixed32(str
.data() + str
.size() - kTSLength
);
167 if (timestamp_value
< kMinTimestamp
) {
168 return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
173 // Checks whether the value is stale according to the TTL provided
// Returns false ("fresh") when ttl <= 0 or when the current time cannot be
// read. Otherwise returns true when the trailing timestamp plus ttl is earlier
// than the current time.
// NOTE(review): the visible code does not check value.size() >= kTSLength
// before reading the trailing bytes. Callers presumably validate with
// SanityCheckTimestamp() first — confirm. The declaration of `curtime` and the
// early `return` for ttl <= 0 are not visible in this listing.
174 bool DBWithTTLImpl::IsStale(const Slice
& value
, int32_t ttl
, Env
* env
) {
175 if (ttl
<= 0) { // Data is fresh if TTL is non-positive
179 if (!env
->GetCurrentTime(&curtime
).ok()) {
180 return false; // Treat the data as fresh if could not get current time
182 int32_t timestamp_value
=
183 DecodeFixed32(value
.data() + value
.size() - kTSLength
);
// NOTE(review): timestamp_value + ttl adds two int32_t operands, so a very
// large ttl can overflow signed arithmetic, which is undefined behaviour.
// Widening to int64_t before adding would avoid this.
184 return (timestamp_value
+ ttl
) < curtime
;
187 // Strips the kTSLength-byte timestamp (TS) from the end of the pinnable slice
// Returns Corruption("Bad timestamp in key-value") if the slice is shorter
// than kTSLength. Only the view is shortened; no bytes are copied.
// NOTE(review): the success return is not visible in this listing.
188 Status
DBWithTTLImpl::StripTS(PinnableSlice
* pinnable_val
) {
190 if (pinnable_val
->size() < kTSLength
) {
191 return Status::Corruption("Bad timestamp in key-value");
193 // Drop the trailing characters that hold the TS
194 pinnable_val
->remove_suffix(kTSLength
);
198 // Strips the kTSLength-byte timestamp (TS) from the end of the string
// Returns Corruption("Bad timestamp in key-value") if the string is shorter
// than kTSLength. Otherwise *str is truncated in place.
// NOTE(review): the success return is not visible in this listing.
199 Status
DBWithTTLImpl::StripTS(std::string
* str
) {
201 if (str
->length() < kTSLength
) {
202 return Status::Corruption("Bad timestamp in key-value");
204 // Erase the trailing characters that hold the TS
205 str
->erase(str
->length() - kTSLength
, kTSLength
);
// Puts key -> val by routing a one-entry WriteBatch through Write(). Write()'s
// Handler appends the current timestamp to every Put value.
// NOTE(review): the `val` parameter and the `batch` declaration are on lines
// not visible in this listing.
209 Status
DBWithTTLImpl::Put(const WriteOptions
& options
,
210 ColumnFamilyHandle
* column_family
, const Slice
& key
,
213 batch
.Put(column_family
, key
, val
);
214 return Write(options
, &batch
);
// Reads `key` from the underlying DB and validates the trailing timestamp with
// SanityCheckTimestamp(). Then strips the timestamp from *value so callers see
// only the user value.
// NOTE(review): the checks of `st` between these steps are not visible in
// this listing. No IsStale() call is visible here either, so presumably values
// past their TTL can still be returned until compaction removes them.
217 Status
DBWithTTLImpl::Get(const ReadOptions
& options
,
218 ColumnFamilyHandle
* column_family
, const Slice
& key
,
219 PinnableSlice
* value
) {
220 Status st
= db_
->Get(options
, column_family
, key
, value
);
224 st
= SanityCheckTimestamp(*value
);
228 return StripTS(value
);
// Batched Get. Forwards to db_->MultiGet(), then for each key whose lookup
// succeeded it validates the timestamp and strips it from (*values)[i],
// recording the outcome in statuses[i].
// NOTE(review): the bodies of the `!ok()` branches (presumably `continue`) and
// the final return are not visible in this listing.
231 std::vector
<Status
> DBWithTTLImpl::MultiGet(
232 const ReadOptions
& options
,
233 const std::vector
<ColumnFamilyHandle
*>& column_family
,
234 const std::vector
<Slice
>& keys
, std::vector
<std::string
>* values
) {
235 auto statuses
= db_
->MultiGet(options
, column_family
, keys
, values
);
236 for (size_t i
= 0; i
< keys
.size(); ++i
) {
237 if (!statuses
[i
].ok()) {
240 statuses
[i
] = SanityCheckTimestamp((*values
)[i
]);
241 if (!statuses
[i
].ok()) {
244 statuses
[i
] = StripTS(&(*values
)[i
]);
// Forwards to db_->KeyMayExist(). A value was actually fetched when `value`
// and `value_found` are non-null and *value_found is true. In that case its
// timestamp is validated and stripped.
// NOTE(review): the `value_found` parameter declaration, the handling of a
// failed check, and the return are not visible in this listing.
249 bool DBWithTTLImpl::KeyMayExist(const ReadOptions
& options
,
250 ColumnFamilyHandle
* column_family
,
251 const Slice
& key
, std::string
* value
,
253 bool ret
= db_
->KeyMayExist(options
, column_family
, key
, value
, value_found
);
254 if (ret
&& value
!= nullptr && value_found
!= nullptr && *value_found
) {
255 if (!SanityCheckTimestamp(*value
).ok() || !StripTS(value
).ok()) {
// Merges `value` into `key` by routing a one-entry WriteBatch through
// Write(). Write()'s Handler appends the current timestamp to every Merge
// operand.
// NOTE(review): the `batch` declaration is on a line not visible in this
// listing.
262 Status
DBWithTTLImpl::Merge(const WriteOptions
& options
,
263 ColumnFamilyHandle
* column_family
, const Slice
& key
,
264 const Slice
& value
) {
266 batch
.Merge(column_family
, key
, value
);
267 return Write(options
, &batch
);
// Rewrites `updates` into a new batch (handler.updates_ttl) before writing it:
// - every Put value and Merge operand gets the current timestamp appended via
//   AppendTS;
// - Deletes and log data are copied unchanged.
// If the Handler recorded a failing AppendTS status in batch_rewrite_status,
// that status is returned and nothing is written. Otherwise the rewritten
// batch is passed to db_->Write(opts, ...).
// NOTE(review): the value returned by updates->Iterate() is not checked in
// the visible code. Some Handler lines are not visible in this listing,
// including the condition guarding each batch_rewrite_status assignment, the
// trailing Put/Merge arguments, and the callbacks' return statements.
270 Status
DBWithTTLImpl::Write(const WriteOptions
& opts
, WriteBatch
* updates
) {
// Replays each record of `updates` into updates_ttl, timestamping Put/Merge values.
271 class Handler
: public WriteBatch::Handler
{
273 explicit Handler(Env
* env
) : env_(env
) {}
274 WriteBatch updates_ttl
;
275 Status batch_rewrite_status
;
276 Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
277 const Slice
& value
) override
{
278 std::string value_with_ts
;
279 Status st
= AppendTS(value
, &value_with_ts
, env_
);
281 batch_rewrite_status
= st
;
283 WriteBatchInternal::Put(&updates_ttl
, column_family_id
, key
,
288 Status
MergeCF(uint32_t column_family_id
, const Slice
& key
,
289 const Slice
& value
) override
{
290 std::string value_with_ts
;
291 Status st
= AppendTS(value
, &value_with_ts
, env_
);
293 batch_rewrite_status
= st
;
295 WriteBatchInternal::Merge(&updates_ttl
, column_family_id
, key
,
300 Status
DeleteCF(uint32_t column_family_id
, const Slice
& key
) override
{
301 WriteBatchInternal::Delete(&updates_ttl
, column_family_id
, key
);
304 void LogData(const Slice
& blob
) override
{ updates_ttl
.PutLogData(blob
); }
309 Handler
handler(GetEnv());
310 updates
->Iterate(&handler
);
311 if (!handler
.batch_rewrite_status
.ok()) {
312 return handler
.batch_rewrite_status
;
314 return db_
->Write(opts
, &(handler
.updates_ttl
));
// Returns a TtlIterator wrapping an iterator over the underlying DB for
// `column_family`.
// NOTE(review): the returned iterator is allocated with `new`; presumably the
// caller owns it and must delete it.
318 Iterator
* DBWithTTLImpl::NewIterator(const ReadOptions
& opts
,
319 ColumnFamilyHandle
* column_family
) {
320 return new TtlIterator(db_
->NewIterator(opts
, column_family
));
// Presumably updates the TTL of column family `h`. The visible code fetches
// its options and casts compaction_filter_factory to TtlCompactionFilterFactory.
// NOTE(review): std::static_pointer_cast performs no runtime type check. If
// SanitizeOptions wrapped a user compaction_filter instead of installing a
// TtlCompactionFilterFactory for this column family, the cast is invalid.
// Confirm that the lines after the cast (not visible here) guard against this.
// The declaration of `opts` and the code that applies `ttl` are also not shown.
323 void DBWithTTLImpl::SetTtl(ColumnFamilyHandle
*h
, int32_t ttl
) {
324 std::shared_ptr
<TtlCompactionFilterFactory
> filter
;
326 opts
= GetOptions(h
);
327 filter
= std::static_pointer_cast
<TtlCompactionFilterFactory
>(
328 opts
.compaction_filter_factory
);
334 } // namespace ROCKSDB_NAMESPACE
335 #endif // ROCKSDB_LITE