git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/blob/blob_file_builder.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / blob / blob_file_builder.cc
index 57f05438c4727b3f0776fbee25d81dc1ca3cd974..5e0e7f6cb4ad0f040031eb7de42dc2c15c902bbb 100644 (file)
@@ -7,68 +7,84 @@
 
 #include <cassert>
 
+#include "db/blob/blob_contents.h"
 #include "db/blob/blob_file_addition.h"
+#include "db/blob/blob_file_completion_callback.h"
 #include "db/blob/blob_index.h"
 #include "db/blob/blob_log_format.h"
 #include "db/blob/blob_log_writer.h"
+#include "db/event_helpers.h"
 #include "db/version_set.h"
 #include "file/filename.h"
 #include "file/read_write_util.h"
 #include "file/writable_file_writer.h"
 #include "logging/logging.h"
 #include "options/cf_options.h"
+#include "options/options_helper.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
 #include "test_util/sync_point.h"
+#include "trace_replay/io_tracer.h"
 #include "util/compression.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 BlobFileBuilder::BlobFileBuilder(
-    VersionSet* versions, Env* env, FileSystem* fs,
-    const ImmutableCFOptions* immutable_cf_options,
+    VersionSet* versions, FileSystem* fs,
+    const ImmutableOptions* immutable_options,
     const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
-    int job_id, uint32_t column_family_id,
-    const std::string& column_family_name, Env::IOPriority io_priority,
-    Env::WriteLifeTimeHint write_hint,
+    std::string db_id, std::string db_session_id, int job_id,
+    uint32_t column_family_id, const std::string& column_family_name,
+    Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint,
+    const std::shared_ptr<IOTracer>& io_tracer,
+    BlobFileCompletionCallback* blob_callback,
+    BlobFileCreationReason creation_reason,
     std::vector<std::string>* blob_file_paths,
     std::vector<BlobFileAddition>* blob_file_additions)
-    : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env,
-                      fs, immutable_cf_options, mutable_cf_options,
-                      file_options, job_id, column_family_id,
-                      column_family_name, io_priority, write_hint,
-                      blob_file_paths, blob_file_additions) {}
+    : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
+                      immutable_options, mutable_cf_options, file_options,
+                      db_id, db_session_id, job_id, column_family_id,
+                      column_family_name, io_priority, write_hint, io_tracer,
+                      blob_callback, creation_reason, blob_file_paths,
+                      blob_file_additions) {}
 
 BlobFileBuilder::BlobFileBuilder(
-    std::function<uint64_t()> file_number_generator, Env* env, FileSystem* fs,
-    const ImmutableCFOptions* immutable_cf_options,
+    std::function<uint64_t()> file_number_generator, FileSystem* fs,
+    const ImmutableOptions* immutable_options,
     const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
-    int job_id, uint32_t column_family_id,
-    const std::string& column_family_name, Env::IOPriority io_priority,
-    Env::WriteLifeTimeHint write_hint,
+    std::string db_id, std::string db_session_id, int job_id,
+    uint32_t column_family_id, const std::string& column_family_name,
+    Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint,
+    const std::shared_ptr<IOTracer>& io_tracer,
+    BlobFileCompletionCallback* blob_callback,
+    BlobFileCreationReason creation_reason,
     std::vector<std::string>* blob_file_paths,
     std::vector<BlobFileAddition>* blob_file_additions)
     : file_number_generator_(std::move(file_number_generator)),
-      env_(env),
       fs_(fs),
-      immutable_cf_options_(immutable_cf_options),
+      immutable_options_(immutable_options),
       min_blob_size_(mutable_cf_options->min_blob_size),
       blob_file_size_(mutable_cf_options->blob_file_size),
       blob_compression_type_(mutable_cf_options->blob_compression_type),
+      prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
       file_options_(file_options),
+      db_id_(std::move(db_id)),
+      db_session_id_(std::move(db_session_id)),
       job_id_(job_id),
       column_family_id_(column_family_id),
       column_family_name_(column_family_name),
       io_priority_(io_priority),
       write_hint_(write_hint),
+      io_tracer_(io_tracer),
+      blob_callback_(blob_callback),
+      creation_reason_(creation_reason),
       blob_file_paths_(blob_file_paths),
       blob_file_additions_(blob_file_additions),
       blob_count_(0),
       blob_bytes_(0) {
   assert(file_number_generator_);
-  assert(env_);
   assert(fs_);
-  assert(immutable_cf_options_);
+  assert(immutable_options_);
   assert(file_options_);
   assert(blob_file_paths_);
   assert(blob_file_paths_->empty());
@@ -122,6 +138,16 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
     }
   }
 
