1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
12 #include "rocksdb/db.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/compaction_filter.h"
15 #include "rocksdb/merge_operator.h"
16 #include "rocksdb/utilities/utility_db.h"
17 #include "rocksdb/utilities/db_ttl.h"
18 #include "db/db_impl.h"
21 // Windows API macro interference
// TTL-aware database wrapper. Derives from DBWithTTL and declares overrides
// of the write/read entry points (Put, Get, MultiGet, KeyMayExist, Merge,
// Write, NewIterator) plus static helpers for the timestamp that is kept in
// the last kTSLength bytes of a stored value (IsStale, AppendTS, StripTS,
// SanityCheckTimestamp).
28 class DBWithTTLImpl
: public DBWithTTL
{
// Static hook that receives a TTL and a mutable ColumnFamilyOptions.
// NOTE(review): the remainder of this parameter list and the definition are
// not visible in this copy of the file; presumably it installs the TTL
// compaction filter / merge operator into *options -- confirm.
30 static void SanitizeOptions(int32_t ttl
, ColumnFamilyOptions
* options
,
// Constructs the wrapper from an existing DB pointer. `explicit` rules out an
// implicit DB* -> DBWithTTLImpl conversion. Defined out of line.
33 explicit DBWithTTLImpl(DB
* db
);
// Virtual destructor, defined out of line.
35 virtual ~DBWithTTLImpl();
// Declares creation of a column family named `column_family_name` configured
// by `options`; the new handle is passed back through `handle`.
// NOTE(review): the tail of the parameter list (presumably the per-family
// TTL) is not visible in this copy -- confirm against the full header.
37 Status
CreateColumnFamilyWithTtl(const ColumnFamilyOptions
& options
,
38 const std::string
& column_family_name
,
39 ColumnFamilyHandle
** handle
,
// Override of the plain column-family creation entry point: same `options`,
// `column_family_name` and out-parameter `handle` as
// CreateColumnFamilyWithTtl, without an explicit TTL argument.
// NOTE(review): which TTL the new family receives is decided in the
// out-of-line definition, which is not visible here.
42 Status
CreateColumnFamily(const ColumnFamilyOptions
& options
,
43 const std::string
& column_family_name
,
44 ColumnFamilyHandle
** handle
) override
;
// Re-introduce the StackableDB::Put overloads so that the override declared
// below does not hide them.
46 using StackableDB::Put
;
// Write-path override storing `val` under `key` in `column_family`.
// NOTE(review): presumably the value is extended with a timestamp (see
// AppendTS) before being written -- the definition is out of line; confirm.
47 virtual Status
Put(const WriteOptions
& options
,
48 ColumnFamilyHandle
* column_family
, const Slice
& key
,
49 const Slice
& val
) override
;
// Re-introduce the StackableDB::Get overloads so that the override declared
// below does not hide them.
51 using StackableDB::Get
;
// Read-path override looking up `key` in `column_family`; the result is
// delivered through the PinnableSlice `value`.
// NOTE(review): presumably the trailing timestamp is removed (see the
// StripTS(PinnableSlice*) overload) before returning -- confirm in the
// out-of-line definition.
52 virtual Status
Get(const ReadOptions
& options
,
53 ColumnFamilyHandle
* column_family
, const Slice
& key
,
54 PinnableSlice
* value
) override
;
// Re-introduce the StackableDB::MultiGet overloads so that the override
// declared below does not hide them.
56 using StackableDB::MultiGet
;
// Batched lookup override. Takes a vector of column family handles and a
// vector of keys; results are written to *values and one Status per lookup is
// returned.
// NOTE(review): presumably column_family[i] pairs with keys[i] and each
// returned string has its timestamp stripped -- confirm in the definition.
57 virtual std::vector
<Status
> MultiGet(
58 const ReadOptions
& options
,
59 const std::vector
<ColumnFamilyHandle
*>& column_family
,
60 const std::vector
<Slice
>& keys
,
61 std::vector
<std::string
>* values
) override
;
// Re-introduce the StackableDB::KeyMayExist overloads so that the override
// declared below does not hide them.
63 using StackableDB::KeyMayExist
;
// Existence-probe override for `key` in `column_family`. `value_found` is an
// optional out-flag and defaults to nullptr.
// NOTE(review): a parameter line appears to be missing between `key` and
// `value_found` in this copy of the file -- compare with the base-class
// signature before relying on this declaration.
64 virtual bool KeyMayExist(const ReadOptions
& options
,
65 ColumnFamilyHandle
* column_family
, const Slice
& key
,
67 bool* value_found
= nullptr) override
;
// Re-introduce the StackableDB::Merge overloads so that the override declared
// below does not hide them.
69 using StackableDB::Merge
;
// Merge-path override applying operand `value` to `key` in `column_family`.
// NOTE(review): TtlMergeOperator expects every operand to end with a
// kTSLength-byte timestamp, so presumably one is appended here before
// delegating -- confirm in the out-of-line definition.
70 virtual Status
Merge(const WriteOptions
& options
,
71 ColumnFamilyHandle
* column_family
, const Slice
& key
,
72 const Slice
& value
) override
;
// Batch-write override taking the caller's WriteBatch through `updates`.
// NOTE(review): how the batch entries are given timestamps is decided in the
// out-of-line definition, which is not visible here.
74 virtual Status
Write(const WriteOptions
& opts
, WriteBatch
* updates
) override
;
// Re-introduce the StackableDB::NewIterator overloads so that the override
// declared below does not hide them.
76 using StackableDB::NewIterator
;
// Iterator factory override for `column_family`.
// NOTE(review): presumably returns a TtlIterator wrapping the underlying
// iterator -- the definition is out of line; confirm there.
77 virtual Iterator
* NewIterator(const ReadOptions
& opts
,
78 ColumnFamilyHandle
* column_family
) override
;
80 virtual DB
* GetBaseDB() override
{ return db_
; }
// Static predicate over a stored `value`, a `ttl` and an Env.
// TtlCompactionFilter::Filter consults it first, passing the old value
// together with its own ttl_ and env_.
// NOTE(review): presumably compares the timestamp embedded in `value` plus
// `ttl` against the Env's current time -- definition not visible; confirm.
82 static bool IsStale(const Slice
& value
, int32_t ttl
, Env
* env
);
// Static helper taking an input value `val`, an output string `val_with_ts`
// and an Env.
// NOTE(review): by its name and signature this presumably writes `val`
// followed by the current time (kTSLength bytes) into *val_with_ts --
// definition not visible; confirm.
84 static Status
AppendTS(const Slice
& val
, std::string
* val_with_ts
, Env
* env
);
// Static validation of a stored value `str`; reports the outcome as a Status.
// TtlIterator::value() asserts that the result is ok().
// NOTE(review): presumably checks the length against kTSLength and the
// embedded timestamp against kMinTimestamp -- definition not visible; confirm.
86 static Status
SanityCheckTimestamp(const Slice
& str
);
// Two static overloads operating in place on the pointed-to value, one for
// std::string and one for PinnableSlice.
// NOTE(review): presumably each removes the trailing kTSLength timestamp
// bytes and fails if the value is too short -- definitions not visible;
// confirm.
88 static Status
StripTS(std::string
* str
);
90 static Status
StripTS(PinnableSlice
* str
);
// Number of bytes the timestamp occupies at the end of a stored value.
// TtlIterator, TtlCompactionFilter and TtlMergeOperator all subtract this
// from value sizes to separate user data from the timestamp.
92 static const uint32_t kTSLength
= sizeof(int32_t); // size of timestamp
// Lower timestamp bound.
// NOTE(review): presumably timestamps below this are treated as corrupt by
// SanityCheckTimestamp -- confirm.
94 static const int32_t kMinTimestamp
= 1368146402; // 05/09/2013:5:40PM GMT-8
// Upper timestamp bound; 2147483647 is the largest value an int32_t can hold.
96 static const int32_t kMaxTimestamp
= 2147483647; // 01/18/2038:7:14PM GMT-8
// Iterator decorator. Positioning calls, key() and status() are forwarded to
// the wrapped iterator (iter_); value() hides the trailing
// DBWithTTLImpl::kTSLength timestamp bytes and timestamp() decodes them.
99 class TtlIterator
: public Iterator
{
102 explicit TtlIterator(Iterator
* iter
) : iter_(iter
) { assert(iter_
); }
104 ~TtlIterator() { delete iter_
; }
106 bool Valid() const override
{ return iter_
->Valid(); }
108 void SeekToFirst() override
{ iter_
->SeekToFirst(); }
110 void SeekToLast() override
{ iter_
->SeekToLast(); }
112 void Seek(const Slice
& target
) override
{ iter_
->Seek(target
); }
114 void SeekForPrev(const Slice
& target
) override
{ iter_
->SeekForPrev(target
); }
116 void Next() override
{ iter_
->Next(); }
118 void Prev() override
{ iter_
->Prev(); }
120 Slice
key() const override
{ return iter_
->key(); }
// Decodes the fixed 32-bit timestamp stored in the last
// DBWithTTLImpl::kTSLength bytes of the wrapped iterator's current value.
// There is no length check here: the pointer is computed as
// data() + size() - kTSLength, so the current value must be at least
// kTSLength bytes long.
122 int32_t timestamp() const {
123 return DecodeFixed32(iter_
->value().data() + iter_
->value().size() -
124 DBWithTTLImpl::kTSLength
);
// Returns the wrapped iterator's current value with the trailing
// DBWithTTLImpl::kTSLength timestamp bytes hidden from the caller.
127 Slice
value() const override
{
128 // TODO: handle timestamp corruption like in general iterator semantics
// The timestamp is validated through assert only, so this check disappears
// when assertions are compiled out (NDEBUG).
129 assert(DBWithTTLImpl::SanityCheckTimestamp(iter_
->value()).ok());
// Take a copy of the Slice and shorten that copy by adjusting its size_
// member directly.
130 Slice trimmed_value
= iter_
->value();
131 trimmed_value
.size_
-= DBWithTTLImpl::kTSLength
;
132 return trimmed_value
;
135 Status
status() const override
{ return iter_
->status(); }
// Compaction filter deriving from CompactionFilter. Its Filter() checks
// DBWithTTLImpl::IsStale on each value and can additionally consult a
// user-provided filter on the value with its timestamp suffix removed.
141 class TtlCompactionFilter
: public CompactionFilter
{
// Constructor parameters visible here: the TTL, the Env, a raw pointer to a
// user filter, and a unique_ptr holding a filter produced by a user factory.
// NOTE(review): the constructor's name line, the default argument following
// `=` and the first member initializers are missing from this copy of the
// file.
144 int32_t ttl
, Env
* env
, const CompactionFilter
* user_comp_filter
,
145 std::unique_ptr
<const CompactionFilter
> user_comp_filter_from_factory
=
149 user_comp_filter_(user_comp_filter
),
// Ownership of the factory-made filter moves into the member.
150 user_comp_filter_from_factory_(
151 std::move(user_comp_filter_from_factory
)) {
152 // Unlike the merge operator, compaction filter is necessary for TTL, hence
153 // this would be called even if user doesn't specify any compaction-filter
// With no raw user filter supplied, fall back to the factory-made one;
// get() may itself be null, so user_comp_filter_ can remain nullptr.
154 if (!user_comp_filter_
) {
155 user_comp_filter_
= user_comp_filter_from_factory_
.get();
// Per-entry compaction decision.
//   level, key     - forwarded unchanged to the user filter.
//   old_val        - stored value; its last DBWithTTLImpl::kTSLength bytes
//                    are the timestamp.
//   new_val        - replacement value written by the user filter.
//   value_changed  - read after the user filter ran, to decide whether the
//                    original timestamp bytes are needed again.
// NOTE(review): several lines of this body (the return statements and some
// closing braces) are missing from this copy of the file; the comments below
// describe only the statements that are visible.
159 virtual bool Filter(int level
, const Slice
& key
, const Slice
& old_val
,
160 std::string
* new_val
, bool* value_changed
) const
// Expiry check comes first (presumably the entry is dropped when stale --
// the branch body is not visible).
162 if (DBWithTTLImpl::IsStale(old_val
, ttl_
, env_
)) {
// No user filter configured (branch body not visible).
165 if (user_comp_filter_
== nullptr) {
// The user filter is given the value without its timestamp suffix; the
// size precondition is enforced by assert only.
168 assert(old_val
.size() >= DBWithTTLImpl::kTSLength
);
169 Slice
old_val_without_ts(old_val
.data(),
170 old_val
.size() - DBWithTTLImpl::kTSLength
);
171 if (user_comp_filter_
->Filter(level
, key
, old_val_without_ts
, new_val
,
// When the user filter changed the value, the arguments below select the
// original timestamp bytes of old_val (presumably appended to *new_val --
// the call's first line is not visible).
175 if (*value_changed
) {
177 old_val
.data() + old_val
.size() - DBWithTTLImpl::kTSLength
,
178 DBWithTTLImpl::kTSLength
);
183 virtual const char* Name() const override
{ return "Delete By TTL"; }
// Filter consulted by Filter(): either the raw pointer given to the
// constructor or, when that was null, user_comp_filter_from_factory_.get().
// May be nullptr.
188 const CompactionFilter
* user_comp_filter_
;
// Holds the filter that was moved into the constructor (may be empty).
189 std::unique_ptr
<const CompactionFilter
> user_comp_filter_from_factory_
;
// Factory deriving from CompactionFilterFactory. Its CreateCompactionFilter
// builds TtlCompactionFilter instances and, when a user factory is set, hands
// each one a filter obtained from that factory.
192 class TtlCompactionFilterFactory
: public CompactionFilterFactory
{
194 TtlCompactionFilterFactory(
195 int32_t ttl
, Env
* env
,
196 std::shared_ptr
<CompactionFilterFactory
> comp_filter_factory
)
197 : ttl_(ttl
), env_(env
), user_comp_filter_factory_(comp_filter_factory
) {}
// Creates a TtlCompactionFilter for the given compaction `context`.
// Returns a newly allocated filter constructed with ttl_ and env_, no raw
// user filter (nullptr), and ownership of whatever filter the user factory
// produced.
// NOTE(review): the initializer of the local below and some closing braces
// are missing from this copy of the file.
199 virtual std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
200 const CompactionFilter::Context
& context
) override
{
201 std::unique_ptr
<const CompactionFilter
> user_comp_filter_from_factory
=
// Only ask the user factory for a filter when one was configured.
203 if (user_comp_filter_factory_
) {
204 user_comp_filter_from_factory
=
205 user_comp_filter_factory_
->CreateCompactionFilter(context
);
// The user-made filter (possibly empty) is moved into the new
// TtlCompactionFilter, which then owns it.
208 return std::unique_ptr
<TtlCompactionFilter
>(new TtlCompactionFilter(
209 ttl_
, env_
, nullptr, std::move(user_comp_filter_from_factory
)));
// Identifier string reported for this factory.
212 virtual const char* Name() const override
{
213 return "TtlCompactionFilterFactory";
// Optional user-supplied factory; may be empty, which CreateCompactionFilter
// checks before calling through it.
219 std::shared_ptr
<CompactionFilterFactory
> user_comp_filter_factory_
;
// MergeOperator that wraps a user merge operator (user_merge_op_). In the
// merge callbacks below, the kTSLength-byte timestamp suffix is removed from
// the inputs before the user operator runs, and a timestamp encoded from
// env_->GetCurrentTime() is appended to the result.
222 class TtlMergeOperator
: public MergeOperator
{
// Stores the user operator and the Env.
// NOTE(review): the parameter that declares `env` and the constructor body
// are missing from this copy of the file.
225 explicit TtlMergeOperator(const std::shared_ptr
<MergeOperator
>& merge_op
,
227 : user_merge_op_(merge_op
), env_(env
) {
// Full merge with TTL handling.
//   merge_in  - key, optional existing value, operand list and logger; the
//               existing value and every operand must be at least
//               kTSLength bytes long.
//   merge_out - receives the merged value with a timestamp appended.
// Visible steps: validate lengths, strip the timestamp suffix from the
// inputs, run user_merge_op_->FullMergeV2 on the stripped data, then append a
// timestamp encoded from env_->GetCurrentTime() to merge_out->new_value.
// NOTE(review): many lines of this body (return statements, closing braces,
// the declarations of `good` and `curtime`, parts of some calls) are missing
// from this copy of the file; the comments below describe only what is
// visible.
232 virtual bool FullMergeV2(const MergeOperationInput
& merge_in
,
233 MergeOperationOutput
* merge_out
) const override
{
234 const uint32_t ts_len
= DBWithTTLImpl::kTSLength
;
// An existing value too short to hold a timestamp is logged as an error
// (the statement following the log call is not visible; presumably the merge
// fails).
235 if (merge_in
.existing_value
&& merge_in
.existing_value
->size() < ts_len
) {
236 ROCKS_LOG_ERROR(merge_in
.logger
,
237 "Error: Could not remove timestamp from existing value.");
241 // Extract time-stamp from each operand to be passed to user_merge_op_
242 std::vector
<Slice
> operands_without_ts
;
243 for (const auto& operand
: merge_in
.operand_list
) {
// Operands shorter than a timestamp trigger the error message below.
244 if (operand
.size() < ts_len
) {
247 "Error: Could not remove timestamp from operand value.");
// Copy the operand's Slice, then drop the timestamp suffix from the copy.
250 operands_without_ts
.push_back(operand
);
251 operands_without_ts
.back().remove_suffix(ts_len
);
254 // Apply the user merge operator (store result in *new_value)
// The user operator's output object is built from the caller's own
// new_value / existing_operand members.
256 MergeOperationOutput
user_merge_out(merge_out
->new_value
,
257 merge_out
->existing_operand
);
258 if (merge_in
.existing_value
) {
// size() - ts_len cannot underflow here: shorter values were handled by the
// length check at the top.
259 Slice
existing_value_without_ts(merge_in
.existing_value
->data(),
260 merge_in
.existing_value
->size() - ts_len
);
261 good
= user_merge_op_
->FullMergeV2(
262 MergeOperationInput(merge_in
.key
, &existing_value_without_ts
,
263 operands_without_ts
, merge_in
.logger
),
// No existing value: the user operator receives nullptr in its place.
266 good
= user_merge_op_
->FullMergeV2(
267 MergeOperationInput(merge_in
.key
, nullptr, operands_without_ts
,
272 // Return false if the user merge operator returned false
// If existing_operand has non-null data, copy those bytes into new_value
// and reset existing_operand, so that the timestamp below is appended to
// new_value.
277 if (merge_out
->existing_operand
.data()) {
278 merge_out
->new_value
.assign(merge_out
->existing_operand
.data(),
279 merge_out
->existing_operand
.size());
280 merge_out
->existing_operand
= Slice(nullptr, 0);
283 // Augment the *new_value with the ttl time-stamp
285 if (!env_
->GetCurrentTime(&curtime
).ok()) {
288 "Error: Could not get current time to be attached internally "
289 "to the new value.");
// The current time is cast to int32_t, encoded as a fixed 32-bit value and
// appended as the last ts_len bytes of the result.
292 char ts_string
[ts_len
];
293 EncodeFixed32(ts_string
, (int32_t)curtime
);
294 merge_out
->new_value
.append(ts_string
, ts_len
);
// Partial merge with TTL handling.
//   key          - forwarded unchanged to the user operator.
//   operand_list - operands, each at least kTSLength bytes long.
//   new_value    - receives the user operator's result followed by a
//                  timestamp.
//   logger       - destination for error messages.
// Visible steps: strip the timestamp suffix from every operand, run
// user_merge_op_->PartialMergeMulti on the stripped operands, then append a
// timestamp encoded from env_->GetCurrentTime() to *new_value.
// NOTE(review): several lines of this body (return statements, closing
// braces, the declaration of `curtime`, parts of some calls) are missing from
// this copy of the file; the comments below describe only what is visible.
299 virtual bool PartialMergeMulti(const Slice
& key
,
300 const std::deque
<Slice
>& operand_list
,
301 std::string
* new_value
, Logger
* logger
) const
303 const uint32_t ts_len
= DBWithTTLImpl::kTSLength
;
304 std::deque
<Slice
> operands_without_ts
;
306 for (const auto& operand
: operand_list
) {
// Operands shorter than a timestamp are logged as an error.
307 if (operand
.size() < ts_len
) {
308 ROCKS_LOG_ERROR(logger
,
309 "Error: Could not remove timestamp from value.");
// Collect a Slice over the same bytes minus the ts_len-byte suffix.
313 operands_without_ts
.push_back(
314 Slice(operand
.data(), operand
.size() - ts_len
));
317 // Apply the user partial-merge operator (store result in *new_value)
319 if (!user_merge_op_
->PartialMergeMulti(key
, operands_without_ts
, new_value
,
324 // Augment the *new_value with the ttl time-stamp
326 if (!env_
->GetCurrentTime(&curtime
).ok()) {
329 "Error: Could not get current time to be attached internally "
330 "to the new value.");
// The current time is cast to int32_t, encoded as a fixed 32-bit value and
// appended as the last ts_len bytes of *new_value.
333 char ts_string
[ts_len
];
334 EncodeFixed32(ts_string
, (int32_t)curtime
);
335 new_value
->append(ts_string
, ts_len
);
340 virtual const char* Name() const override
{ return "Merge By TTL"; }
// User merge operator that FullMergeV2 and PartialMergeMulti delegate to
// after stripping timestamps. It is dereferenced there without a null check.
343 std::shared_ptr
<MergeOperator
> user_merge_op_
;
347 #endif // ROCKSDB_LITE