]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_compaction_filter.h
index 409df26ac85fa22b4a62933088ee3f95bb5fa51e..ab1be03d70e19ca94a9c864af5c91590e07aa8cc 100644 (file)
@@ -7,17 +7,19 @@
 
 #include <unordered_set>
 
-#include "db/blob_index.h"
+#include "db/blob/blob_index.h"
 #include "monitoring/statistics.h"
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/env.h"
 #include "utilities/blob_db/blob_db_gc_stats.h"
 #include "utilities/blob_db/blob_db_impl.h"
+#include "utilities/compaction_filters/layered_compaction_filter_base.h"
 
 namespace ROCKSDB_NAMESPACE {
 namespace blob_db {
 
 struct BlobCompactionContext {
+  BlobDBImpl* blob_db_impl = nullptr;
   uint64_t next_file_number = 0;
   std::unordered_set<uint64_t> current_blob_files;
   SequenceNumber fifo_eviction_seq = 0;
@@ -25,41 +27,59 @@ struct BlobCompactionContext {
 };
 
 struct BlobCompactionContextGC {
-  BlobDBImpl* blob_db_impl = nullptr;
   uint64_t cutoff_file_number = 0;
 };
 
 // Compaction filter that deletes expired blob indexes from the base DB.
 // Comes into two varieties, one for the non-GC case and one for the GC case.
-class BlobIndexCompactionFilterBase : public CompactionFilter {
+class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
  public:
-  BlobIndexCompactionFilterBase(BlobCompactionContext&& context,
-                                uint64_t current_time, Statistics* stats)
-      : context_(std::move(context)),
+  BlobIndexCompactionFilterBase(
+      BlobCompactionContext&& _context,
+      const CompactionFilter* _user_comp_filter,
+      std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+      uint64_t current_time, Statistics* stats)
+      : LayeredCompactionFilterBase(_user_comp_filter,
+                                    std::move(_user_comp_filter_from_factory)),
+        context_(std::move(_context)),
         current_time_(current_time),
         statistics_(stats) {}
 
-  ~BlobIndexCompactionFilterBase() override {
-    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
-    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
-    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
-    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
-  }
+  ~BlobIndexCompactionFilterBase() override;
 
   // Filter expired blob indexes regardless of snapshots.
   bool IgnoreSnapshots() const override { return true; }
 
-  Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
-                    const Slice& value, std::string* /*new_value*/,
-                    std::string* /*skip_until*/) const override;
+  Decision FilterV2(int level, const Slice& key, ValueType value_type,
+                    const Slice& value, std::string* new_value,
+                    std::string* skip_until) const override;
 
  protected:
+  bool IsBlobFileOpened() const;
+  virtual bool OpenNewBlobFileIfNeeded() const;
+  bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
+                           PinnableSlice* blob, bool need_decompress,
+                           CompressionType* compression_type) const;
+  bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
+                          uint64_t* new_blob_file_number,
+                          uint64_t* new_blob_offset) const;
+  bool CloseAndRegisterNewBlobFileIfNeeded() const;
+  bool CloseAndRegisterNewBlobFile() const;
+
   Statistics* statistics() const { return statistics_; }
+  const BlobCompactionContext& context() const { return context_; }
+
+ private:
+  Decision HandleValueChange(const Slice& key, std::string* new_value) const;
 
  private:
   BlobCompactionContext context_;
   const uint64_t current_time_;
   Statistics* statistics_;
+
+  mutable std::shared_ptr<BlobFile> blob_file_;
+  mutable std::shared_ptr<BlobLogWriter> writer_;
+
   // It is safe to not using std::atomic since the compaction filter, created
   // from a compaction filter factroy, will not be called from multiple threads.
   mutable uint64_t expired_count_ = 0;
@@ -70,20 +90,28 @@ class BlobIndexCompactionFilterBase : public CompactionFilter {
 
 class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
  public:
-  BlobIndexCompactionFilter(BlobCompactionContext&& context,
-                            uint64_t current_time, Statistics* stats)
-      : BlobIndexCompactionFilterBase(std::move(context), current_time, stats) {
-  }
+  BlobIndexCompactionFilter(
+      BlobCompactionContext&& _context,
+      const CompactionFilter* _user_comp_filter,
+      std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+      uint64_t current_time, Statistics* stats)
+      : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
+                                      std::move(_user_comp_filter_from_factory),
+                                      current_time, stats) {}
 
   const char* Name() const override { return "BlobIndexCompactionFilter"; }
 };
 
 class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
  public:
