#include <sys/types.h>
#ifdef OS_LINUX
#include <sys/statfs.h>
-#include <sys/syscall.h>
#include <sys/sysmacros.h>
#endif
#include "monitoring/iostats_context_imp.h"
namespace ROCKSDB_NAMESPACE {
+std::string IOErrorMsg(const std::string& context,
+ const std::string& file_name) {
+ if (file_name.empty()) {
+ return context;
+ }
+ return context + ": " + file_name;
+}
+
+// Converts the errno value `err_number` into an IOStatus for a failed
+// operation described by `context`. `file_name` can be left empty if it is
+// unknown.
+IOStatus IOError(const std::string& context, const std::string& file_name,
+                 int err_number) {
+  if (err_number == ENOSPC) {
+    // Out-of-space is the only case here that is marked retryable.
+    IOStatus no_space = IOStatus::NoSpace(IOErrorMsg(context, file_name),
+                                          strerror(err_number));
+    no_space.SetRetryable(true);
+    return no_space;
+  }
+  if (err_number == ESTALE) {
+    // Carries only IOStatus::kStaleFile; context and file name are not
+    // attached to this status.
+    return IOStatus::IOError(IOStatus::kStaleFile);
+  }
+  if (err_number == ENOENT) {
+    return IOStatus::PathNotFound(IOErrorMsg(context, file_name),
+                                  strerror(err_number));
+  }
+  // Every other errno becomes a generic I/O error with the strerror() text.
+  return IOStatus::IOError(IOErrorMsg(context, file_name),
+                           strerror(err_number));
+}
+
// A wrapper for fadvise, if the platform doesn't support fadvise,
// it will simply return 0.
int Fadvise(int fd, off_t offset, size_t len, int advice) {
return true;
}
-size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) {
-#ifdef OS_LINUX
- struct stat buf;
- int result = fstat(fd, &buf);
- if (result == -1) {
- return kDefaultPageSize;
- }
- if (major(buf.st_dev) == 0) {
- // Unnamed devices (e.g. non-device mounts), reserved as null device number.
- // These don't have an entry in /sys/dev/block/. Return a sensible default.
- return kDefaultPageSize;
- }
-
- // Reading queue/logical_block_size does not require special permissions.
- const int kBufferSize = 100;
- char path[kBufferSize];
- char real_path[PATH_MAX + 1];
- snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
- minor(buf.st_dev));
- if (realpath(path, real_path) == nullptr) {
- return kDefaultPageSize;
- }
- std::string device_dir(real_path);
- if (!device_dir.empty() && device_dir.back() == '/') {
- device_dir.pop_back();
- }
- // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda
- // and nvme0n1 have it.
- // $ ls -al '/sys/dev/block/8:3'
- // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
- // ../../block/sda/sda3
- // $ ls -al '/sys/dev/block/259:4'
- // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
- // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
- size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
- if (parent_end == std::string::npos) {
- return kDefaultPageSize;
- }
- size_t parent_begin = device_dir.rfind('/', parent_end - 1);
- if (parent_begin == std::string::npos) {
- return kDefaultPageSize;
- }
- std::string parent =
- device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1);
- std::string child = device_dir.substr(parent_end + 1, std::string::npos);
- if (parent != "block" &&
- (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) {
- device_dir = device_dir.substr(0, parent_end);
- }
- std::string fname = device_dir + "/queue/logical_block_size";
- FILE* fp;
- size_t size = 0;
- fp = fopen(fname.c_str(), "r");
- if (fp != nullptr) {
- char* line = nullptr;
- size_t len = 0;
- if (getline(&line, &len, fp) != -1) {
- sscanf(line, "%zu", &size);
- }
- free(line);
- fclose(fp);
- }
- if (size != 0 && (size & (size - 1)) == 0) {
- return size;
- }
-#endif
- return kDefaultPageSize;
-}
-
#ifdef ROCKSDB_RANGESYNC_PRESENT
#if !defined(ZFS_SUPER_MAGIC)
#endif
bool IsSyncFileRangeSupported(int fd) {
- // The approach taken in this function is to build a blacklist of cases where
- // we know `sync_file_range` definitely will not work properly despite passing
- // the compile-time check (`ROCKSDB_RANGESYNC_PRESENT`). If we are unsure, or
- // if any of the checks fail in unexpected ways, we allow `sync_file_range` to
- // be used. This way should minimize risk of impacting existing use cases.
+ // This function tracks and checks for cases where we know `sync_file_range`
+ // definitely will not work properly despite passing the compile-time check
+ // (`ROCKSDB_RANGESYNC_PRESENT`). If we are unsure, or if any of the checks
+ // fail in unexpected ways, we allow `sync_file_range` to be used. This way
+ // should minimize risk of impacting existing use cases.
struct statfs buf;
int ret = fstatfs(fd, &buf);
assert(ret == 0);
// ("Function not implemented").
return false;
}
- // None of the cases on the blacklist matched, so allow `sync_file_range` use.
+ // None of the known cases matched, so allow `sync_file_range` use.
return true;
}
/*
* DirectIOHelper
*/
-#ifndef NDEBUG
namespace {
bool IsSectorAligned(const size_t off, size_t sector_size) {
- return off % sector_size == 0;
+ assert((sector_size & (sector_size - 1)) == 0);
+ return (off & (sector_size - 1)) == 0;
}
+#ifndef NDEBUG
// Pointer overload: reports whether the address held in `ptr` is a multiple
// of `sector_size`.
bool IsSectorAligned(const void* ptr, size_t sector_size) {
  const auto address = reinterpret_cast<uintptr_t>(ptr);
  return (address % sector_size) == 0;
}
-
-} // namespace
#endif
+} // namespace
/*
* PosixSequentialFile
*/
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
- int fd, const EnvOptions& options)
+ int fd, size_t logical_block_size,
+ const EnvOptions& options)
: filename_(fname),
file_(file),
fd_(fd),
use_direct_io_(options.use_direct_reads),
- logical_sector_size_(GetLogicalBufferSize(fd_)) {
+ logical_sector_size_(logical_block_size) {
assert(!options.use_direct_reads || !options.use_mmap_reads);
}
IOStatus s;
size_t r = 0;
do {
+ clearerr(file_);
r = fread_unlocked(scratch, 1, n, file_);
} while (r == 0 && ferror(file_) && errno == EINTR);
*result = Slice(scratch, r);
ptr += r;
offset += r;
left -= r;
- if (r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
+ if (!IsSectorAligned(r, GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
break;
return static_cast<size_t>(rid - id);
}
#endif
+
+#ifdef OS_LINUX
+// Returns a copy of `path` with a single trailing '/' removed. Paths of
+// length zero or one (such as "/") are returned unchanged.
+std::string RemoveTrailingSlash(const std::string& path) {
+  const bool has_trailing_slash = path.size() > 1 && path.back() == '/';
+  if (has_trailing_slash) {
+    return path.substr(0, path.size() - 1);
+  }
+  return path;
+}
+/* Takes one reference on the cache entry of every directory in `directories`
+ * (after RemoveTrailingSlash), first looking up the logical block size of
+ * the directories that have no entry yet.
+ *
+ * Returns the first failing lookup status, before any reference is taken;
+ * otherwise returns `s`, which is either default-constructed or the OK
+ * result of the last lookup.
+ *
+ * NOTE(review): an entry seen in the first pass could be erased by another
+ * thread before the write lock is taken; `cache_[dir]` would presumably then
+ * re-create it with a default size. Confirm whether callers can hit this.
+ */
+Status LogicalBlockSizeCache::RefAndCacheLogicalBlockSize(
+ const std::vector<std::string>& directories) {
+ std::vector<std::string> dirs;
+ dirs.reserve(directories.size());
+ for (auto& d : directories) {
+ dirs.emplace_back(RemoveTrailingSlash(d));  // same form as the cache keys used below
+ }
+ // Pass 1 (read lock): collect the directories that have no cache entry.
+ std::map<std::string, size_t> dir_sizes;
+ {
+ ReadLock lock(&cache_mutex_);
+ for (const auto& dir : dirs) {
+ if (cache_.find(dir) == cache_.end()) {
+ dir_sizes.emplace(dir, 0);
+ }
+ }
+ }
+ // Pass 2: look up the missing sizes after the read-lock scope has closed.
+ Status s;
+ for (auto& dir_size : dir_sizes) {
+ s = get_logical_block_size_of_directory_(dir_size.first, &dir_size.second);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ // Pass 3 (write lock): bump every ref and store the sizes found in pass 2.
+ WriteLock lock(&cache_mutex_);
+ for (const auto& dir : dirs) {
+ auto& v = cache_[dir];
+ v.ref++;
+ auto dir_size = dir_sizes.find(dir);
+ if (dir_size != dir_sizes.end()) {
+ v.size = dir_size->second;
+ }
+ }
+ return s;
+}
+
+// Drops one reference from the cache entry of each directory in
+// `directories` (after RemoveTrailingSlash) and erases an entry once its
+// reference count reaches zero. Directories without an entry are ignored.
+void LogicalBlockSizeCache::UnrefAndTryRemoveCachedLogicalBlockSize(
+    const std::vector<std::string>& directories) {
+  std::vector<std::string> normalized;
+  normalized.reserve(directories.size());
+  for (const std::string& raw : directories) {
+    normalized.emplace_back(RemoveTrailingSlash(raw));
+  }
+
+  WriteLock guard(&cache_mutex_);
+  for (const std::string& key : normalized) {
+    auto entry = cache_.find(key);
+    if (entry == cache_.end()) {
+      continue;
+    }
+    --(entry->second.ref);
+    if (!entry->second.ref) {
+      cache_.erase(entry);
+    }
+  }
+}
+
+// Returns the cached logical block size for the directory part of `fname`
+// (everything before its last '/'; "/" when that part is empty). On a cache
+// miss the size is obtained from `fd` instead.
+size_t LogicalBlockSizeCache::GetLogicalBlockSize(const std::string& fname,
+                                                  int fd) {
+  const size_t last_slash = fname.rfind('/');
+  std::string parent_dir = fname.substr(0, last_slash);
+  if (parent_dir.empty()) {
+    parent_dir = "/";
+  }
+  {
+    ReadLock guard(&cache_mutex_);
+    const auto cached = cache_.find(parent_dir);
+    if (cached != cache_.end()) {
+      return cached->second.size;
+    }
+  }
+  return get_logical_block_size_of_fd_(fd);
+}
+#endif
+
+// Stores in `*size` the logical block size reported by
+// GetLogicalBlockSizeOfFd() for `directory`.
+//
+// The directory is opened read-only with O_DIRECTORY and closed again before
+// returning. Returns Status::IOError when it cannot be opened (`*size` is
+// left untouched in that case), Status::OK otherwise.
+Status PosixHelper::GetLogicalBlockSizeOfDirectory(const std::string& directory,
+                                                   size_t* size) {
+  int fd = open(directory.c_str(), O_DIRECTORY | O_RDONLY);
+  if (fd == -1) {
+    // open() failed, so there is no descriptor to release. Calling close(-1)
+    // here would only fail and overwrite the errno left by open().
+    return Status::IOError("Cannot open directory " + directory);
+  }
+  *size = PosixHelper::GetLogicalBlockSizeOfFd(fd);
+  close(fd);
+  return Status::OK();
+}
+/* Returns the logical block size of the block device holding the file that
+ * is open on `fd`, or kDefaultPageSize when it cannot be determined.
+ *
+ * When OS_LINUX is defined, the device number from fstat() is resolved via
+ * the sysfs link "/sys/dev/block/<major>:<minor>", and the value is read from
+ * "queue/logical_block_size" under that device directory, or under its
+ * parent directory when the link points at a partition. The value read is
+ * returned only if it is a non-zero power of two. Without OS_LINUX the
+ * function always returns kDefaultPageSize.
+ */
+size_t PosixHelper::GetLogicalBlockSizeOfFd(int fd) {
+#ifdef OS_LINUX
+ struct stat buf;
+ int result = fstat(fd, &buf);
+ if (result == -1) {
+ return kDefaultPageSize;
+ }
+ if (major(buf.st_dev) == 0) {
+ // Unnamed devices (e.g. non-device mounts), reserved as null device number.
+ // These don't have an entry in /sys/dev/block/. Return a sensible default.
+ return kDefaultPageSize;
+ }
+
+ // Reading queue/logical_block_size does not require special permissions.
+ const int kBufferSize = 100;
+ char path[kBufferSize];
+ char real_path[PATH_MAX + 1];
+ snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
+ minor(buf.st_dev));
+ if (realpath(path, real_path) == nullptr) {
+ return kDefaultPageSize;
+ }
+ std::string device_dir(real_path);
+ if (!device_dir.empty() && device_dir.back() == '/') {
+ device_dir.pop_back();
+ }
+ // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda
+ // and nvme0n1 have it.
+ // $ ls -al '/sys/dev/block/8:3'
+ // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
+ // ../../block/sda/sda3
+ // $ ls -al '/sys/dev/block/259:4'
+ // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
+ // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
+ size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
+ if (parent_end == std::string::npos) {
+ return kDefaultPageSize;
+ }
+ size_t parent_begin = device_dir.rfind('/', parent_end - 1);
+ if (parent_begin == std::string::npos) {
+ return kDefaultPageSize;
+ }
+ std::string parent =
+ device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1);
+ std::string child = device_dir.substr(parent_end + 1, std::string::npos);
+ if (parent != "block" &&
+ (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) {
+ device_dir = device_dir.substr(0, parent_end);
+ }
+ std::string fname = device_dir + "/queue/logical_block_size";
+ FILE* fp;
+ size_t size = 0;
+ fp = fopen(fname.c_str(), "r");
+ if (fp != nullptr) {
+ char* line = nullptr;
+ size_t len = 0;
+ if (getline(&line, &len, fp) != -1) {
+ sscanf(line, "%zu", &size);
+ }
+ free(line);
+ fclose(fp);
+ }
+ if (size != 0 && (size & (size - 1)) == 0) {  // accept only a non-zero power of two
+ return size;
+ }
+#endif
+ (void)fd;  // fd is otherwise unreferenced when OS_LINUX is not defined
+ return kDefaultPageSize;
+}
+
/*
* PosixRandomAccessFile
*
* pread() based random-access
*/
PosixRandomAccessFile::PosixRandomAccessFile(
- const std::string& fname, int fd, const EnvOptions& options
+ const std::string& fname, int fd, size_t logical_block_size,
+ const EnvOptions& options
#if defined(ROCKSDB_IOURING_PRESENT)
,
ThreadLocalPtr* thread_local_io_urings
: filename_(fname),
fd_(fd),
use_direct_io_(options.use_direct_reads),
- logical_sector_size_(GetLogicalBufferSize(fd_))
+ logical_sector_size_(logical_block_size)
#if defined(ROCKSDB_IOURING_PRESENT)
,
thread_local_io_urings_(thread_local_io_urings)
size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) {
+ if (use_direct_io()) {
+ for (size_t i = 0; i < num_reqs; i++) {
+ assert(IsSectorAligned(reqs[i].offset, GetRequiredBufferAlignment()));
+ assert(IsSectorAligned(reqs[i].len, GetRequiredBufferAlignment()));
+ assert(IsSectorAligned(reqs[i].scratch, GetRequiredBufferAlignment()));
+ }
+ }
+
#if defined(ROCKSDB_IOURING_PRESENT)
struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
- Slice tmp_slice;
- req->status =
- Read(req->offset + req_wrap->finished_len,
- req->len - req_wrap->finished_len, options, &tmp_slice,
- req->scratch + req_wrap->finished_len, dbg);
- req->result =
- Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
+ if (use_direct_io() &&
+ !IsSectorAligned(req_wrap->finished_len,
+ GetRequiredBufferAlignment())) {
+ // Bytes reads don't fill sectors. Should only happen at the end
+ // of the file.
+ req->result = Slice(req->scratch, req_wrap->finished_len);
+ req->status = IOStatus::OK();
+ } else {
+ Slice tmp_slice;
+ req->status =
+ Read(req->offset + req_wrap->finished_len,
+ req->len - req_wrap->finished_len, options, &tmp_slice,
+ req->scratch + req_wrap->finished_len, dbg);
+ req->result =
+ Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
+ }
} else if (bytes_read < req_wrap->iov.iov_len) {
assert(bytes_read > 0);
assert(bytes_read + req_wrap->finished_len < req->len);
PosixMmapFile::~PosixMmapFile() {
if (fd_ >= 0) {
- PosixMmapFile::Close(IOOptions(), nullptr);
+ IOStatus s = PosixMmapFile::Close(IOOptions(), nullptr);
+ s.PermitUncheckedError();
}
}
* Use posix write to write data to a file.
*/
PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
+ size_t logical_block_size,
const EnvOptions& options)
: FSWritableFile(options),
filename_(fname),
use_direct_io_(options.use_direct_writes),
fd_(fd),
filesize_(0),
- logical_sector_size_(GetLogicalBufferSize(fd_)) {
+ logical_sector_size_(logical_block_size) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
allow_fallocate_ = options.allow_fallocate;
fallocate_with_keep_size_ = options.fallocate_with_keep_size;
PosixWritableFile::~PosixWritableFile() {
if (fd_ >= 0) {
- PosixWritableFile::Close(IOOptions(), nullptr);
+ IOStatus s = PosixWritableFile::Close(IOOptions(), nullptr);
+ s.PermitUncheckedError();
}
}
PosixRandomRWFile::~PosixRandomRWFile() {
if (fd_ >= 0) {
- Close(IOOptions(), nullptr);
+ IOStatus s = Close(IOOptions(), nullptr);
+ s.PermitUncheckedError();
}
}