#endif
#include <errno.h>
#include <fcntl.h>
-
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
+
#include <algorithm>
// Get nano time includes
#if defined(OS_LINUX) || defined(OS_FREEBSD)
#include "env/composite_env_wrapper.h"
#include "env/io_posix.h"
-#include "logging/posix_logger.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_updater.h"
+#include "port/lang.h"
#include "port/port.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#define EXT4_SUPER_MAGIC 0xEF53
#endif
+extern "C" bool RocksDbIOUringEnable() __attribute__((__weak__));
+
namespace ROCKSDB_NAMESPACE {
namespace {
return allow_non_owner_access ? 0644 : 0600;
}
-static uint64_t gettid() {
- return Env::Default()->GetThreadID();
-}
-
// list of pathnames that are locked
// Only used for error message.
struct LockHoldingInfo {
class PosixFileLock : public FileLock {
public:
- int fd_;
+ int fd_ = /*invalid*/ -1;
std::string filename;
+
+ void Clear() {
+ fd_ = -1;
+ filename.clear();
+ }
+
+ virtual ~PosixFileLock() override {
+ // Check for destruction without UnlockFile
+ assert(fd_ == -1);
+ }
};
int cloexec_flags(int flags, const EnvOptions* options) {
public:
PosixFileSystem();
- const char* Name() const override { return "Posix File System"; }
+ static const char* kClassName() { return "PosixFileSystem"; }
+ const char* Name() const override { return kClassName(); }
+ const char* NickName() const override { return kDefaultName(); }
~PosixFileSystem() override {}
+ bool IsInstanceOf(const std::string& name) const override {
+ if (name == "posix") {
+ return true;
+ } else {
+ return FileSystem::IsInstanceOf(name);
+ }
+ }
void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
}
SetFD_CLOEXEC(fd, &options);
- if (options.use_mmap_reads && sizeof(void*) >= 8) {
+ if (options.use_mmap_reads) {
// Use of mmap for random reads has been removed because it
// kills performance when storage is fast.
// Use mmap when virtual address-space is plentiful.
options
#if defined(ROCKSDB_IOURING_PRESENT)
,
- thread_local_io_urings_.get()
+ !IsIOUringEnabled() ? nullptr : thread_local_io_urings_.get()
#endif
));
}
}
virtual IOStatus OpenWritableFile(const std::string& fname,
- const FileOptions& options,
- bool reopen,
+ const FileOptions& options, bool reopen,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* /*dbg*/) {
result->reset();
SetFD_CLOEXEC(fd, &options);
if (options.use_mmap_writes) {
- if (!checkedDiskForMmap_) {
- // this will be executed once in the program's lifetime.
- // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
- if (!SupportsFastAllocate(fname)) {
- forceMmapOff_ = true;
- }
- checkedDiskForMmap_ = true;
- }
+ MaybeForceDisableMmap(fd);
}
if (options.use_mmap_writes && !forceMmapOff_) {
result->reset(new PosixMmapFile(fname, fd, page_size_, options));
}
if (options.use_mmap_writes) {
- if (!checkedDiskForMmap_) {
- // this will be executed once in the program's lifetime.
- // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
- if (!SupportsFastAllocate(fname)) {
- forceMmapOff_ = true;
- }
- checkedDiskForMmap_ = true;
- }
+ MaybeForceDisableMmap(fd);
}
if (options.use_mmap_writes && !forceMmapOff_) {
result->reset(new PosixMmapFile(fname, fd, page_size_, options));
if (fd < 0) {
return IOError("While open directory", name, errno);
} else {
- result->reset(new PosixDirectory(fd));
+ result->reset(new PosixDirectory(fd, name));
}
return IOStatus::OK();
}
- IOStatus NewLogger(const std::string& fname, const IOOptions& /*opts*/,
- std::shared_ptr<Logger>* result,
- IODebugContext* /*dbg*/) override {
- FILE* f;
- {
- IOSTATS_TIMER_GUARD(open_nanos);
- f = fopen(fname.c_str(),
- "w"
-#ifdef __GLIBC_PREREQ
-#if __GLIBC_PREREQ(2, 7)
- "e" // glibc extension to enable O_CLOEXEC
-#endif
-#endif
- );
- }
- if (f == nullptr) {
- result->reset();
- return status_to_io_status(
- IOError("when fopen a file for new logger", fname, errno));
- } else {
- int fd = fileno(f);
-#ifdef ROCKSDB_FALLOCATE_PRESENT
- fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
-#endif
- SetFD_CLOEXEC(fd, nullptr);
- result->reset(new PosixLogger(f, &gettid, Env::Default()));
- return IOStatus::OK();
- }
- }
-
IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
int result = access(fname.c_str(), F_OK);
return IOStatus::NotFound();
default:
assert(err == EIO || err == ENOMEM);
- return IOStatus::IOError("Unexpected error(" + ToString(err) +
+ return IOStatus::IOError("Unexpected error(" + std::to_string(err) +
") accessing file `" + fname + "' ");
}
}
- IOStatus GetChildren(const std::string& dir, const IOOptions& /*opts*/,
+ IOStatus GetChildren(const std::string& dir, const IOOptions& opts,
std::vector<std::string>* result,
IODebugContext* /*dbg*/) override {
result->clear();
+
DIR* d = opendir(dir.c_str());
if (d == nullptr) {
switch (errno) {
return IOError("While opendir", dir, errno);
}
}
+
+ // reset errno before calling readdir()
+ errno = 0;
struct dirent* entry;
+
while ((entry = readdir(d)) != nullptr) {
- result->push_back(entry->d_name);
+ // filter out '.' and '..' directory entries
+ // which appear only on some platforms
+ const bool ignore =
+ entry->d_type == DT_DIR &&
+ (strcmp(entry->d_name, ".") == 0 ||
+ strcmp(entry->d_name, "..") == 0
+#ifndef ASSERT_STATUS_CHECKED
+ // In case of ASSERT_STATUS_CHECKED, GetChildren support older
+ // version of API for debugging purpose.
+ || opts.do_not_recurse
+#endif
+ );
+ if (!ignore) {
+ result->push_back(entry->d_name);
+ }
+ errno = 0; // reset errno if readdir() success
}
- closedir(d);
+
+ // always attempt to close the dir
+ const auto pre_close_errno = errno; // errno may be modified by closedir
+ const int close_result = closedir(d);
+
+ if (pre_close_errno != 0) {
+ // error occurred during readdir
+ return IOError("While readdir", dir, pre_close_errno);
+ }
+
+ if (close_result != 0) {
+ // error occurred during closedir
+ return IOError("While closedir", dir, errno);
+ }
+
return IOStatus::OK();
}
const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
if (link(src.c_str(), target.c_str()) != 0) {
- if (errno == EXDEV) {
- return IOStatus::NotSupported("No cross FS links allowed");
+ if (errno == EXDEV || errno == ENOTSUP) {
+ return IOStatus::NotSupported(errno == EXDEV
+ ? "No cross FS links allowed"
+ : "Links not supported by FS");
}
return IOError("while link file to " + target, src, errno);
}
LockHoldingInfo lhi;
int64_t current_time = 0;
// Ignore status code as the time is only used for error message.
- Env::Default()->GetCurrentTime(¤t_time).PermitUncheckedError();
+ SystemClock::Default()
+ ->GetCurrentTime(¤t_time)
+ .PermitUncheckedError();
lhi.acquire_time = current_time;
lhi.acquiring_thread = Env::Default()->GetThreadID();
// closed, all locks the process holds for that *file* are released
const auto it_success = locked_files.insert({fname, lhi});
if (it_success.second == false) {
+ LockHoldingInfo prev_info = it_success.first->second;
mutex_locked_files.Unlock();
errno = ENOLCK;
- LockHoldingInfo& prev_info = it_success.first->second;
// Note that the thread ID printed is the same one as the one in
// posix logger, but posix logger prints it hex format.
return IOError("lock hold by current process, acquire time " +
- ToString(prev_info.acquire_time) +
+ std::to_string(prev_info.acquire_time) +
" acquiring thread " +
- ToString(prev_info.acquiring_thread),
+ std::to_string(prev_info.acquiring_thread),
fname, errno);
}
if (fd < 0) {
result = IOError("while open a file for lock", fname, errno);
} else if (LockOrUnlock(fd, true) == -1) {
- // if there is an error in locking, then remove the pathname from
- // lockedfiles
- locked_files.erase(fname);
result = IOError("While lock file", fname, errno);
close(fd);
} else {
my_lock->filename = fname;
*lock = my_lock;
}
+ if (!result.ok()) {
+ // If there is an error in locking, then remove the pathname from
+ // locked_files. (If we got this far, it did not exist in locked_files
+ // before this call.)
+ locked_files.erase(fname);
+ }
mutex_locked_files.Unlock();
return result;
result = IOError("unlock", my_lock->filename, errno);
}
close(my_lock->fd_);
+ my_lock->Clear();
delete my_lock;
mutex_locked_files.Unlock();
return result;
return IOStatus::OK();
}
- char the_path[256];
- char* ret = getcwd(the_path, 256);
+ char the_path[4096];
+ char* ret = getcwd(the_path, 4096);
if (ret == nullptr) {
- return IOStatus::IOError(strerror(errno));
+ return IOStatus::IOError(errnoStr(errno).c_str());
}
*output_path = ret;
return IOError("While doing statvfs", fname, errno);
}
- *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
+ // sbuf.bfree is total free space available to root
+ // sbuf.bavail is total free space available to unprivileged user
+ // sbuf.bavail <= sbuf.bfree ... pick correct based upon effective user id
+ if (geteuid()) {
+ // non-zero user is unprivileged, or -1 if error. take more conservative
+ // size
+ *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bavail);
+ } else {
+ // root user can access all disk space
+ *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
+ }
return IOStatus::OK();
}
}
FileOptions OptimizeForLogWrite(const FileOptions& file_options,
- const DBOptions& db_options) const override {
+ const DBOptions& db_options) const override {
FileOptions optimized = file_options;
optimized.use_mmap_writes = false;
optimized.use_direct_writes = false;
}
#endif
private:
- bool checkedDiskForMmap_;
- bool forceMmapOff_; // do we override Env options?
+ bool forceMmapOff_ = false; // do we override Env options?
// Returns true iff the named directory exists and is a directory.
virtual bool DirExists(const std::string& dname) {
return false; // stat() failed return false
}
- bool SupportsFastAllocate(const std::string& path) {
+ bool SupportsFastAllocate(int fd) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
struct statfs s;
- if (statfs(path.c_str(), &s)) {
+ if (fstatfs(fd, &s)) {
return false;
}
switch (s.f_type) {
return false;
}
#else
- (void)path;
+ (void)fd;
return false;
#endif
}
+ void MaybeForceDisableMmap(int fd) {
+ static std::once_flag s_check_disk_for_mmap_once;
+ assert(this == FileSystem::Default().get());
+ std::call_once(
+ s_check_disk_for_mmap_once,
+ [this](int fdesc) {
+ // this will be executed once in the program's lifetime.
+ // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
+ if (!SupportsFastAllocate(fdesc)) {
+ forceMmapOff_ = true;
+ }
+ },
+ fd);
+ }
+
+#ifdef ROCKSDB_IOURING_PRESENT
+ bool IsIOUringEnabled() {
+ if (RocksDbIOUringEnable && RocksDbIOUringEnable()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+#endif // ROCKSDB_IOURING_PRESENT
+
+ // EXPERIMENTAL
+ //
+ // TODO akankshamahajan:
+ // 1. Update Poll API to take into account min_completions
+ // and returns if number of handles in io_handles (any order) completed is
+ // equal to atleast min_completions.
+ // 2. Currently in case of direct_io, Read API is called because of which call
+ // to Poll API fails as it expects IOHandle to be populated.
+ virtual IOStatus Poll(std::vector<void*>& io_handles,
+ size_t /*min_completions*/) override {
+#if defined(ROCKSDB_IOURING_PRESENT)
+ // io_uring_queue_init.
+ struct io_uring* iu = nullptr;
+ if (thread_local_io_urings_) {
+ iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+ }
+
+ // Init failed, platform doesn't support io_uring.
+ if (iu == nullptr) {
+ return IOStatus::NotSupported("Poll");
+ }
+
+ for (size_t i = 0; i < io_handles.size(); i++) {
+ // The request has been completed in earlier runs.
+ if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
+ continue;
+ }
+ // Loop until IO for io_handles[i] is completed.
+ while (true) {
+ // io_uring_wait_cqe.
+ struct io_uring_cqe* cqe = nullptr;
+ ssize_t ret = io_uring_wait_cqe(iu, &cqe);
+ if (ret) {
+ // abort as it shouldn't be in indeterminate state and there is no
+ // good way currently to handle this error.
+ abort();
+ }
+
+ // Step 3: Populate the request.
+ assert(cqe != nullptr);
+ Posix_IOHandle* posix_handle =
+ static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
+ assert(posix_handle->iu == iu);
+ if (posix_handle->iu != iu) {
+ return IOStatus::IOError("");
+ }
+ // Reset cqe data to catch any stray reuse of it
+ static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+
+ FSReadRequest req;
+ req.scratch = posix_handle->scratch;
+ req.offset = posix_handle->offset;
+ req.len = posix_handle->len;
+
+ size_t finished_len = 0;
+ size_t bytes_read = 0;
+ bool read_again = false;
+ UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
+ true /*async_read*/, posix_handle->use_direct_io,
+ posix_handle->alignment, finished_len, &req, bytes_read,
+ read_again);
+ posix_handle->is_finished = true;
+ io_uring_cqe_seen(iu, cqe);
+ posix_handle->cb(req, posix_handle->cb_arg);
+
+ (void)finished_len;
+ (void)bytes_read;
+ (void)read_again;
+
+ if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
+ break;
+ }
+ }
+ }
+ return IOStatus::OK();
+#else
+ (void)io_handles;
+ return IOStatus::NotSupported("Poll");
+#endif
+ }
+
+ virtual IOStatus AbortIO(std::vector<void*>& io_handles) override {
+#if defined(ROCKSDB_IOURING_PRESENT)
+ // io_uring_queue_init.
+ struct io_uring* iu = nullptr;
+ if (thread_local_io_urings_) {
+ iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+ }
+
+ // Init failed, platform doesn't support io_uring.
+ // If Poll is not supported then it didn't submit any request and it should
+ // return OK.
+ if (iu == nullptr) {
+ return IOStatus::OK();
+ }
+
+ for (size_t i = 0; i < io_handles.size(); i++) {
+ Posix_IOHandle* posix_handle =
+ static_cast<Posix_IOHandle*>(io_handles[i]);
+ if (posix_handle->is_finished == true) {
+ continue;
+ }
+ assert(posix_handle->iu == iu);
+ if (posix_handle->iu != iu) {
+ return IOStatus::IOError("");
+ }
+
+ // Prepare the cancel request.
+ struct io_uring_sqe* sqe;
+ sqe = io_uring_get_sqe(iu);
+
+ // In order to cancel the request, sqe->addr of cancel request should
+ // match with the read request submitted which is posix_handle->iov.
+ io_uring_prep_cancel(sqe, &posix_handle->iov, 0);
+ // Sets sqe->user_data to posix_handle.
+ io_uring_sqe_set_data(sqe, posix_handle);
+
+ // submit the request.
+ ssize_t ret = io_uring_submit(iu);
+ if (ret < 0) {
+ fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
+ return IOStatus::IOError("io_uring_submit() requested but returned " +
+ std::to_string(ret));
+ }
+ }
+
+ // After submitting the requests, wait for the requests.
+ for (size_t i = 0; i < io_handles.size(); i++) {
+ if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
+ continue;
+ }
+
+ while (true) {
+ struct io_uring_cqe* cqe = nullptr;
+ ssize_t ret = io_uring_wait_cqe(iu, &cqe);
+ if (ret) {
+ // abort as it shouldn't be in indeterminate state and there is no
+ // good way currently to handle this error.
+ abort();
+ }
+ assert(cqe != nullptr);
+
+ // Returns cqe->user_data.
+ Posix_IOHandle* posix_handle =
+ static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
+ assert(posix_handle->iu == iu);
+ if (posix_handle->iu != iu) {
+ return IOStatus::IOError("");
+ }
+ posix_handle->req_count++;
+
+ // Reset cqe data to catch any stray reuse of it
+ static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+ io_uring_cqe_seen(iu, cqe);
+
+ // - If the request is cancelled successfully, the original request is
+ // completed with -ECANCELED and the cancel request is completed with
+ // a result of 0.
+ // - If the request was already running, the original may or
+ // may not complete in error. The cancel request will complete with
+ // -EALREADY for that case.
+ // - And finally, if the request to cancel wasn't
+ // found, the cancel request is completed with -ENOENT.
+ //
+ // Every handle has to wait for 2 requests completion: original one and
+ // the cancel request which is tracked by PosixHandle::req_count.
+ if (posix_handle->req_count == 2 &&
+ static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
+ posix_handle->is_finished = true;
+ FSReadRequest req;
+ req.status = IOStatus::Aborted();
+ posix_handle->cb(req, posix_handle->cb_arg);
+
+ break;
+ }
+ }
+ }
+ return IOStatus::OK();
+#else
+ // If Poll is not supported then it didn't submit any request and it should
+ // return OK.
+ (void)io_handles;
+ return IOStatus::OK();
+#endif
+ }
+
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring instance
std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
}
PosixFileSystem::PosixFileSystem()
- : checkedDiskForMmap_(false),
- forceMmapOff_(false),
+ : forceMmapOff_(false),
page_size_(getpagesize()),
allow_non_owner_access_(true) {
#if defined(ROCKSDB_IOURING_PRESENT)
// Default Posix FileSystem
//
std::shared_ptr<FileSystem> FileSystem::Default() {
- static PosixFileSystem default_fs;
- static std::shared_ptr<PosixFileSystem> default_fs_ptr(
- &default_fs, [](PosixFileSystem*) {});
- return default_fs_ptr;
+ STATIC_AVOID_DESTRUCTION(std::shared_ptr<FileSystem>, instance)
+ (std::make_shared<PosixFileSystem>());
+ return instance;
}
#ifndef ROCKSDB_LITE
static FactoryFunc<FileSystem> posix_filesystem_reg =
- ObjectLibrary::Default()->Register<FileSystem>(
- "posix://.*",
+ ObjectLibrary::Default()->AddFactory<FileSystem>(
+ ObjectLibrary::PatternEntry("posix").AddSeparator("://", false),
[](const std::string& /* uri */, std::unique_ptr<FileSystem>* f,
std::string* /* errmsg */) {
f->reset(new PosixFileSystem());