1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
6 #include "utilities/ttl/db_ttl_impl.h"
8 #include "db/write_batch_internal.h"
9 #include "rocksdb/convenience.h"
10 #include "rocksdb/env.h"
11 #include "rocksdb/iterator.h"
12 #include "rocksdb/utilities/db_ttl.h"
13 #include "util/coding.h"
14 #include "util/filename.h"
18 void DBWithTTLImpl::SanitizeOptions(int32_t ttl
, ColumnFamilyOptions
* options
,
20 if (options
->compaction_filter
) {
21 options
->compaction_filter
=
22 new TtlCompactionFilter(ttl
, env
, options
->compaction_filter
);
24 options
->compaction_filter_factory
=
25 std::shared_ptr
<CompactionFilterFactory
>(new TtlCompactionFilterFactory(
26 ttl
, env
, options
->compaction_filter_factory
));
29 if (options
->merge_operator
) {
30 options
->merge_operator
.reset(
31 new TtlMergeOperator(options
->merge_operator
, env
));
35 // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
36 DBWithTTLImpl::DBWithTTLImpl(DB
* db
) : DBWithTTL(db
) {}
38 DBWithTTLImpl::~DBWithTTLImpl() {
39 // Need to stop background compaction before getting rid of the filter
40 CancelAllBackgroundWork(db_
, /* wait = */ true);
41 delete GetOptions().compaction_filter
;
44 Status
UtilityDB::OpenTtlDB(const Options
& options
, const std::string
& dbname
,
45 StackableDB
** dbptr
, int32_t ttl
, bool read_only
) {
47 Status s
= DBWithTTL::Open(options
, dbname
, &db
, ttl
, read_only
);
56 Status
DBWithTTL::Open(const Options
& options
, const std::string
& dbname
,
57 DBWithTTL
** dbptr
, int32_t ttl
, bool read_only
) {
59 DBOptions
db_options(options
);
60 ColumnFamilyOptions
cf_options(options
);
61 std::vector
<ColumnFamilyDescriptor
> column_families
;
62 column_families
.push_back(
63 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, cf_options
));
64 std::vector
<ColumnFamilyHandle
*> handles
;
65 Status s
= DBWithTTL::Open(db_options
, dbname
, column_families
, &handles
,
66 dbptr
, {ttl
}, read_only
);
68 assert(handles
.size() == 1);
// We can delete the handle since DBImpl always holds a reference to the
// default column family.
76 Status
DBWithTTL::Open(
77 const DBOptions
& db_options
, const std::string
& dbname
,
78 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
79 std::vector
<ColumnFamilyHandle
*>* handles
, DBWithTTL
** dbptr
,
80 std::vector
<int32_t> ttls
, bool read_only
) {
82 if (ttls
.size() != column_families
.size()) {
83 return Status::InvalidArgument(
84 "ttls size has to be the same as number of column families");
87 std::vector
<ColumnFamilyDescriptor
> column_families_sanitized
=
89 for (size_t i
= 0; i
< column_families_sanitized
.size(); ++i
) {
90 DBWithTTLImpl::SanitizeOptions(
91 ttls
[i
], &column_families_sanitized
[i
].options
,
92 db_options
.env
== nullptr ? Env::Default() : db_options
.env
);
98 st
= DB::OpenForReadOnly(db_options
, dbname
, column_families_sanitized
,
101 st
= DB::Open(db_options
, dbname
, column_families_sanitized
, handles
, &db
);
104 *dbptr
= new DBWithTTLImpl(db
);
111 Status
DBWithTTLImpl::CreateColumnFamilyWithTtl(
112 const ColumnFamilyOptions
& options
, const std::string
& column_family_name
,
113 ColumnFamilyHandle
** handle
, int ttl
) {
114 ColumnFamilyOptions sanitized_options
= options
;
115 DBWithTTLImpl::SanitizeOptions(ttl
, &sanitized_options
, GetEnv());
117 return DBWithTTL::CreateColumnFamily(sanitized_options
, column_family_name
,
121 Status
DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions
& options
,
122 const std::string
& column_family_name
,
123 ColumnFamilyHandle
** handle
) {
124 return CreateColumnFamilyWithTtl(options
, column_family_name
, handle
, 0);
// Appends the current timestamp to the string.
// Returns a non-OK Status if the current time could not be obtained, and OK
// if the append succeeds.
129 Status
DBWithTTLImpl::AppendTS(const Slice
& val
, std::string
* val_with_ts
,
131 val_with_ts
->reserve(kTSLength
+ val
.size());
132 char ts_string
[kTSLength
];
134 Status st
= env
->GetCurrentTime(&curtime
);
138 EncodeFixed32(ts_string
, (int32_t)curtime
);
139 val_with_ts
->append(val
.data(), val
.size());
140 val_with_ts
->append(ts_string
, kTSLength
);
// Returns Corruption if the string is shorter than the timestamp, or if the
// timestamp refers to a time earlier than the TTL feature's release time.
146 Status
DBWithTTLImpl::SanityCheckTimestamp(const Slice
& str
) {
147 if (str
.size() < kTSLength
) {
148 return Status::Corruption("Error: value's length less than timestamp's\n");
// Checks that the timestamp is not less than kMinTimestamp.
// Guards against corruption and against a regular (non-TTL) database being
// opened in TTL mode by mistake.
152 int32_t timestamp_value
= DecodeFixed32(str
.data() + str
.size() - kTSLength
);
153 if (timestamp_value
< kMinTimestamp
) {
154 return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
// Checks whether the value is stale according to the provided TTL.
160 bool DBWithTTLImpl::IsStale(const Slice
& value
, int32_t ttl
, Env
* env
) {
161 if (ttl
<= 0) { // Data is fresh if TTL is non-positive
165 if (!env
->GetCurrentTime(&curtime
).ok()) {
166 return false; // Treat the data as fresh if could not get current time
168 int32_t timestamp_value
=
169 DecodeFixed32(value
.data() + value
.size() - kTSLength
);
170 return (timestamp_value
+ ttl
) < curtime
;
173 // Strips the TS from the end of the slice
174 Status
DBWithTTLImpl::StripTS(PinnableSlice
* pinnable_val
) {
176 if (pinnable_val
->size() < kTSLength
) {
177 return Status::Corruption("Bad timestamp in key-value");
179 // Erasing characters which hold the TS
180 pinnable_val
->remove_suffix(kTSLength
);
184 // Strips the TS from the end of the string
185 Status
DBWithTTLImpl::StripTS(std::string
* str
) {
187 if (str
->length() < kTSLength
) {
188 return Status::Corruption("Bad timestamp in key-value");
190 // Erasing characters which hold the TS
191 str
->erase(str
->length() - kTSLength
, kTSLength
);
195 Status
DBWithTTLImpl::Put(const WriteOptions
& options
,
196 ColumnFamilyHandle
* column_family
, const Slice
& key
,
199 batch
.Put(column_family
, key
, val
);
200 return Write(options
, &batch
);
203 Status
DBWithTTLImpl::Get(const ReadOptions
& options
,
204 ColumnFamilyHandle
* column_family
, const Slice
& key
,
205 PinnableSlice
* value
) {
206 Status st
= db_
->Get(options
, column_family
, key
, value
);
210 st
= SanityCheckTimestamp(*value
);
214 return StripTS(value
);
217 std::vector
<Status
> DBWithTTLImpl::MultiGet(
218 const ReadOptions
& options
,
219 const std::vector
<ColumnFamilyHandle
*>& column_family
,
220 const std::vector
<Slice
>& keys
, std::vector
<std::string
>* values
) {
221 auto statuses
= db_
->MultiGet(options
, column_family
, keys
, values
);
222 for (size_t i
= 0; i
< keys
.size(); ++i
) {
223 if (!statuses
[i
].ok()) {
226 statuses
[i
] = SanityCheckTimestamp((*values
)[i
]);
227 if (!statuses
[i
].ok()) {
230 statuses
[i
] = StripTS(&(*values
)[i
]);
235 bool DBWithTTLImpl::KeyMayExist(const ReadOptions
& options
,
236 ColumnFamilyHandle
* column_family
,
237 const Slice
& key
, std::string
* value
,
239 bool ret
= db_
->KeyMayExist(options
, column_family
, key
, value
, value_found
);
240 if (ret
&& value
!= nullptr && value_found
!= nullptr && *value_found
) {
241 if (!SanityCheckTimestamp(*value
).ok() || !StripTS(value
).ok()) {
248 Status
DBWithTTLImpl::Merge(const WriteOptions
& options
,
249 ColumnFamilyHandle
* column_family
, const Slice
& key
,
250 const Slice
& value
) {
252 batch
.Merge(column_family
, key
, value
);
253 return Write(options
, &batch
);
256 Status
DBWithTTLImpl::Write(const WriteOptions
& opts
, WriteBatch
* updates
) {
257 class Handler
: public WriteBatch::Handler
{
259 explicit Handler(Env
* env
) : env_(env
) {}
260 WriteBatch updates_ttl
;
261 Status batch_rewrite_status
;
262 virtual Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
263 const Slice
& value
) override
{
264 std::string value_with_ts
;
265 Status st
= AppendTS(value
, &value_with_ts
, env_
);
267 batch_rewrite_status
= st
;
269 WriteBatchInternal::Put(&updates_ttl
, column_family_id
, key
,
274 virtual Status
MergeCF(uint32_t column_family_id
, const Slice
& key
,
275 const Slice
& value
) override
{
276 std::string value_with_ts
;
277 Status st
= AppendTS(value
, &value_with_ts
, env_
);
279 batch_rewrite_status
= st
;
281 WriteBatchInternal::Merge(&updates_ttl
, column_family_id
, key
,
286 virtual Status
DeleteCF(uint32_t column_family_id
,
287 const Slice
& key
) override
{
288 WriteBatchInternal::Delete(&updates_ttl
, column_family_id
, key
);
291 virtual void LogData(const Slice
& blob
) override
{
292 updates_ttl
.PutLogData(blob
);
298 Handler
handler(GetEnv());
299 updates
->Iterate(&handler
);
300 if (!handler
.batch_rewrite_status
.ok()) {
301 return handler
.batch_rewrite_status
;
303 return db_
->Write(opts
, &(handler
.updates_ttl
));
307 Iterator
* DBWithTTLImpl::NewIterator(const ReadOptions
& opts
,
308 ColumnFamilyHandle
* column_family
) {
309 return new TtlIterator(db_
->NewIterator(opts
, column_family
));
312 } // namespace rocksdb
313 #endif // ROCKSDB_LITE