#include <memory>
#include <sstream>
-#include "db/blob_index.h"
+#include "db/blob/blob_index.h"
#include "db/db_impl/db_impl.h"
#include "db/write_batch_internal.h"
#include "env/composite_env_wrapper.h"
return Status::NotSupported("No blob directory in options");
}
- if (cf_options_.compaction_filter != nullptr ||
- cf_options_.compaction_filter_factory != nullptr) {
- return Status::NotSupported("Blob DB doesn't support compaction filter.");
- }
-
if (bdb_options_.garbage_collection_cutoff < 0.0 ||
bdb_options_.garbage_collection_cutoff > 1.0) {
return Status::InvalidArgument(
ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
+ if ((cf_options_.compaction_filter != nullptr ||
+ cf_options_.compaction_filter_factory != nullptr)) {
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "BlobDB only support compaction filter on non-TTL values.");
+ }
+
// Open blob directory.
s = env_->CreateDirIfMissing(blob_dir_);
if (!s.ok()) {
if (bdb_options_.enable_garbage_collection) {
db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
cf_options_.compaction_filter_factory =
- std::make_shared<BlobIndexCompactionFilterFactoryGC>(this, env_,
- statistics_);
+ std::make_shared<BlobIndexCompactionFilterFactoryGC>(
+ this, env_, cf_options_, statistics_);
} else {
db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
cf_options_.compaction_filter_factory =
- std::make_shared<BlobIndexCompactionFilterFactory>(this, env_,
- statistics_);
+ std::make_shared<BlobIndexCompactionFilterFactory>(
+ this, env_, cf_options_, statistics_);
}
+ // Reset user compaction filter after building into compaction factory.
+ cf_options_.compaction_filter = nullptr;
+
// Open base db.
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
if (!s.ok()) {
return s;
}
- db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
+ db_impl_ = static_cast_with_check<DBImpl>(db_->GetRootDB());
+
+ // Sanitize the blob_dir provided. Using a directory where the
+ // base DB stores its files for the default CF is not supported.
+ const ColumnFamilyData* const cfd =
+ static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
+ assert(cfd);
+
+ const ImmutableCFOptions* const ioptions = cfd->ioptions();
+ assert(ioptions);
+
+ assert(env_);
+
+ for (const auto& cf_path : ioptions->cf_paths) {
+ bool blob_dir_same_as_cf_dir = false;
+ s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Error while sanitizing blob_dir %s, status: %s",
+ blob_dir_.c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ if (blob_dir_same_as_cf_dir) {
+ return Status::NotSupported(
+ "Using the base DB's storage directories for BlobDB files is not "
+ "supported.");
+ }
+ }
// Initialize SST file <-> oldest blob file mapping if garbage collection
// is enabled.
const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
MarkUnreferencedBlobFilesObsoleteImpl(
- [=](const std::shared_ptr<BlobFile>& blob_file) {
+ [this, obsolete_seq](const std::shared_ptr<BlobFile>& blob_file) {
WriteLock file_lock(&blob_file->mutex_);
return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
});
void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
  MarkUnreferencedBlobFilesObsoleteImpl(
-      [=](const std::shared_ptr<BlobFile>& blob_file) {
+      // Capture `this` explicitly instead of `[=]`: nothing else is
+      // captured here, and implicit capture of `this` via `[=]` is
+      // deprecated in C++20.
+      [this](const std::shared_ptr<BlobFile>& blob_file) {
        return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
      });
}
boffset);
}
- Writer::ElemType et = Writer::kEtNone;
+ BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
if (bfile->file_size_ == BlobLogHeader::kSize) {
- et = Writer::kEtFileHdr;
+ et = BlobLogWriter::kEtFileHdr;
} else if (bfile->file_size_ > BlobLogHeader::kSize) {
- et = Writer::kEtRecord;
+ et = BlobLogWriter::kEtRecord;
} else if (bfile->file_size_) {
ROCKS_LOG_WARN(db_options_.info_log,
"Open blob file: %s with wrong size: %" PRIu64,
return Status::Corruption("Invalid blob file size");
}
- bfile->log_writer_ = std::make_shared<Writer>(
+ bfile->log_writer_ = std::make_shared<BlobLogWriter>(
std::move(fwriter), env_, statistics_, bfile->file_number_,
- bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
+ db_options_.use_fsync, boffset);
bfile->log_writer_->last_elem_type_ = et;
return s;
Status BlobDBImpl::CheckOrCreateWriterLocked(
const std::shared_ptr<BlobFile>& blob_file,
- std::shared_ptr<Writer>* writer) {
+ std::shared_ptr<BlobLogWriter>* writer) {
assert(writer != nullptr);
*writer = blob_file->GetWriter();
if (*writer != nullptr) {
Status BlobDBImpl::CreateBlobFileAndWriter(
bool has_ttl, const ExpirationRange& expiration_range,
const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
- std::shared_ptr<Writer>* writer) {
+ std::shared_ptr<BlobLogWriter>* writer) {
+ TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
assert(has_ttl == (expiration_range.first || expiration_range.second));
assert(blob_file);
assert(writer);
return Status::OK();
}
- std::shared_ptr<Writer> writer;
+ std::shared_ptr<BlobLogWriter> writer;
const Status s = CreateBlobFileAndWriter(
/* has_ttl */ false, ExpirationRange(),
/* reason */ "SelectBlobFile", blob_file, &writer);
std::ostringstream oss;
oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
- std::shared_ptr<Writer> writer;
+ std::shared_ptr<BlobLogWriter> writer;
const Status s =
CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
/* reason */ oss.str(), blob_file, &writer);
StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_WRITE);
uint32_t default_cf_id =
- reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+ static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+ ->GetID();
Status s;
BlobInserter blob_inserter(options, this, default_cf_id);
{
Status s;
std::string index_entry;
uint32_t column_family_id =
- reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+ static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+ ->GetID();
if (value.size() < bdb_options_.min_blob_size) {
if (expiration == kNoExpiration) {
// Put as normal value
Slice value_compressed = GetCompressedSlice(value, &compression_output);
std::string headerbuf;
- Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
+ BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
+ expiration);
// Check DB size limit before selecting blob file to
// Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
return *compression_output;
}
+// Decompresses `compressed_value`, which must have been compressed with
+// `compression_type` (kNoCompression is not allowed here), and pins the
+// uncompressed bytes in `*value_output`.
+// Returns Status::Corruption if the blob cannot be decompressed.
+Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
+                                   CompressionType compression_type,
+                                   PinnableSlice* value_output) const {
+  assert(compression_type != kNoCompression);
+
+  BlockContents contents;
+  // Use the checked downcast for consistency with the other
+  // ColumnFamilyHandleImpl casts in this file.
+  auto cfh =
+      static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily());
+
+  {
+    // Scope the stopwatch so only the decompression itself is timed.
+    StopWatch decompression_sw(env_, statistics_, BLOB_DB_DECOMPRESSION_MICROS);
+    UncompressionContext context(compression_type);
+    UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
+                           compression_type);
+    Status s = UncompressBlockContentsForCompressionType(
+        info, compressed_value.data(), compressed_value.size(), &contents,
+        kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
+    if (!s.ok()) {
+      return Status::Corruption("Unable to decompress blob.");
+    }
+  }
+
+  value_output->PinSelf(contents.data);
+
+  return Status::OK();
+}
+
Status BlobDBImpl::CompactFiles(
const CompactionOptions& compact_options,
const std::vector<std::string>& input_file_names, const int output_level,
return s;
}
-void BlobDBImpl::GetCompactionContextCommon(
- BlobCompactionContext* context) const {
+void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
assert(context);
+ context->blob_db_impl = this;
context->next_file_number = next_file_number_.load();
context->current_blob_files.clear();
for (auto& p : blob_files_) {
ReadLock l(&mutex_);
GetCompactionContextCommon(context);
- context_gc->blob_db_impl = this;
-
if (!live_imm_non_ttl_blob_files_.empty()) {
auto it = live_imm_non_ttl_blob_files_.begin();
std::advance(it, bdb_options_.garbage_collection_cutoff *
uint64_t key_offset = 0;
{
WriteLock lockbfile_w(&bfile->mutex_);
- std::shared_ptr<Writer> writer;
+ std::shared_ptr<BlobLogWriter> writer;
s = CheckOrCreateWriterLocked(bfile, &writer);
if (!s.ok()) {
return s;
}
if (compression_type != kNoCompression) {
- BlockContents contents;
- auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
-
- {
- StopWatch decompression_sw(env_, statistics_,
- BLOB_DB_DECOMPRESSION_MICROS);
- UncompressionContext context(compression_type);
- UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
- compression_type);
- s = UncompressBlockContentsForCompressionType(
- info, value->data(), value->size(), &contents,
- kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
- }
-
+ s = DecompressSlice(*value, compression_type, value);
if (!s.ok()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(
blob_index.file_number(), blob_index.offset(), blob_index.size(),
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
}
-
- return Status::Corruption("Unable to uncompress blob.");
+ return s;
}
-
- value->PinSelf(contents.data);
}
return Status::OK();
const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
// Allocate the buffer. This is safe in C++11
- std::string buffer_str(static_cast<size_t>(record_size), static_cast<char>(0));
- char* buffer = &buffer_str[0];
+ std::string buf;
+ AlignedBuf aligned_buf;
// A partial blob record contain checksum, key and value.
Slice blob_record;
{
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
- s = reader->Read(record_offset, static_cast<size_t>(record_size), &blob_record, buffer);
+ if (reader->use_direct_io()) {
+ s = reader->Read(IOOptions(), record_offset,
+ static_cast<size_t>(record_size), &blob_record, nullptr,
+ &aligned_buf);
+ } else {
+ buf.reserve(static_cast<size_t>(record_size));
+ s = reader->Read(IOOptions(), record_offset,
+ static_cast<size_t>(record_size), &blob_record, &buf[0],
+ nullptr);
+ }
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
}
Status BlobDBImpl::Get(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* value) {
-  return Get(read_options, column_family, key, value, nullptr /*expiration*/);
+  // Forward to the expiration-aware overload without requesting the
+  // expiration; the typed null pointer makes the intended overload explicit.
+  return Get(read_options, column_family, key, value,
+             static_cast<uint64_t*>(nullptr) /*expiration*/);
}
Status BlobDBImpl::Get(const ReadOptions& read_options,
}
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
+ TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
assert(bfile);
assert(!bfile->Immutable());
assert(!bfile->Obsolete());
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
auto* cfd =
- reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
+ static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+ ->cfd();
// Get a snapshot to avoid blob file get deleted between we
// fetch and index entry and reading from the file.
ManagedSnapshot* own_snapshot = nullptr;