]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/utilities/blob_db/blob_db_impl.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_db_impl.cc
index 5f2ca249814c18764e1e601428ea5f9c4bf9b0de..79c034db757d79f8e4771c583b45f3b8aa455e44 100644 (file)
@@ -12,7 +12,7 @@
 #include <memory>
 #include <sstream>
 
-#include "db/blob_index.h"
+#include "db/blob/blob_index.h"
 #include "db/db_impl/db_impl.h"
 #include "db/write_batch_internal.h"
 #include "env/composite_env_wrapper.h"
@@ -137,11 +137,6 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
     return Status::NotSupported("No blob directory in options");
   }
 
-  if (cf_options_.compaction_filter != nullptr ||
-      cf_options_.compaction_filter_factory != nullptr) {
-    return Status::NotSupported("Blob DB doesn't support compaction filter.");
-  }
-
   if (bdb_options_.garbage_collection_cutoff < 0.0 ||
       bdb_options_.garbage_collection_cutoff > 1.0) {
     return Status::InvalidArgument(
@@ -169,6 +164,12 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
 
   ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
 
+  if ((cf_options_.compaction_filter != nullptr ||
+       cf_options_.compaction_filter_factory != nullptr)) {
+    ROCKS_LOG_INFO(db_options_.info_log,
+                   "BlobDB only support compaction filter on non-TTL values.");
+  }
+
   // Open blob directory.
   s = env_->CreateDirIfMissing(blob_dir_);
   if (!s.ok()) {
@@ -194,22 +195,53 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
   if (bdb_options_.enable_garbage_collection) {
     db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
     cf_options_.compaction_filter_factory =
-        std::make_shared<BlobIndexCompactionFilterFactoryGC>(this, env_,
-                                                             statistics_);
+        std::make_shared<BlobIndexCompactionFilterFactoryGC>(
+            this, env_, cf_options_, statistics_);
   } else {
     db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
     cf_options_.compaction_filter_factory =
-        std::make_shared<BlobIndexCompactionFilterFactory>(this, env_,
-                                                           statistics_);
+        std::make_shared<BlobIndexCompactionFilterFactory>(
+            this, env_, cf_options_, statistics_);
   }
 
+  // Reset user compaction filter after building into compaction factory.
+  cf_options_.compaction_filter = nullptr;
+
   // Open base db.
   ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
   s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
   if (!s.ok()) {
     return s;
   }
-  db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
+  db_impl_ = static_cast_with_check<DBImpl>(db_->GetRootDB());
+
+  // Sanitize the blob_dir provided. Using a directory where the
+  // base DB stores its files for the default CF is not supported.
+  const ColumnFamilyData* const cfd =
+      static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
+  assert(cfd);
+
+  const ImmutableCFOptions* const ioptions = cfd->ioptions();
+  assert(ioptions);
+
+  assert(env_);
+
+  for (const auto& cf_path : ioptions->cf_paths) {
+    bool blob_dir_same_as_cf_dir = false;
+    s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir);
+    if (!s.ok()) {
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Error while sanitizing blob_dir %s, status: %s",
+                      blob_dir_.c_str(), s.ToString().c_str());
+      return s;
+    }
+
+    if (blob_dir_same_as_cf_dir) {
+      return Status::NotSupported(
+          "Using the base DB's storage directories for BlobDB files is not "
+          "supported.");
+    }
+  }
 
   // Initialize SST file <-> oldest blob file mapping if garbage collection
   // is enabled.
@@ -622,7 +654,7 @@ void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
   const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
 
   MarkUnreferencedBlobFilesObsoleteImpl(
-      [=](const std::shared_ptr<BlobFile>& blob_file) {
+      [this, obsolete_seq](const std::shared_ptr<BlobFile>& blob_file) {
         WriteLock file_lock(&blob_file->mutex_);
         return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
       });
@@ -630,7 +662,7 @@ void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
 
 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
   MarkUnreferencedBlobFilesObsoleteImpl(
-      [=](const std::shared_ptr<BlobFile>& blob_file) {
+      [this](const std::shared_ptr<BlobFile>& blob_file) {
         return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
       });
 }
@@ -710,11 +742,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
                     boffset);
   }
 