-  BlobIndexCompactionFilterGC(BlobCompactionContext&& context,
-                              BlobCompactionContextGC&& context_gc,
-                              uint64_t current_time, Statistics* stats)
-      : BlobIndexCompactionFilterBase(std::move(context), current_time, stats),
+  BlobIndexCompactionFilterGC(
+      BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc,
+      const CompactionFilter* _user_comp_filter,
+      std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+      uint64_t current_time, Statistics* stats)
+      : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
+                                      std::move(_user_comp_filter_from_factory),
+                                      current_time, stats),
         context_gc_(std::move(context_gc)) {}
 
   ~BlobIndexCompactionFilterGC() override;
@@ -94,20 +122,10 @@ class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
                                  std::string* new_value) const override;
 
  private:
-  bool OpenNewBlobFileIfNeeded() const;
-  bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
-                           PinnableSlice* blob,
-                           CompressionType* compression_type) const;
-  bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
-                          uint64_t* new_blob_file_number,
-                          uint64_t* new_blob_offset) const;
-  bool CloseAndRegisterNewBlobFileIfNeeded() const;
-  bool CloseAndRegisterNewBlobFile() const;
+  bool OpenNewBlobFileIfNeeded() const override;
 
  private:
   BlobCompactionContextGC context_gc_;
-  mutable std::shared_ptr<BlobFile> blob_file_;
-  mutable std::shared_ptr<Writer> writer_;
   mutable BlobDBGarbageCollectionStats gc_stats_;
 };
 
@@ -117,50 +135,63 @@ class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
 class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
  public:
   BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, Env* _env,
+                                       const ColumnFamilyOptions& _cf_options,
                                        Statistics* _statistics)
-      : blob_db_impl_(_blob_db_impl), env_(_env), statistics_(_statistics) {}
+      : blob_db_impl_(_blob_db_impl),
+        env_(_env),
+        statistics_(_statistics),
+        user_comp_filter_(_cf_options.compaction_filter),
+        user_comp_filter_factory_(_cf_options.compaction_filter_factory) {}
 
  protected:
+  std::unique_ptr<CompactionFilter> CreateUserCompactionFilterFromFactory(
+      const CompactionFilter::Context& context) const;
+
   BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
   Env* env() const { return env_; }
   Statistics* statistics() const { return statistics_; }
+  const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
 
  private:
   BlobDBImpl* blob_db_impl_;
   Env* env_;
   Statistics* statistics_;
+  const CompactionFilter* user_comp_filter_;
+  std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
 };
 
 class BlobIndexCompactionFilterFactory
     : public BlobIndexCompactionFilterFactoryBase {
  public:
   BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, Env* _env,
+                                   const ColumnFamilyOptions& _cf_options,
                                    Statistics* _statistics)
-      : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
-  }
+      : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
+                                             _statistics) {}
 
   const char* Name() const override {
     return "BlobIndexCompactionFilterFactory";
   }
 
   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& /*context*/) override;
+      const CompactionFilter::Context& context) override;
 };
 
 class BlobIndexCompactionFilterFactoryGC
     : public BlobIndexCompactionFilterFactoryBase {
  public:
   BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, Env* _env,
+                                     const ColumnFamilyOptions& _cf_options,
                                      Statistics* _statistics)
-      : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
-  }
+      : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
+                                             _statistics) {}
 
   const char* Name() const override {
     return "BlobIndexCompactionFilterFactoryGC";
   }
 
   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& /*context*/) override;
+      const CompactionFilter::Context& context) override;
 };
 
 }  // namespace blob_db