#include <cassert>
+#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_addition.h"
+#include "db/blob/blob_file_completion_callback.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
+#include "db/event_helpers.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "options/cf_options.h"
+#include "options/options_helper.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
+#include "trace_replay/io_tracer.h"
#include "util/compression.h"
namespace ROCKSDB_NAMESPACE {
BlobFileBuilder::BlobFileBuilder(
- VersionSet* versions, Env* env, FileSystem* fs,
- const ImmutableCFOptions* immutable_cf_options,
+ VersionSet* versions, FileSystem* fs,
+ const ImmutableOptions* immutable_options,
const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
- int job_id, uint32_t column_family_id,
- const std::string& column_family_name, Env::IOPriority io_priority,
- Env::WriteLifeTimeHint write_hint,
+ std::string db_id, std::string db_session_id, int job_id,
+ uint32_t column_family_id, const std::string& column_family_name,
+ Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint,
+ const std::shared_ptr<IOTracer>& io_tracer,
+ BlobFileCompletionCallback* blob_callback,
+ BlobFileCreationReason creation_reason,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
- : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env,
- fs, immutable_cf_options, mutable_cf_options,
- file_options, job_id, column_family_id,
- column_family_name, io_priority, write_hint,
- blob_file_paths, blob_file_additions) {}
+ : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
+ immutable_options, mutable_cf_options, file_options,
+ db_id, db_session_id, job_id, column_family_id,
+ column_family_name, io_priority, write_hint, io_tracer,
+ blob_callback, creation_reason, blob_file_paths,
+ blob_file_additions) {}
BlobFileBuilder::BlobFileBuilder(
- std::function<uint64_t()> file_number_generator, Env* env, FileSystem* fs,
- const ImmutableCFOptions* immutable_cf_options,
+ std::function<uint64_t()> file_number_generator, FileSystem* fs,
+ const ImmutableOptions* immutable_options,
const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
- int job_id, uint32_t column_family_id,
- const std::string& column_family_name, Env::IOPriority io_priority,
- Env::WriteLifeTimeHint write_hint,
+ std::string db_id, std::string db_session_id, int job_id,
+ uint32_t column_family_id, const std::string& column_family_name,
+ Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint,
+ const std::shared_ptr<IOTracer>& io_tracer,
+ BlobFileCompletionCallback* blob_callback,
+ BlobFileCreationReason creation_reason,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: file_number_generator_(std::move(file_number_generator)),
- env_(env),
fs_(fs),
- immutable_cf_options_(immutable_cf_options),
+ immutable_options_(immutable_options),
min_blob_size_(mutable_cf_options->min_blob_size),
blob_file_size_(mutable_cf_options->blob_file_size),
blob_compression_type_(mutable_cf_options->blob_compression_type),
+ prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
file_options_(file_options),
+ db_id_(std::move(db_id)),
+ db_session_id_(std::move(db_session_id)),
job_id_(job_id),
column_family_id_(column_family_id),
column_family_name_(column_family_name),
io_priority_(io_priority),
write_hint_(write_hint),
+ io_tracer_(io_tracer),
+ blob_callback_(blob_callback),
+ creation_reason_(creation_reason),
blob_file_paths_(blob_file_paths),
blob_file_additions_(blob_file_additions),
blob_count_(0),
blob_bytes_(0) {
assert(file_number_generator_);
- assert(env_);
assert(fs_);
- assert(immutable_cf_options_);
+ assert(immutable_options_);
assert(file_options_);
assert(blob_file_paths_);
assert(blob_file_paths_->empty());
}
}
+ {
+ const Status s =
+ PutBlobIntoCacheIfNeeded(value, blob_file_number, blob_offset);
+ if (!s.ok()) {
+ ROCKS_LOG_WARN(immutable_options_->info_log,
+ "Failed to pre-populate the blob into blob cache: %s",
+ s.ToString().c_str());
+ }
+ }
+
BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
blob_compression_type_);
assert(file_number_generator_);
const uint64_t blob_file_number = file_number_generator_();
- assert(immutable_cf_options_);
- assert(!immutable_cf_options_->cf_paths.empty());
- std::string blob_file_path = BlobFileName(
- immutable_cf_options_->cf_paths.front().path, blob_file_number);
+ assert(immutable_options_);
+ assert(!immutable_options_->cf_paths.empty());
+ std::string blob_file_path =
+ BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number);
+
+ if (blob_callback_) {
+ blob_callback_->OnBlobFileCreationStarted(
+ blob_file_path, column_family_name_, job_id_, creation_reason_);
+ }
std::unique_ptr<FSWritableFile> file;
{
- TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile");
-
assert(file_options_);
- const Status s =
- NewWritableFile(fs_, blob_file_path, &file, *file_options_);
+ Status s = NewWritableFile(fs_, blob_file_path, &file, *file_options_);
+
+ TEST_SYNC_POINT_CALLBACK(
+ "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s);
+
if (!s.ok()) {
return s;
}
assert(file);
file->SetIOPriority(io_priority_);
file->SetWriteLifeTimeHint(write_hint_);
-
- Statistics* const statistics = immutable_cf_options_->statistics;
-
+ FileTypeSet tmp_set = immutable_options_->checksum_handoff_file_types;
+ Statistics* const statistics = immutable_options_->stats;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- std::move(file), blob_file_paths_->back(), *file_options_, env_,
- nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners,
- immutable_cf_options_->file_checksum_gen_factory));
+ std::move(file), blob_file_paths_->back(), *file_options_,
+ immutable_options_->clock, io_tracer_, statistics,
+ immutable_options_->listeners,
+ immutable_options_->file_checksum_gen_factory.get(),
+ tmp_set.Contains(FileType::kBlobFile), false));
- std::unique_ptr<BlobLogWriter> blob_log_writer(
- new BlobLogWriter(std::move(file_writer), env_, statistics,
- blob_file_number, immutable_cf_options_->use_fsync));
+ constexpr bool do_flush = false;
+
+ std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
+ std::move(file_writer), immutable_options_->clock, statistics,
+ blob_file_number, immutable_options_->use_fsync, do_flush));
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
expiration_range);
{
- TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader");
+ Status s = blob_log_writer->WriteHeader(header);
+
+ TEST_SYNC_POINT_CALLBACK(
+ "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s);
- const Status s = blob_log_writer->WriteHeader(header);
if (!s.ok()) {
return s;
}
assert(blob);
assert(compressed_blob);
assert(compressed_blob->empty());
+ assert(immutable_options_);
if (blob_compression_type_ == kNoCompression) {
return Status::OK();
constexpr uint32_t compression_format_version = 2;
- if (!CompressData(*blob, info, compression_format_version, compressed_blob)) {
+ bool success = false;
+
+ {
+ StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
+ BLOB_DB_COMPRESSION_MICROS);
+ success =
+ CompressData(*blob, info, compression_format_version, compressed_blob);
+ }
+
+ if (!success) {
return Status::Corruption("Error compressing blob");
}
uint64_t key_offset = 0;
- TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord");
+ Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
+
+ TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s);
- const Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
if (!s.ok()) {
return s;
}
std::string checksum_method;
std::string checksum_value;
- TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter");
+ Status s = writer_->AppendFooter(footer, &checksum_method, &checksum_value);
+
+ TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s);
- const Status s =
- writer_->AppendFooter(footer, &checksum_method, &checksum_value);
if (!s.ok()) {
return s;
}
const uint64_t blob_file_number = writer_->get_log_number();
+ if (blob_callback_) {
+ s = blob_callback_->OnBlobFileCompleted(
+ blob_file_paths_->back(), column_family_name_, job_id_,
+ blob_file_number, creation_reason_, s, checksum_value, checksum_method,
+ blob_count_, blob_bytes_);
+ }
+
assert(blob_file_additions_);
blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
std::move(checksum_method),
std::move(checksum_value));
- assert(immutable_cf_options_);
- ROCKS_LOG_INFO(immutable_cf_options_->info_log,
+ assert(immutable_options_);
+ ROCKS_LOG_INFO(immutable_options_->logger,
"[%s] [JOB %d] Generated blob file #%" PRIu64 ": %" PRIu64
" total blobs, %" PRIu64 " total bytes",
column_family_name_.c_str(), job_id_, blob_file_number,
blob_count_ = 0;
blob_bytes_ = 0;
- return Status::OK();
+ return s;
}
Status BlobFileBuilder::CloseBlobFileIfNeeded() {
return CloseBlobFile();
}
+void BlobFileBuilder::Abandon(const Status& s) {
+ if (!IsBlobFileOpen()) {
+ return;
+ }
+ if (blob_callback_) {
+ // BlobFileBuilder::Abandon() is called because of error while writing to
+ // Blob files. So we can ignore the below error.
+ blob_callback_
+ ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_,
+ job_id_, writer_->get_log_number(),
+ creation_reason_, s, "", "", blob_count_,
+ blob_bytes_)
+ .PermitUncheckedError();
+ }
+
+ writer_.reset();
+ blob_count_ = 0;
+ blob_bytes_ = 0;
+}
+
+Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob,
+ uint64_t blob_file_number,
+ uint64_t blob_offset) const {
+ Status s = Status::OK();
+
+ auto blob_cache = immutable_options_->blob_cache;
+ auto statistics = immutable_options_->statistics.get();
+ bool warm_cache =
+ prepopulate_blob_cache_ == PrepopulateBlobCache::kFlushOnly &&
+ creation_reason_ == BlobFileCreationReason::kFlush;
+
+ if (blob_cache && warm_cache) {
+ const OffsetableCacheKey base_cache_key(db_id_, db_session_id_,
+ blob_file_number);
+ const CacheKey cache_key = base_cache_key.WithOffset(blob_offset);
+ const Slice key = cache_key.AsSlice();
+
+ const Cache::Priority priority = Cache::Priority::BOTTOM;
+
+ // Objects to be put into the cache have to be heap-allocated and
+ // self-contained, i.e. own their contents. The Cache has to be able to
+ // take unique ownership of them.
+ CacheAllocationPtr allocation =
+ AllocateBlock(blob.size(), blob_cache->memory_allocator());
+ memcpy(allocation.get(), blob.data(), blob.size());
+ std::unique_ptr<BlobContents> buf =
+ BlobContents::Create(std::move(allocation), blob.size());
+
+ Cache::CacheItemHelper* const cache_item_helper =
+ BlobContents::GetCacheItemHelper();
+ assert(cache_item_helper);
+
+ if (immutable_options_->lowest_used_cache_tier ==
+ CacheTier::kNonVolatileBlockTier) {
+ s = blob_cache->Insert(key, buf.get(), cache_item_helper,
+ buf->ApproximateMemoryUsage(),
+ nullptr /* cache_handle */, priority);
+ } else {
+ s = blob_cache->Insert(key, buf.get(), buf->ApproximateMemoryUsage(),
+ cache_item_helper->del_cb,
+ nullptr /* cache_handle */, priority);
+ }
+
+ if (s.ok()) {
+ RecordTick(statistics, BLOB_DB_CACHE_ADD);
+ RecordTick(statistics, BLOB_DB_CACHE_BYTES_WRITE, buf->size());
+ buf.release();
+ } else {
+ RecordTick(statistics, BLOB_DB_CACHE_ADD_FAILURES);
+ }
+ }
+
+ return s;
+}
+
} // namespace ROCKSDB_NAMESPACE