1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include <unordered_set>
10 #include "db/blob/blob_index.h"
11 #include "monitoring/statistics.h"
12 #include "rocksdb/compaction_filter.h"
13 #include "rocksdb/env.h"
14 #include "utilities/blob_db/blob_db_gc_stats.h"
15 #include "utilities/blob_db/blob_db_impl.h"
16 #include "utilities/compaction_filters/layered_compaction_filter_base.h"
18 namespace ROCKSDB_NAMESPACE
{
21 struct BlobCompactionContext
{
22 BlobDBImpl
* blob_db_impl
= nullptr;
23 uint64_t next_file_number
= 0;
24 std::unordered_set
<uint64_t> current_blob_files
;
25 SequenceNumber fifo_eviction_seq
= 0;
26 uint64_t evict_expiration_up_to
= 0;
// Additional state used only by the garbage-collecting variant of the blob
// index compaction filter.
struct BlobCompactionContextGC {
  // Defaults to 0.
  // NOTE(review): presumably blob files numbered below this cutoff are the
  // ones considered for GC -- confirm against the GC filter implementation.
  uint64_t cutoff_file_number = 0;
};
33 // Compaction filter that deletes expired blob indexes from the base DB.
34 // Comes into two varieties, one for the non-GC case and one for the GC case.
35 class BlobIndexCompactionFilterBase
: public LayeredCompactionFilterBase
{
37 BlobIndexCompactionFilterBase(
38 BlobCompactionContext
&& _context
,
39 const CompactionFilter
* _user_comp_filter
,
40 std::unique_ptr
<const CompactionFilter
> _user_comp_filter_from_factory
,
41 uint64_t current_time
, Statistics
* stats
)
42 : LayeredCompactionFilterBase(_user_comp_filter
,
43 std::move(_user_comp_filter_from_factory
)),
44 context_(std::move(_context
)),
45 current_time_(current_time
),
48 ~BlobIndexCompactionFilterBase() override
;
50 // Filter expired blob indexes regardless of snapshots.
51 bool IgnoreSnapshots() const override
{ return true; }
53 Decision
FilterV2(int level
, const Slice
& key
, ValueType value_type
,
54 const Slice
& value
, std::string
* new_value
,
55 std::string
* skip_until
) const override
;
58 bool IsBlobFileOpened() const;
59 virtual bool OpenNewBlobFileIfNeeded() const;
60 bool ReadBlobFromOldFile(const Slice
& key
, const BlobIndex
& blob_index
,
61 PinnableSlice
* blob
, bool need_decompress
,
62 CompressionType
* compression_type
) const;
63 bool WriteBlobToNewFile(const Slice
& key
, const Slice
& blob
,
64 uint64_t* new_blob_file_number
,
65 uint64_t* new_blob_offset
) const;
66 bool CloseAndRegisterNewBlobFileIfNeeded() const;
67 bool CloseAndRegisterNewBlobFile() const;
69 Statistics
* statistics() const { return statistics_
; }
70 const BlobCompactionContext
& context() const { return context_
; }
73 Decision
HandleValueChange(const Slice
& key
, std::string
* new_value
) const;
76 BlobCompactionContext context_
;
77 const uint64_t current_time_
;
78 Statistics
* statistics_
;
80 mutable std::shared_ptr
<BlobFile
> blob_file_
;
81 mutable std::shared_ptr
<BlobLogWriter
> writer_
;
83 // It is safe to not using std::atomic since the compaction filter, created
84 // from a compaction filter factroy, will not be called from multiple threads.
85 mutable uint64_t expired_count_
= 0;
86 mutable uint64_t expired_size_
= 0;
87 mutable uint64_t evicted_count_
= 0;
88 mutable uint64_t evicted_size_
= 0;
91 class BlobIndexCompactionFilter
: public BlobIndexCompactionFilterBase
{
93 BlobIndexCompactionFilter(
94 BlobCompactionContext
&& _context
,
95 const CompactionFilter
* _user_comp_filter
,
96 std::unique_ptr
<const CompactionFilter
> _user_comp_filter_from_factory
,
97 uint64_t current_time
, Statistics
* stats
)
98 : BlobIndexCompactionFilterBase(std::move(_context
), _user_comp_filter
,
99 std::move(_user_comp_filter_from_factory
),
100 current_time
, stats
) {}
102 const char* Name() const override
{ return "BlobIndexCompactionFilter"; }
105 class BlobIndexCompactionFilterGC
: public BlobIndexCompactionFilterBase
{
107 BlobIndexCompactionFilterGC(
108 BlobCompactionContext
&& _context
, BlobCompactionContextGC
&& context_gc
,
109 const CompactionFilter
* _user_comp_filter
,
110 std::unique_ptr
<const CompactionFilter
> _user_comp_filter_from_factory
,
111 uint64_t current_time
, Statistics
* stats
)
112 : BlobIndexCompactionFilterBase(std::move(_context
), _user_comp_filter
,
113 std::move(_user_comp_filter_from_factory
),
114 current_time
, stats
),
115 context_gc_(std::move(context_gc
)) {}
117 ~BlobIndexCompactionFilterGC() override
;
119 const char* Name() const override
{ return "BlobIndexCompactionFilterGC"; }
121 BlobDecision
PrepareBlobOutput(const Slice
& key
, const Slice
& existing_value
,
122 std::string
* new_value
) const override
;
125 bool OpenNewBlobFileIfNeeded() const override
;
128 BlobCompactionContextGC context_gc_
;
129 mutable BlobDBGarbageCollectionStats gc_stats_
;
132 // Compaction filter factory; similarly to the filters above, it comes
133 // in two flavors, one that creates filters that support GC, and one
134 // that creates non-GC filters.
135 class BlobIndexCompactionFilterFactoryBase
: public CompactionFilterFactory
{
137 BlobIndexCompactionFilterFactoryBase(BlobDBImpl
* _blob_db_impl
, Env
* _env
,
138 const ColumnFamilyOptions
& _cf_options
,
139 Statistics
* _statistics
)
140 : blob_db_impl_(_blob_db_impl
),
142 statistics_(_statistics
),
143 user_comp_filter_(_cf_options
.compaction_filter
),
144 user_comp_filter_factory_(_cf_options
.compaction_filter_factory
) {}
147 std::unique_ptr
<CompactionFilter
> CreateUserCompactionFilterFromFactory(
148 const CompactionFilter::Context
& context
) const;
150 BlobDBImpl
* blob_db_impl() const { return blob_db_impl_
; }
151 Env
* env() const { return env_
; }
152 Statistics
* statistics() const { return statistics_
; }
153 const CompactionFilter
* user_comp_filter() const { return user_comp_filter_
; }
156 BlobDBImpl
* blob_db_impl_
;
158 Statistics
* statistics_
;
159 const CompactionFilter
* user_comp_filter_
;
160 std::shared_ptr
<CompactionFilterFactory
> user_comp_filter_factory_
;
163 class BlobIndexCompactionFilterFactory
164 : public BlobIndexCompactionFilterFactoryBase
{
166 BlobIndexCompactionFilterFactory(BlobDBImpl
* _blob_db_impl
, Env
* _env
,
167 const ColumnFamilyOptions
& _cf_options
,
168 Statistics
* _statistics
)
169 : BlobIndexCompactionFilterFactoryBase(_blob_db_impl
, _env
, _cf_options
,
172 const char* Name() const override
{
173 return "BlobIndexCompactionFilterFactory";
176 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
177 const CompactionFilter::Context
& context
) override
;
180 class BlobIndexCompactionFilterFactoryGC
181 : public BlobIndexCompactionFilterFactoryBase
{
183 BlobIndexCompactionFilterFactoryGC(BlobDBImpl
* _blob_db_impl
, Env
* _env
,
184 const ColumnFamilyOptions
& _cf_options
,
185 Statistics
* _statistics
)
186 : BlobIndexCompactionFilterFactoryBase(_blob_db_impl
, _env
, _cf_options
,
189 const char* Name() const override
{
190 return "BlobIndexCompactionFilterFactoryGC";
193 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
194 const CompactionFilter::Context
& context
) override
;
197 } // namespace blob_db
198 } // namespace ROCKSDB_NAMESPACE
199 #endif // ROCKSDB_LITE