return BlobFileName(path_to_dir_, file_number_);
}
-std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader(
- Env* env, const DBOptions& db_options,
- const EnvOptions& env_options) const {
- constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
- std::unique_ptr<RandomAccessFile> sfile;
- std::string path_name(PathName());
- Status s = env->NewRandomAccessFile(path_name, &sfile, env_options);
- if (!s.ok()) {
- // report something here.
- return nullptr;
- }
- sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize);
-
- std::unique_ptr<RandomAccessFileReader> sfile_reader;
- sfile_reader.reset(new RandomAccessFileReader(
- NewLegacyRandomAccessFileWrapper(sfile), path_name));
-
- std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
- std::move(sfile_reader), db_options.env, db_options.statistics.get());
-
- return log_reader;
-}
-
std::string BlobFile::DumpState() const {
char str[1000];
snprintf(
obsolete_.store(true);
}
-bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
- assert(last_fsync_ <= file_size_);
- return (hard) ? file_size_ > last_fsync_
- : (file_size_ - last_fsync_) >= bytes_per_sync;
-}
-
Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) {
BlobLogFooter footer;
footer.blob_count = blob_count_;
}
// this will close the file and reset the Writable File Pointer.
- Status s = log_writer_->AppendFooter(footer);
+ Status s = log_writer_->AppendFooter(footer, /* checksum_method */ nullptr,
+ /* checksum_value */ nullptr);
if (s.ok()) {
closed_ = true;
immutable_sequence_ = sequence;
assert(ra_file_reader_);
Slice result;
- char scratch[BlobLogFooter::kSize + 10];
- Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
- scratch);
+ std::string buf;
+ AlignedBuf aligned_buf;
+ Status s;
+ if (ra_file_reader_->use_direct_io()) {
+ s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize,
+ &result, nullptr, &aligned_buf);
+ } else {
+ buf.reserve(BlobLogFooter::kSize + 10);
+ s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize,
+ &result, &buf[0], nullptr);
+ }
if (!s.ok()) return s;
if (result.size() != BlobLogFooter::kSize) {
// should not happen
}
Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
- // assume that file has been fully fsync'd
- last_fsync_.store(file_size_);
blob_count_ = footer.blob_count;
expiration_range_ = footer.expiration_range;
closed_ = true;
Status s;
if (log_writer_.get()) {
s = log_writer_->Sync();
- last_fsync_.store(file_size_.load());
}
return s;
}
PathName()));
// Read file header.
- char header_buf[BlobLogHeader::kSize];
+ std::string header_buf;
+ AlignedBuf aligned_buf;
Slice header_slice;
- s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf);
+ if (file_reader->use_direct_io()) {
+ s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice,
+ nullptr, &aligned_buf);
+ } else {
+ header_buf.reserve(BlobLogHeader::kSize);
+ s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice,
+ &header_buf[0], nullptr);
+ }
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to read header of blob file %" PRIu64
assert(!footer_valid_);
return Status::OK();
}
- char footer_buf[BlobLogFooter::kSize];
+ std::string footer_buf;
Slice footer_slice;
- s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize,
- &footer_slice, footer_buf);
+ if (file_reader->use_direct_io()) {
+ s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize,
+ BlobLogFooter::kSize, &footer_slice, nullptr,
+ &aligned_buf);
+ } else {
+ footer_buf.reserve(BlobLogFooter::kSize);
+ s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize,
+ BlobLogFooter::kSize, &footer_slice, &footer_buf[0],
+ nullptr);
+ }
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to read footer of blob file %" PRIu64