#include <unordered_set>
-#include "db/blob_index.h"
+#include "db/blob/blob_index.h"
#include "monitoring/statistics.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "utilities/blob_db/blob_db_gc_stats.h"
#include "utilities/blob_db/blob_db_impl.h"
+#include "utilities/compaction_filters/layered_compaction_filter_base.h"
namespace ROCKSDB_NAMESPACE {
namespace blob_db {
struct BlobCompactionContext {
+ BlobDBImpl* blob_db_impl = nullptr;
uint64_t next_file_number = 0;
std::unordered_set<uint64_t> current_blob_files;
SequenceNumber fifo_eviction_seq = 0;
};
struct BlobCompactionContextGC {
- BlobDBImpl* blob_db_impl = nullptr;
uint64_t cutoff_file_number = 0;
};
// Compaction filter that deletes expired blob indexes from the base DB.
// Comes into two varieties, one for the non-GC case and one for the GC case.
-class BlobIndexCompactionFilterBase : public CompactionFilter {
+class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
public:
- BlobIndexCompactionFilterBase(BlobCompactionContext&& context,
- uint64_t current_time, Statistics* stats)
- : context_(std::move(context)),
+ BlobIndexCompactionFilterBase(
+ BlobCompactionContext&& _context,
+ const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+ uint64_t current_time, Statistics* stats)
+ : LayeredCompactionFilterBase(_user_comp_filter,
+ std::move(_user_comp_filter_from_factory)),
+ context_(std::move(_context)),
current_time_(current_time),
statistics_(stats) {}
- ~BlobIndexCompactionFilterBase() override {
- RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
- RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
- RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
- RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
- }
+ ~BlobIndexCompactionFilterBase() override;
// Filter expired blob indexes regardless of snapshots.
bool IgnoreSnapshots() const override { return true; }
- Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
- const Slice& value, std::string* /*new_value*/,
- std::string* /*skip_until*/) const override;
+ Decision FilterV2(int level, const Slice& key, ValueType value_type,
+ const Slice& value, std::string* new_value,
+ std::string* skip_until) const override;
protected:
+ bool IsBlobFileOpened() const;
+ virtual bool OpenNewBlobFileIfNeeded() const;
+ bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
+ PinnableSlice* blob, bool need_decompress,
+ CompressionType* compression_type) const;
+ bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
+ uint64_t* new_blob_file_number,
+ uint64_t* new_blob_offset) const;
+ bool CloseAndRegisterNewBlobFileIfNeeded() const;
+ bool CloseAndRegisterNewBlobFile() const;
+
Statistics* statistics() const { return statistics_; }
+ const BlobCompactionContext& context() const { return context_; }
+
+ private:
+ Decision HandleValueChange(const Slice& key, std::string* new_value) const;
private:
BlobCompactionContext context_;
const uint64_t current_time_;
Statistics* statistics_;
+
+ mutable std::shared_ptr<BlobFile> blob_file_;
+ mutable std::shared_ptr<BlobLogWriter> writer_;
+
// It is safe to not using std::atomic since the compaction filter, created
// from a compaction filter factroy, will not be called from multiple threads.
mutable uint64_t expired_count_ = 0;
class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
public:
- BlobIndexCompactionFilter(BlobCompactionContext&& context,
- uint64_t current_time, Statistics* stats)
- : BlobIndexCompactionFilterBase(std::move(context), current_time, stats) {
- }
+ BlobIndexCompactionFilter(
+ BlobCompactionContext&& _context,
+ const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+ uint64_t current_time, Statistics* stats)
+ : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
+ std::move(_user_comp_filter_from_factory),
+ current_time, stats) {}
const char* Name() const override { return "BlobIndexCompactionFilter"; }
};
class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
public:
- BlobIndexCompactionFilterGC(BlobCompactionContext&& context,
- BlobCompactionContextGC&& context_gc,
- uint64_t current_time, Statistics* stats)
- : BlobIndexCompactionFilterBase(std::move(context), current_time, stats),
+ BlobIndexCompactionFilterGC(
+ BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc,
+ const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+ uint64_t current_time, Statistics* stats)
+ : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
+ std::move(_user_comp_filter_from_factory),
+ current_time, stats),
context_gc_(std::move(context_gc)) {}
~BlobIndexCompactionFilterGC() override;
std::string* new_value) const override;
private:
- bool OpenNewBlobFileIfNeeded() const;
- bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
- PinnableSlice* blob,
- CompressionType* compression_type) const;
- bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
- uint64_t* new_blob_file_number,
- uint64_t* new_blob_offset) const;
- bool CloseAndRegisterNewBlobFileIfNeeded() const;
- bool CloseAndRegisterNewBlobFile() const;
+ bool OpenNewBlobFileIfNeeded() const override;
private:
BlobCompactionContextGC context_gc_;
- mutable std::shared_ptr<BlobFile> blob_file_;
- mutable std::shared_ptr<Writer> writer_;
mutable BlobDBGarbageCollectionStats gc_stats_;
};
class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
public:
BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, Env* _env,
+ const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
- : blob_db_impl_(_blob_db_impl), env_(_env), statistics_(_statistics) {}
+ : blob_db_impl_(_blob_db_impl),
+ env_(_env),
+ statistics_(_statistics),
+ user_comp_filter_(_cf_options.compaction_filter),
+ user_comp_filter_factory_(_cf_options.compaction_filter_factory) {}
protected:
+ std::unique_ptr<CompactionFilter> CreateUserCompactionFilterFromFactory(
+ const CompactionFilter::Context& context) const;
+
BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
Env* env() const { return env_; }
Statistics* statistics() const { return statistics_; }
+ const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
private:
BlobDBImpl* blob_db_impl_;
Env* env_;
Statistics* statistics_;
+ const CompactionFilter* user_comp_filter_;
+ std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};
class BlobIndexCompactionFilterFactory
: public BlobIndexCompactionFilterFactoryBase {
public:
BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, Env* _env,
+ const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
- : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
- }
+ : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
+ _statistics) {}
const char* Name() const override {
return "BlobIndexCompactionFilterFactory";
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
- const CompactionFilter::Context& /*context*/) override;
+ const CompactionFilter::Context& context) override;
};
class BlobIndexCompactionFilterFactoryGC
: public BlobIndexCompactionFilterFactoryBase {
public:
BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, Env* _env,
+ const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
- : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
- }
+ : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
+ _statistics) {}
const char* Name() const override {
return "BlobIndexCompactionFilterFactoryGC";
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
- const CompactionFilter::Context& /*context*/) override;
+ const CompactionFilter::Context& context) override;
};
} // namespace blob_db