1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file. See the AUTHORS file for names of contributors.
13 #include "db/db_impl/db_impl.h"
14 #include "rocksdb/compaction_filter.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/merge_operator.h"
17 #include "rocksdb/system_clock.h"
18 #include "rocksdb/utilities/db_ttl.h"
19 #include "utilities/compaction_filters/layered_compaction_filter_base.h"
22 // Windows API macro interference
26 namespace ROCKSDB_NAMESPACE
{
// Database class derived from DBWithTTL. The members visible in this listing
// are overrides of the usual read/write entry points plus static helpers and
// constants that deal with a fixed-width timestamp (see kTSLength).
// NOTE(review): access specifiers and the end of the class are not present in
// this listing.
30 class DBWithTTLImpl
: public DBWithTTL
{
// Static helper taking a TTL and a mutable ColumnFamilyOptions pointer.
// NOTE(review): the parameter list ends on a comma here; the remaining
// parameter(s) are missing from this listing - check the full header.
32 static void SanitizeOptions(int32_t ttl
, ColumnFamilyOptions
* options
,
// Static, argument-less registration hook.
// NOTE(review): presumably registers the TTL filter/merge-operator classes
// declared later in this file - confirm against the definition.
35 static void RegisterTtlClasses();
// Constructs the wrapper from a DB pointer. `explicit` prevents an implicit
// DB* -> DBWithTTLImpl conversion.
36 explicit DBWithTTLImpl(DB
* db
);
38 virtual ~DBWithTTLImpl();
// Close override; returns a Status describing the outcome.
40 virtual Status
Close() override
;
// Column-family creation that additionally carries TTL information.
// On the visible parameters: `options` and `column_family_name` are inputs,
// `handle` is an out-parameter (pointer to a handle pointer).
// NOTE(review): the declaration is cut off after `handle,`; the trailing
// parameter(s) and any `override` specifier are missing from this listing.
42 Status
CreateColumnFamilyWithTtl(const ColumnFamilyOptions
& options
,
43 const std::string
& column_family_name
,
44 ColumnFamilyHandle
** handle
,
// Override of the plain column-family creation entry point; same visible
// parameters as above, with `handle` as the out-parameter.
47 Status
CreateColumnFamily(const ColumnFamilyOptions
& options
,
48 const std::string
& column_family_name
,
49 ColumnFamilyHandle
** handle
) override
;
// The using-declaration keeps the other StackableDB::Put overloads visible;
// without it the override declared below would hide them.
51 using StackableDB::Put
;
// Write override for a single key/value in the given column family.
// NOTE(review): presumably stores `val` with a timestamp attached (compare
// AppendTS) - confirm in the implementation file.
52 virtual Status
Put(const WriteOptions
& options
,
53 ColumnFamilyHandle
* column_family
, const Slice
& key
,
54 const Slice
& val
) override
;
// Same name-hiding workaround as for Put, applied to Get.
56 using StackableDB::Get
;
// Read override; the result is written through the `value` out-parameter and
// the returned Status reports success or failure.
// NOTE(review): presumably strips the stored timestamp before returning
// (compare StripTS) - confirm in the implementation file.
57 virtual Status
Get(const ReadOptions
& options
,
58 ColumnFamilyHandle
* column_family
, const Slice
& key
,
59 PinnableSlice
* value
) override
;
// Keeps the other StackableDB::MultiGet overloads visible next to the
// override declared below.
61 using StackableDB::MultiGet
;
// Batched read override. Takes parallel vectors of column-family handles and
// keys, writes results through `values`, and returns one Status per lookup.
62 virtual std::vector
<Status
> MultiGet(
63 const ReadOptions
& options
,
64 const std::vector
<ColumnFamilyHandle
*>& column_family
,
65 const std::vector
<Slice
>& keys
,
66 std::vector
<std::string
>* values
) override
;
// Keeps the other StackableDB::KeyMayExist overloads visible.
68 using StackableDB::KeyMayExist
;
// Existence-probe override. `value_found` is an optional out-parameter that
// defaults to nullptr.
// NOTE(review): the embedded numbering jumps from 70 to 72, so a parameter
// line between `key,` and `bool* value_found` appears to be missing from this
// listing - check the full header.
69 virtual bool KeyMayExist(const ReadOptions
& options
,
70 ColumnFamilyHandle
* column_family
, const Slice
& key
,
72 bool* value_found
= nullptr) override
;
// Keeps the other StackableDB::Merge overloads visible next to the override
// declared below.
74 using StackableDB::Merge
;
// Merge override for a single key/operand in the given column family.
75 virtual Status
Merge(const WriteOptions
& options
,
76 ColumnFamilyHandle
* column_family
, const Slice
& key
,
77 const Slice
& value
) override
;
// Batch-write override; `updates` is passed by pointer.
// NOTE(review): whether the batch is modified or copied is not visible here.
79 virtual Status
Write(const WriteOptions
& opts
, WriteBatch
* updates
) override
;
// Keeps the other StackableDB::NewIterator overloads visible.
81 using StackableDB::NewIterator
;
// Iterator factory override returning a raw Iterator pointer.
// NOTE(review): presumably returns a TtlIterator wrapping the underlying
// iterator, with ownership passing to the caller - confirm.
82 virtual Iterator
* NewIterator(const ReadOptions
& opts
,
83 ColumnFamilyHandle
* column_family
) override
;
85 virtual DB
* GetBaseDB() override
{ return db_
; }
// Static predicate over a stored value, a TTL and a clock.
// NOTE(review): presumably compares the value's embedded timestamp plus `ttl`
// against the clock's current time - confirm in the implementation file.
87 static bool IsStale(const Slice
& value
, int32_t ttl
, SystemClock
* clock
);
// Static helper that writes its result through `val_with_ts`.
// NOTE(review): the parameter list ends on a comma; the trailing parameter(s)
// are missing from this listing.
89 static Status
AppendTS(const Slice
& val
, std::string
* val_with_ts
,
// Static validation of the timestamp carried by `str`; the outcome is
// reported through the returned Status.
// NOTE(review): presumably checks against kMinTimestamp/kMaxTimestamp -
// confirm.
92 static Status
SanityCheckTimestamp(const Slice
& str
);
// Two overloads that operate in place on the pointed-to value: one for
// std::string, one for PinnableSlice.
// NOTE(review): presumably they drop the trailing kTSLength bytes - confirm.
94 static Status
StripTS(std::string
* str
);
96 static Status
StripTS(PinnableSlice
* str
);
// Width in bytes of the timestamp: the size of an int32_t.
98 static const uint32_t kTSLength
= sizeof(int32_t); // size of timestamp
// Smallest and largest timestamp constants, expressed as seconds since the
// Unix epoch. kMaxTimestamp is the largest value an int32_t can hold.
// NOTE(review): presumably the accepted range enforced by
// SanityCheckTimestamp - confirm in the implementation file.
100 static const int32_t kMinTimestamp
= 1368146402; // 2013-05-10 00:40:02 UTC (05/09/2013 5:40PM GMT-7)
102 static const int32_t kMaxTimestamp
= 2147483647; // 2038-01-19 03:14:07 UTC (01/18/2038 7:14PM GMT-8)
104 void SetTtl(int32_t ttl
) override
{ SetTtl(DefaultColumnFamily(), ttl
); }
// Two-argument form taking the column-family handle `h` and the new `ttl`.
// The single-argument SetTtl forwards here with DefaultColumnFamily().
// Defined out of line.
106 void SetTtl(ColumnFamilyHandle
* h
, int32_t ttl
) override
;
109 // remember whether the Close completes or not
// Iterator adapter: holds another Iterator in iter_, forwards positioning
// calls to it, and exposes values with the trailing kTSLength timestamp bytes
// removed (see value() and ttl_timestamp()).
// NOTE(review): the access specifiers, the iter_ member declaration and the
// end of the class are not present in this listing.
113 class TtlIterator
: public Iterator
{
115 explicit TtlIterator(Iterator
* iter
) : iter_(iter
) { assert(iter_
); }
117 ~TtlIterator() { delete iter_
; }
119 bool Valid() const override
{ return iter_
->Valid(); }
121 void SeekToFirst() override
{ iter_
->SeekToFirst(); }
123 void SeekToLast() override
{ iter_
->SeekToLast(); }
125 void Seek(const Slice
& target
) override
{ iter_
->Seek(target
); }
127 void SeekForPrev(const Slice
& target
) override
{ iter_
->SeekForPrev(target
); }
129 void Next() override
{ iter_
->Next(); }
131 void Prev() override
{ iter_
->Prev(); }
133 Slice
key() const override
{ return iter_
->key(); }
// Passes DecodeFixed32 a pointer kTSLength bytes before the end of the
// wrapped iterator's current value, and returns the result as an int32_t.
// No length check is performed here.
// NOTE(review): presumably callers only use this on values that are at least
// kTSLength bytes long - confirm.
// NOTE(review): the closing brace of this method is not present in this
// listing.
135 int32_t ttl_timestamp() const {
136 return DecodeFixed32(iter_
->value().data() + iter_
->value().size() -
137 DBWithTTLImpl::kTSLength
);
// Returns the wrapped iterator's current value with its size reduced by
// kTSLength. Only size_ is modified; nothing is copied.
// The timestamp sanity check lives inside assert(), so it is compiled out
// when NDEBUG is defined and the size is then reduced unconditionally.
// NOTE(review): the closing brace of this method is not present in this
// listing.
140 Slice
value() const override
{
141 // TODO: handle timestamp corruption like in general iterator semantics
142 assert(DBWithTTLImpl::SanityCheckTimestamp(iter_
->value()).ok());
143 Slice trimmed_value
= iter_
->value();
144 trimmed_value
.size_
-= DBWithTTLImpl::kTSLength
;
145 return trimmed_value
;
148 Status
status() const override
{ return iter_
->status(); }
// Compaction filter built on LayeredCompactionFilterBase and constructed from
// a TTL, a clock and an optional user-supplied filter.
// NOTE(review): access specifiers, data members and the end of the class are
// not present in this listing.
154 class TtlCompactionFilter
: public LayeredCompactionFilterBase
{
// `_user_comp_filter` is a non-owning pointer to a user filter.
// `_user_comp_filter_from_factory` is an owning unique_ptr that defaults to
// nullptr.
// NOTE(review): presumably at most one of the two is set - confirm against
// the definition.
156 TtlCompactionFilter(int32_t ttl
, SystemClock
* clock
,
157 const CompactionFilter
* _user_comp_filter
,
158 std::unique_ptr
<const CompactionFilter
>
159 _user_comp_filter_from_factory
= nullptr);
// Per-entry filter callback. `new_val` and `value_changed` are
// out-parameters.
// NOTE(review): presumably returns true for entries whose timestamp has
// expired - confirm in the implementation file.
161 virtual bool Filter(int level
, const Slice
& key
, const Slice
& old_val
,
162 std::string
* new_val
, bool* value_changed
) const override
;
// Name() reports the class name string provided by kClassName().
164 const char* Name() const override
{ return kClassName(); }
165 static const char* kClassName() { return "TtlCompactionFilter"; }
// Special-cases the name "Delete By TTL" and otherwise defers to
// LayeredCompactionFilterBase::IsInstanceOf.
// NOTE(review): the body of the `if` branch and the closing braces are
// missing from this listing; presumably the branch returns true - confirm.
166 bool IsInstanceOf(const std::string
& name
) const override
{
167 if (name
== "Delete By TTL") {
170 return LayeredCompactionFilterBase::IsInstanceOf(name
);
// Configuration hooks, defined out of line.
174 Status
PrepareOptions(const ConfigOptions
& config_options
) override
;
175 Status
ValidateOptions(const DBOptions
& db_opts
,
176 const ColumnFamilyOptions
& cf_opts
) const override
;
// CompactionFilterFactory that carries a TTL and a clock and wraps an
// optional user-provided factory held in user_comp_filter_factory_.
// NOTE(review): access specifiers, the ttl_ and clock members and the end of
// the class are not present in this listing.
183 class TtlCompactionFilterFactory
: public CompactionFilterFactory
{
// `comp_filter_factory` is taken by value as a shared_ptr.
185 TtlCompactionFilterFactory(
186 int32_t ttl
, SystemClock
* clock
,
187 std::shared_ptr
<CompactionFilterFactory
> comp_filter_factory
);
// Factory entry point; returns a filter owned by the caller through
// unique_ptr.
189 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
190 const CompactionFilter::Context
& context
) override
;
// Overwrites the stored TTL. Plain assignment with no synchronization
// visible here.
191 void SetTtl(int32_t ttl
) { ttl_
= ttl
; }
// Name() reports the class name string provided by kClassName().
193 const char* Name() const override
{ return kClassName(); }
194 static const char* kClassName() { return "TtlCompactionFilterFactory"; }
// Configuration hooks, defined out of line.
195 Status
PrepareOptions(const ConfigOptions
& config_options
) override
;
196 Status
ValidateOptions(const DBOptions
& db_opts
,
197 const ColumnFamilyOptions
& cf_opts
) const override
;
// Exposes the wrapped user factory as a raw pointer. The result is null when
// no user factory is held.
// NOTE(review): the closing brace of this method is not present in this
// listing.
198 const Customizable
* Inner() const override
{
199 return user_comp_filter_factory_
.get();
// Shared ownership of the user-provided factory returned by Inner().
205 std::shared_ptr
<CompactionFilterFactory
> user_comp_filter_factory_
;
// MergeOperator that wraps a user-provided merge operator held in
// user_merge_op_.
// NOTE(review): access specifiers, any further members and the end of the
// class are not present in this listing.
208 class TtlMergeOperator
: public MergeOperator
{
// Takes the user merge operator by const reference to a shared_ptr.
// NOTE(review): the parameter list ends on a comma; the trailing parameter(s)
// are missing from this listing.
210 explicit TtlMergeOperator(const std::shared_ptr
<MergeOperator
>& merge_op
,
// Full-merge hook; the result is written through `merge_out`.
// NOTE(review): presumably strips timestamps from the inputs, delegates to
// the user operator and re-appends a timestamp - confirm in the
// implementation file.
213 bool FullMergeV2(const MergeOperationInput
& merge_in
,
214 MergeOperationOutput
* merge_out
) const override
;
// Partial-merge hook over a list of operands; the result is written through
// `new_value`.
216 bool PartialMergeMulti(const Slice
& key
,
217 const std::deque
<Slice
>& operand_list
,
218 std::string
* new_value
, Logger
* logger
) const override
;
// Name() reports the class name string provided by kClassName().
220 static const char* kClassName() { return "TtlMergeOperator"; }
222 const char* Name() const override
{ return kClassName(); }
// Special-cases the name "Merge By TTL" and otherwise defers to
// MergeOperator::IsInstanceOf.
// NOTE(review): the body of the `if` branch and the closing braces are
// missing from this listing; presumably the branch returns true - confirm.
223 bool IsInstanceOf(const std::string
& name
) const override
{
224 if (name
== "Merge By TTL") {
227 return MergeOperator::IsInstanceOf(name
);
// Configuration hooks, defined out of line.
231 Status
PrepareOptions(const ConfigOptions
& config_options
) override
;
232 Status
ValidateOptions(const DBOptions
& db_opts
,
233 const ColumnFamilyOptions
& cf_opts
) const override
;
// Exposes the wrapped user merge operator as a raw pointer. The result is
// null when no user operator is held.
234 const Customizable
* Inner() const override
{ return user_merge_op_
.get(); }
// Shared ownership of the user-provided merge operator returned by Inner().
237 std::shared_ptr
<MergeOperator
> user_merge_op_
;
241 int RegisterTtlObjects(ObjectLibrary
& library
, const std::string
& /*arg*/);
244 } // namespace ROCKSDB_NAMESPACE
245 #endif // ROCKSDB_LITE