+  {
+    const Status s =
+        PutBlobIntoCacheIfNeeded(value, blob_file_number, blob_offset);
+    if (!s.ok()) {
+      ROCKS_LOG_WARN(immutable_options_->info_log,
+                     "Failed to pre-populate the blob into blob cache: %s",
+                     s.ToString().c_str());
+    }
+  }
+
   BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
                         blob_compression_type_);
 
@@ -149,19 +175,25 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
   assert(file_number_generator_);
   const uint64_t blob_file_number = file_number_generator_();
 
-  assert(immutable_cf_options_);
-  assert(!immutable_cf_options_->cf_paths.empty());
-  std::string blob_file_path = BlobFileName(
-      immutable_cf_options_->cf_paths.front().path, blob_file_number);
+  assert(immutable_options_);
+  assert(!immutable_options_->cf_paths.empty());
+  std::string blob_file_path =
+      BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number);
+
+  if (blob_callback_) {
+    blob_callback_->OnBlobFileCreationStarted(
+        blob_file_path, column_family_name_, job_id_, creation_reason_);
+  }
 
   std::unique_ptr<FSWritableFile> file;
 
   {
-    TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile");
-
     assert(file_options_);
-    const Status s =
-        NewWritableFile(fs_, blob_file_path, &file, *file_options_);
+    Status s = NewWritableFile(fs_, blob_file_path, &file, *file_options_);
+
+    TEST_SYNC_POINT_CALLBACK(
+        "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s);
+
     if (!s.ok()) {
       return s;
     }
@@ -176,17 +208,20 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
   assert(file);
   file->SetIOPriority(io_priority_);
   file->SetWriteLifeTimeHint(write_hint_);
-
-  Statistics* const statistics = immutable_cf_options_->statistics;
-
+  FileTypeSet tmp_set = immutable_options_->checksum_handoff_file_types;
+  Statistics* const statistics = immutable_options_->stats;
   std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
-      std::move(file), blob_file_paths_->back(), *file_options_, env_,
-      nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners,
-      immutable_cf_options_->file_checksum_gen_factory));
+      std::move(file), blob_file_paths_->back(), *file_options_,
+      immutable_options_->clock, io_tracer_, statistics,
+      immutable_options_->listeners,
+      immutable_options_->file_checksum_gen_factory.get(),
+      tmp_set.Contains(FileType::kBlobFile), false));
 
-  std::unique_ptr<BlobLogWriter> blob_log_writer(
-      new BlobLogWriter(std::move(file_writer), env_, statistics,
-                        blob_file_number, immutable_cf_options_->use_fsync));
+  constexpr bool do_flush = false;
+
+  std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
+      std::move(file_writer), immutable_options_->clock, statistics,
+      blob_file_number, immutable_options_->use_fsync, do_flush));
 
   constexpr bool has_ttl = false;
   constexpr ExpirationRange expiration_range;
