]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_compaction_filter.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 #ifndef ROCKSDB_LITE
7
8 #include <unordered_set>
9
10 #include "db/blob/blob_index.h"
11 #include "monitoring/statistics.h"
12 #include "rocksdb/compaction_filter.h"
13 #include "rocksdb/env.h"
14 #include "utilities/blob_db/blob_db_gc_stats.h"
15 #include "utilities/blob_db/blob_db_impl.h"
16 #include "utilities/compaction_filters/layered_compaction_filter_base.h"
17
18 namespace ROCKSDB_NAMESPACE {
19 namespace blob_db {
20
// Context handed to a blob index compaction filter. It is passed by rvalue
// reference to the BlobIndexCompactionFilterBase constructor, which moves it
// into a member. Every field has a null/zero/empty default.
21 struct BlobCompactionContext {
// Raw pointer; this struct has no destructor and never frees it.
22 BlobDBImpl* blob_db_impl = nullptr;
// NOTE(review): presumably the next blob file number at the time the
// context was captured — confirm against the code that fills this struct.
23 uint64_t next_file_number = 0;
// NOTE(review): presumably the numbers of the blob files that existed when
// the context was captured — confirm.
24 std::unordered_set<uint64_t> current_blob_files;
// NOTE(review): presumably a sequence-number bound related to FIFO
// eviction of blob files — confirm.
25 SequenceNumber fifo_eviction_seq = 0;
// NOTE(review): presumably an expiration bound used together with
// fifo_eviction_seq; units are not visible in this header — confirm.
26 uint64_t evict_expiration_up_to = 0;
27 };
28
// Additional context used only by the GC flavor of the filter:
// BlobIndexCompactionFilterGC takes it by rvalue reference and stores it by
// value.
29 struct BlobCompactionContextGC {
// Defaults to 0. NOTE(review): presumably a blob file number threshold that
// decides which blobs garbage collection handles — confirm against the
// PrepareBlobOutput() definition.
30 uint64_t cutoff_file_number = 0;
31 };
32
33 // Compaction filter that deletes expired blob indexes from the base DB.
34 // Comes in two varieties, one for the non-GC case and one for the GC case.
// This base class holds the state shared by both varieties. Apart from the
// inline constructor and accessors, its member functions are only declared
// here; their definitions are outside this header.
35 class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
36 public:
// Moves _context into this object and stores current_time and stats as
// given. The user compaction filter (a raw pointer and/or a unique_ptr
// obtained from a factory) is forwarded to LayeredCompactionFilterBase.
37 BlobIndexCompactionFilterBase(
38 BlobCompactionContext&& _context,
39 const CompactionFilter* _user_comp_filter,
40 std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
41 uint64_t current_time, Statistics* stats)
42 : LayeredCompactionFilterBase(_user_comp_filter,
43 std::move(_user_comp_filter_from_factory)),
44 context_(std::move(_context)),
45 current_time_(current_time),
46 statistics_(stats) {}
47
// Declared here, defined out of line.
48 ~BlobIndexCompactionFilterBase() override;
49
50 // Filter expired blob indexes regardless of snapshots.
51 bool IgnoreSnapshots() const override { return true; }
52
// Filtering hook overriding the base-class virtual; defined out of line.
53 Decision FilterV2(int level, const Slice& key, ValueType value_type,
54 const Slice& value, std::string* new_value,
55 std::string* skip_until) const override;
56
// Helpers available to subclasses. All are const member functions and all
// report a bool; only OpenNewBlobFileIfNeeded() is virtual.
// NOTE(review): presumably `true` means success — confirm in the
// definitions.
57 protected:
58 bool IsBlobFileOpened() const;
59 virtual bool OpenNewBlobFileIfNeeded() const;
60 bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
61 PinnableSlice* blob, bool need_decompress,
62 CompressionType* compression_type) const;
63 bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
64 uint64_t* new_blob_file_number,
65 uint64_t* new_blob_offset) const;
66 bool CloseAndRegisterNewBlobFileIfNeeded() const;
67 bool CloseAndRegisterNewBlobFile() const;
68
// Read-only accessors for the values stored by the constructor.
69 Statistics* statistics() const { return statistics_; }
70 const BlobCompactionContext& context() const { return context_; }
71
72 private:
// NOTE(review): presumably called from FilterV2() when the user filter
// replaces a value — confirm in the definition.
73 Decision HandleValueChange(const Slice& key, std::string* new_value) const;
74
75 private:
// Set once by the constructor.
76 BlobCompactionContext context_;
77 const uint64_t current_time_;
78 Statistics* statistics_;
79
// Declared mutable, so const member functions may modify them.
// NOTE(review): presumably the blob file currently being written and its
// log writer — confirm in the helper definitions.
80 mutable std::shared_ptr<BlobFile> blob_file_;
81 mutable std::shared_ptr<BlobLogWriter> writer_;
82
83 // It is safe not to use std::atomic since the compaction filter, created
84 // from a compaction filter factory, will not be called from multiple threads.
85 mutable uint64_t expired_count_ = 0;
86 mutable uint64_t expired_size_ = 0;
87 mutable uint64_t evicted_count_ = 0;
88 mutable uint64_t evicted_size_ = 0;
89 };
90
// Non-GC flavor of the blob index compaction filter. It adds no state or
// overrides beyond Name(); everything else comes from
// BlobIndexCompactionFilterBase.
91 class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
92 public:
// Forwards every argument unchanged to the base-class constructor.
93 BlobIndexCompactionFilter(
94 BlobCompactionContext&& _context,
95 const CompactionFilter* _user_comp_filter,
96 std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
97 uint64_t current_time, Statistics* stats)
98 : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
99 std::move(_user_comp_filter_from_factory),
100 current_time, stats) {}
101
// Returns the fixed name of this filter.
102 const char* Name() const override { return "BlobIndexCompactionFilter"; }
103 };
104
// GC flavor of the blob index compaction filter. On top of the base class it
// stores a BlobCompactionContextGC and a BlobDBGarbageCollectionStats, and
// overrides PrepareBlobOutput() and OpenNewBlobFileIfNeeded().
105 class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
106 public:
// Moves context_gc into this object and forwards the remaining arguments
// unchanged to the base-class constructor.
107 BlobIndexCompactionFilterGC(
108 BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc,
109 const CompactionFilter* _user_comp_filter,
110 std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
111 uint64_t current_time, Statistics* stats)
112 : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
113 std::move(_user_comp_filter_from_factory),
114 current_time, stats),
115 context_gc_(std::move(context_gc)) {}
116
// Declared here, defined out of line.
117 ~BlobIndexCompactionFilterGC() override;
118
// Returns the fixed name of this filter.
119 const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
120
// Overrides a base-class virtual; defined out of line.
// NOTE(review): presumably this is where blobs are relocated during GC —
// confirm in the definition.
121 BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value,
122 std::string* new_value) const override;
123
124 private:
// Overrides the protected virtual helper of BlobIndexCompactionFilterBase.
125 bool OpenNewBlobFileIfNeeded() const override;
126
127 private:
// Set once by the constructor.
128 BlobCompactionContextGC context_gc_;
// Declared mutable, so const member functions may update it.
129 mutable BlobDBGarbageCollectionStats gc_stats_;
130 };
131
132 // Compaction filter factory; similarly to the filters above, it comes
133 // in two flavors, one that creates filters that support GC, and one
134 // that creates non-GC filters.
// This base class stores the inputs both flavors share and exposes them to
// subclasses through protected accessors.
135 class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
136 public:
// Stores the three pointers as given and copies the user's
// compaction_filter and compaction_filter_factory out of _cf_options.
137 BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, Env* _env,
138 const ColumnFamilyOptions& _cf_options,
139 Statistics* _statistics)
140 : blob_db_impl_(_blob_db_impl),
141 env_(_env),
142 statistics_(_statistics),
143 user_comp_filter_(_cf_options.compaction_filter),
144 user_comp_filter_factory_(_cf_options.compaction_filter_factory) {}
145
146 protected:
// Declared here, defined out of line.
// NOTE(review): presumably delegates to user_comp_filter_factory_ when one
// is set — confirm in the definition.
147 std::unique_ptr<CompactionFilter> CreateUserCompactionFilterFromFactory(
148 const CompactionFilter::Context& context) const;
149
// Accessors for the values stored by the constructor.
150 BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
151 Env* env() const { return env_; }
152 Statistics* statistics() const { return statistics_; }
153 const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
154
155 private:
// Raw pointers stored as passed in; this class declares no destructor.
156 BlobDBImpl* blob_db_impl_;
157 Env* env_;
158 Statistics* statistics_;
159 const CompactionFilter* user_comp_filter_;
// Shared ownership of the user's factory, copied from the column family
// options.
160 std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
161 };
162
// Factory for the non-GC flavor. It adds no state of its own.
163 class BlobIndexCompactionFilterFactory
164 : public BlobIndexCompactionFilterFactoryBase {
165 public:
// Forwards every argument unchanged to the base-class constructor.
166 BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, Env* _env,
167 const ColumnFamilyOptions& _cf_options,
168 Statistics* _statistics)
169 : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
170 _statistics) {}
171
// Returns the fixed name of this factory.
172 const char* Name() const override {
173 return "BlobIndexCompactionFilterFactory";
174 }
175
// Declared here, defined out of line.
// NOTE(review): presumably returns a BlobIndexCompactionFilter — confirm in
// the definition.
176 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
177 const CompactionFilter::Context& context) override;
178 };
179
// Factory for the GC flavor. It adds no state of its own.
180 class BlobIndexCompactionFilterFactoryGC
181 : public BlobIndexCompactionFilterFactoryBase {
182 public:
// Forwards every argument unchanged to the base-class constructor.
183 BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, Env* _env,
184 const ColumnFamilyOptions& _cf_options,
185 Statistics* _statistics)
186 : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
187 _statistics) {}
188
// Returns the fixed name of this factory.
189 const char* Name() const override {
190 return "BlobIndexCompactionFilterFactoryGC";
191 }
192
// Declared here, defined out of line.
// NOTE(review): presumably returns a BlobIndexCompactionFilterGC — confirm
// in the definition.
193 std::unique_ptr<CompactionFilter> CreateCompactionFilter(
194 const CompactionFilter::Context& context) override;
195 };
196
197 } // namespace blob_db
198 } // namespace ROCKSDB_NAMESPACE
199 #endif // ROCKSDB_LITE