-  Writer::ElemType et = Writer::kEtNone;
+  BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
   if (bfile->file_size_ == BlobLogHeader::kSize) {
-    et = Writer::kEtFileHdr;
+    et = BlobLogWriter::kEtFileHdr;
   } else if (bfile->file_size_ > BlobLogHeader::kSize) {
-    et = Writer::kEtRecord;
+    et = BlobLogWriter::kEtRecord;
   } else if (bfile->file_size_) {
     ROCKS_LOG_WARN(db_options_.info_log,
                    "Open blob file: %s with wrong size: %" PRIu64,
@@ -722,9 +754,9 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
     return Status::Corruption("Invalid blob file size");
   }
 
-  bfile->log_writer_ = std::make_shared<Writer>(
+  bfile->log_writer_ = std::make_shared<BlobLogWriter>(
       std::move(fwriter), env_, statistics_, bfile->file_number_,
-      bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
+      db_options_.use_fsync, boffset);
   bfile->log_writer_->last_elem_type_ = et;
 
   return s;
@@ -766,7 +798,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
 
 Status BlobDBImpl::CheckOrCreateWriterLocked(
     const std::shared_ptr<BlobFile>& blob_file,
-    std::shared_ptr<Writer>* writer) {
+    std::shared_ptr<BlobLogWriter>* writer) {
   assert(writer != nullptr);
   *writer = blob_file->GetWriter();
   if (*writer != nullptr) {
@@ -782,7 +814,8 @@ Status BlobDBImpl::CheckOrCreateWriterLocked(
 Status BlobDBImpl::CreateBlobFileAndWriter(
     bool has_ttl, const ExpirationRange& expiration_range,
     const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
-    std::shared_ptr<Writer>* writer) {
+    std::shared_ptr<BlobLogWriter>* writer) {
+  TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
   assert(has_ttl == (expiration_range.first || expiration_range.second));
   assert(blob_file);
   assert(writer);
@@ -838,7 +871,7 @@ Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
     return Status::OK();
   }
 
-  std::shared_ptr<Writer> writer;
+  std::shared_ptr<BlobLogWriter> writer;
   const Status s = CreateBlobFileAndWriter(
       /* has_ttl */ false, ExpirationRange(),
       /* reason */ "SelectBlobFile", blob_file, &writer);
@@ -884,7 +917,7 @@ Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
   std::ostringstream oss;
   oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
 
-  std::shared_ptr<Writer> writer;
+  std::shared_ptr<BlobLogWriter> writer;
   const Status s =
       CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
                               /* reason */ oss.str(), blob_file, &writer);
@@ -962,7 +995,8 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
   StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
   RecordTick(statistics_, BLOB_DB_NUM_WRITE);
   uint32_t default_cf_id =
-      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+      static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+          ->GetID();
   Status s;
   BlobInserter blob_inserter(options, this, default_cf_id);
   {
@@ -1017,7 +1051,8 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
   Status s;
   std::string index_entry;
   uint32_t column_family_id =
-      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+      static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+          ->GetID();
   if (value.size() < bdb_options_.min_blob_size) {
     if (expiration == kNoExpiration) {
       // Put as normal value
@@ -1035,7 +1070,8 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
     Slice value_compressed = GetCompressedSlice(value, &compression_output);
 
     std::string headerbuf;
-    Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
+    BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
+                                       expiration);
 
     // Check DB size limit before selecting blob file to
     // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
@@ -1109,6 +1145,32 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
   return *compression_output;
 }
 
+Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
+                                   CompressionType compression_type,
+                                   PinnableSlice* value_output) const {
+  assert(compression_type != kNoCompression);
+
+  BlockContents contents;
+  auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
+
+  {
+    StopWatch decompression_sw(env_, statistics_, BLOB_DB_DECOMPRESSION_MICROS);
+    UncompressionContext context(compression_type);
+    UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
+                           compression_type);
+    Status s = UncompressBlockContentsForCompressionType(
+        info, compressed_value.data(), compressed_value.size(), &contents,
+        kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
+    if (!s.ok()) {
+      return Status::Corruption("Unable to decompress blob.");
+    }
+  }
+
+  value_output->PinSelf(contents.data);
+
+  return Status::OK();
+}
+
 Status BlobDBImpl::CompactFiles(
     const CompactionOptions& compact_options,
     const std::vector<std::string>& input_file_names, const int output_level,
@@ -1137,10 +1199,10 @@ Status BlobDBImpl::CompactFiles(
   return s;
 }
 
-void BlobDBImpl::GetCompactionContextCommon(
-    BlobCompactionContext* context) const {
+void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
   assert(context);
 
+  context->blob_db_impl = this;
   context->next_file_number = next_file_number_.load();
   context->current_blob_files.clear();
   for (auto& p : blob_files_) {
@@ -1165,8 +1227,6 @@ void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
   ReadLock l(&mutex_);
   GetCompactionContextCommon(context);
 
-  context_gc->blob_db_impl = this;
-
   if (!live_imm_non_ttl_blob_files_.empty()) {
     auto it = live_imm_non_ttl_blob_files_.begin();
     std::advance(it, bdb_options_.garbage_collection_cutoff *
@@ -1284,7 +1344,7 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
   uint64_t key_offset = 0;
   {
     WriteLock lockbfile_w(&bfile->mutex_);
-    std::shared_ptr<Writer> writer;
+    std::shared_ptr<BlobLogWriter> writer;
     s = CheckOrCreateWriterLocked(bfile, &writer);
     if (!s.ok()) {
       return s;
@@ -1390,20 +1450,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
   }
 
   if (compression_type != kNoCompression) {
-    BlockContents contents;
-    auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
-
-    {
-      StopWatch decompression_sw(env_, statistics_,
-                                 BLOB_DB_DECOMPRESSION_MICROS);
-      UncompressionContext context(compression_type);
-      UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
-                             compression_type);
-      s = UncompressBlockContentsForCompressionType(
-          info, value->data(), value->size(), &contents,
-          kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
-    }
-
+    s = DecompressSlice(*value, compression_type, value);
     if (!s.ok()) {
       if (debug_level_ >= 2) {
         ROCKS_LOG_ERROR(
@@ -1414,11 +1461,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
             blob_index.file_number(), blob_index.offset(), blob_index.size(),
             key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
       }
-
-      return Status::Corruption("Unable to uncompress blob.");
+      return s;
     }
-
-    value->PinSelf(contents.data);
   }
 
   return Status::OK();
@@ -1482,15 +1526,24 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
   const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
 
   // Allocate the buffer. This is safe in C++11
-  std::string buffer_str(static_cast<size_t>(record_size), static_cast<char>(0));
-  char* buffer = &buffer_str[0];
+  std::string buf;
+  AlignedBuf aligned_buf;
 
   // A partial blob record contain checksum, key and value.
   Slice blob_record;
 
   {
     StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
-    s = reader->Read(record_offset, static_cast<size_t>(record_size), &blob_record, buffer);
+    if (reader->use_direct_io()) {
+      s = reader->Read(IOOptions(), record_offset,
+                       static_cast<size_t>(record_size), &blob_record, nullptr,
+                       &aligned_buf);
+    } else {
+      buf.reserve(static_cast<size_t>(record_size));
+      s = reader->Read(IOOptions(), record_offset,
+                       static_cast<size_t>(record_size), &blob_record, &buf[0],
+                       nullptr);
+    }
     RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
   }
 
@@ -1552,7 +1605,8 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
 Status BlobDBImpl::Get(const ReadOptions& read_options,
                        ColumnFamilyHandle* column_family, const Slice& key,
                        PinnableSlice* value) {
-  return Get(read_options, column_family, key, value, nullptr /*expiration*/);
+  return Get(read_options, column_family, key, value,
+             static_cast<uint64_t*>(nullptr) /*expiration*/);
 }
 
 Status BlobDBImpl::Get(const ReadOptions& read_options,
@@ -1668,6 +1722,7 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
 }
 
 Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
+  TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
   assert(bfile);
   assert(!bfile->Immutable());
   assert(!bfile->Obsolete());
@@ -1978,7 +2033,8 @@ void BlobDBImpl::CopyBlobFiles(
 
 Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
   auto* cfd =
-      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
+      static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+          ->cfd();
   // Get a snapshot to avoid blob file get deleted between we
   // fetch and index entry and reading from the file.
   ManagedSnapshot* own_snapshot = nullptr;