@@ -195,9 +230,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
                        expiration_range);
 
   {
-    TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader");
+    Status s = blob_log_writer->WriteHeader(header);
+
+    TEST_SYNC_POINT_CALLBACK(
+        "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s);
 
-    const Status s = blob_log_writer->WriteHeader(header);
     if (!s.ok()) {
       return s;
     }
@@ -215,6 +252,7 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
   assert(blob);
   assert(compressed_blob);
   assert(compressed_blob->empty());
+  assert(immutable_options_);
 
   if (blob_compression_type_ == kNoCompression) {
     return Status::OK();
@@ -229,7 +267,16 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
 
   constexpr uint32_t compression_format_version = 2;
 
-  if (!CompressData(*blob, info, compression_format_version, compressed_blob)) {
+  bool success = false;
+
+  {
+    StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
+                         BLOB_DB_COMPRESSION_MICROS);
+    success =
+        CompressData(*blob, info, compression_format_version, compressed_blob);
+  }
+
+  if (!success) {
     return Status::Corruption("Error compressing blob");
   }
 
@@ -247,9 +294,10 @@ Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,
 
   uint64_t key_offset = 0;
 
-  TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord");
+  Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
+
+  TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s);
 
-  const Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
   if (!s.ok()) {
     return s;
   }
@@ -271,23 +319,30 @@ Status BlobFileBuilder::CloseBlobFile() {
   std::string checksum_method;
   std::string checksum_value;
 
-  TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter");
+  Status s = writer_->AppendFooter(footer, &checksum_method, &checksum_value);
+
+  TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s);
 
-  const Status s =
-      writer_->AppendFooter(footer, &checksum_method, &checksum_value);
   if (!s.ok()) {
     return s;
   }
 
   const uint64_t blob_file_number = writer_->get_log_number();
 
+  if (blob_callback_) {
+    s = blob_callback_->OnBlobFileCompleted(
+        blob_file_paths_->back(), column_family_name_, job_id_,
+        blob_file_number, creation_reason_, s, checksum_value, checksum_method,
+        blob_count_, blob_bytes_);
+  }
+
   assert(blob_file_additions_);
   blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
                                      std::move(checksum_method),
                                      std::move(checksum_value));
 
-  assert(immutable_cf_options_);
-  ROCKS_LOG_INFO(immutable_cf_options_->info_log,
+  assert(immutable_options_);
+  ROCKS_LOG_INFO(immutable_options_->logger,
                  "[%s] [JOB %d] Generated blob file #%" PRIu64 ": %" PRIu64
                  " total blobs, %" PRIu64 " total bytes",
                  column_family_name_.c_str(), job_id_, blob_file_number,
@@ -297,7 +352,7 @@ Status BlobFileBuilder::CloseBlobFile() {
   blob_count_ = 0;
   blob_bytes_ = 0;
 
-  return Status::OK();
+  return s;
 }
 
 Status BlobFileBuilder::CloseBlobFileIfNeeded() {
@@ -313,4 +368,79 @@ Status BlobFileBuilder::CloseBlobFileIfNeeded() {
   return CloseBlobFile();
 }
 
+void BlobFileBuilder::Abandon(const Status& s) {
+  if (!IsBlobFileOpen()) {
+    return;
+  }
+  if (blob_callback_) {
+    // BlobFileBuilder::Abandon() is called because of error while writing to
+    // Blob files. So we can ignore the below error.
+    blob_callback_
+        ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_,
+                              job_id_, writer_->get_log_number(),
+                              creation_reason_, s, "", "", blob_count_,
+                              blob_bytes_)
+        .PermitUncheckedError();
+  }
+
+  writer_.reset();
+  blob_count_ = 0;
+  blob_bytes_ = 0;
+}
+
+Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob,
+                                                 uint64_t blob_file_number,
+                                                 uint64_t blob_offset) const {
+  Status s = Status::OK();
+
+  auto blob_cache = immutable_options_->blob_cache;
+  auto statistics = immutable_options_->statistics.get();
+  bool warm_cache =
+      prepopulate_blob_cache_ == PrepopulateBlobCache::kFlushOnly &&
+      creation_reason_ == BlobFileCreationReason::kFlush;
+
+  if (blob_cache && warm_cache) {
+    const OffsetableCacheKey base_cache_key(db_id_, db_session_id_,
+                                            blob_file_number);
+    const CacheKey cache_key = base_cache_key.WithOffset(blob_offset);
+    const Slice key = cache_key.AsSlice();
+
+    const Cache::Priority priority = Cache::Priority::BOTTOM;
+
+    // Objects to be put into the cache have to be heap-allocated and
+    // self-contained, i.e. own their contents. The Cache has to be able to
+    // take unique ownership of them.
+    CacheAllocationPtr allocation =
+        AllocateBlock(blob.size(), blob_cache->memory_allocator());
+    memcpy(allocation.get(), blob.data(), blob.size());
+    std::unique_ptr<BlobContents> buf =
+        BlobContents::Create(std::move(allocation), blob.size());
+
+    Cache::CacheItemHelper* const cache_item_helper =
+        BlobContents::GetCacheItemHelper();
+    assert(cache_item_helper);
+
+    if (immutable_options_->lowest_used_cache_tier ==
+        CacheTier::kNonVolatileBlockTier) {
+      s = blob_cache->Insert(key, buf.get(), cache_item_helper,
+                             buf->ApproximateMemoryUsage(),
+                             nullptr /* cache_handle */, priority);
+    } else {
+      s = blob_cache->Insert(key, buf.get(), buf->ApproximateMemoryUsage(),
+                             cache_item_helper->del_cb,
+                             nullptr /* cache_handle */, priority);
+    }
+
+    if (s.ok()) {
+      RecordTick(statistics, BLOB_DB_CACHE_ADD);
+      RecordTick(statistics, BLOB_DB_CACHE_BYTES_WRITE, buf->size());
+      buf.release();
+    } else {
+      RecordTick(statistics, BLOB_DB_CACHE_ADD_FAILURES);
+    }
+  }
+
+  return s;
+}
+
 }  // namespace ROCKSDB_NAMESPACE