]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/env/fs_posix.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / env / fs_posix.cc
index 92133eb227cc811738e308346910a95b9382ff4d..e179a421dcd8f1a1e4c7f8bedb986db986035159 100644 (file)
@@ -15,7 +15,6 @@
 #endif
 #include <errno.h>
 #include <fcntl.h>
-
 #include <pthread.h>
 #include <signal.h>
 #include <stdio.h>
@@ -32,6 +31,7 @@
 #include <sys/time.h>
 #include <sys/types.h>
 #include <time.h>
+
 #include <algorithm>
 // Get nano time includes
 #if defined(OS_LINUX) || defined(OS_FREEBSD)
@@ -48,9 +48,9 @@
 
 #include "env/composite_env_wrapper.h"
 #include "env/io_posix.h"
-#include "logging/posix_logger.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/thread_status_updater.h"
+#include "port/lang.h"
 #include "port/port.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
@@ -73,6 +73,8 @@
 #define EXT4_SUPER_MAGIC 0xEF53
 #endif
 
+extern "C" bool RocksDbIOUringEnable() __attribute__((__weak__));
+
 namespace ROCKSDB_NAMESPACE {
 
 namespace {
@@ -81,10 +83,6 @@ inline mode_t GetDBFileMode(bool allow_non_owner_access) {
   return allow_non_owner_access ? 0644 : 0600;
 }
 
-static uint64_t gettid() {
-  return Env::Default()->GetThreadID();
-}
-
 // list of pathnames that are locked
 // Only used for error message.
 struct LockHoldingInfo {
@@ -109,8 +107,18 @@ static int LockOrUnlock(int fd, bool lock) {
 
 class PosixFileLock : public FileLock {
  public:
-  int fd_;
+  int fd_ = /*invalid*/ -1;
   std::string filename;
+
+  void Clear() {
+    fd_ = -1;
+    filename.clear();
+  }
+
+  virtual ~PosixFileLock() override {
+    // Check for destruction without UnlockFile
+    assert(fd_ == -1);
+  }
 };
 
 int cloexec_flags(int flags, const EnvOptions* options) {
@@ -131,9 +139,18 @@ class PosixFileSystem : public FileSystem {
  public:
   PosixFileSystem();
 
-  const char* Name() const override { return "Posix File System"; }
+  static const char* kClassName() { return "PosixFileSystem"; }
+  const char* Name() const override { return kClassName(); }
+  const char* NickName() const override { return kDefaultName(); }
 
   ~PosixFileSystem() override {}
+  bool IsInstanceOf(const std::string& name) const override {
+    if (name == "posix") {
+      return true;
+    } else {
+      return FileSystem::IsInstanceOf(name);
+    }
+  }
 
   void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
     if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
@@ -226,7 +243,7 @@ class PosixFileSystem : public FileSystem {
     }
     SetFD_CLOEXEC(fd, &options);
 
-    if (options.use_mmap_reads && sizeof(void*) >= 8) {
+    if (options.use_mmap_reads) {
       // Use of mmap for random reads has been removed because it
       // kills performance when storage is fast.
       // Use mmap when virtual address-space is plentiful.
@@ -259,7 +276,7 @@ class PosixFileSystem : public FileSystem {
           options
 #if defined(ROCKSDB_IOURING_PRESENT)
           ,
-          thread_local_io_urings_.get()
+          !IsIOUringEnabled() ? nullptr : thread_local_io_urings_.get()
 #endif
               ));
     }
@@ -267,8 +284,7 @@ class PosixFileSystem : public FileSystem {
   }
 
   virtual IOStatus OpenWritableFile(const std::string& fname,
-                                    const FileOptions& options,
-                                    bool reopen,
+                                    const FileOptions& options, bool reopen,
                                     std::unique_ptr<FSWritableFile>* result,
                                     IODebugContext* /*dbg*/) {
     result->reset();
@@ -314,14 +330,7 @@ class PosixFileSystem : public FileSystem {
     SetFD_CLOEXEC(fd, &options);
 
     if (options.use_mmap_writes) {
-      if (!checkedDiskForMmap_) {
-        // this will be executed once in the program's lifetime.
-        // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
-        if (!SupportsFastAllocate(fname)) {
-          forceMmapOff_ = true;
-        }
-        checkedDiskForMmap_ = true;
-      }
+      MaybeForceDisableMmap(fd);
     }
     if (options.use_mmap_writes && !forceMmapOff_) {
       result->reset(new PosixMmapFile(fname, fd, page_size_, options));
@@ -420,14 +429,7 @@ class PosixFileSystem : public FileSystem {
     }
 
     if (options.use_mmap_writes) {
-      if (!checkedDiskForMmap_) {
-        // this will be executed once in the program's lifetime.
-        // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
-        if (!SupportsFastAllocate(fname)) {
-          forceMmapOff_ = true;
-        }
-        checkedDiskForMmap_ = true;
-      }
+      MaybeForceDisableMmap(fd);
     }
     if (options.use_mmap_writes && !forceMmapOff_) {
       result->reset(new PosixMmapFile(fname, fd, page_size_, options));
@@ -545,41 +547,11 @@ class PosixFileSystem : public FileSystem {
     if (fd < 0) {
       return IOError("While open directory", name, errno);
     } else {
-      result->reset(new PosixDirectory(fd));
+      result->reset(new PosixDirectory(fd, name));
     }
     return IOStatus::OK();
   }
 
-  IOStatus NewLogger(const std::string& fname, const IOOptions& /*opts*/,
-                   std::shared_ptr<Logger>* result,
-                   IODebugContext* /*dbg*/) override {
-    FILE* f;
-    {
-      IOSTATS_TIMER_GUARD(open_nanos);
-      f = fopen(fname.c_str(),
-                "w"
-#ifdef __GLIBC_PREREQ
-#if __GLIBC_PREREQ(2, 7)
-                "e"  // glibc extension to enable O_CLOEXEC
-#endif
-#endif
-      );
-    }
-    if (f == nullptr) {
-      result->reset();
-      return status_to_io_status(
-              IOError("when fopen a file for new logger", fname, errno));
-    } else {
-      int fd = fileno(f);
-#ifdef ROCKSDB_FALLOCATE_PRESENT
-      fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
-#endif
-      SetFD_CLOEXEC(fd, nullptr);
-      result->reset(new PosixLogger(f, &gettid, Env::Default()));
-      return IOStatus::OK();
-    }
-  }
-
   IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/,
                       IODebugContext* /*dbg*/) override {
     int result = access(fname.c_str(), F_OK);
@@ -598,15 +570,16 @@ class PosixFileSystem : public FileSystem {
         return IOStatus::NotFound();
       default:
         assert(err == EIO || err == ENOMEM);
-        return IOStatus::IOError("Unexpected error(" + ToString(err) +
+        return IOStatus::IOError("Unexpected error(" + std::to_string(err) +
                                  ") accessing file `" + fname + "' ");
     }
   }
 
-  IOStatus GetChildren(const std::string& dir, const IOOptions& /*opts*/,
+  IOStatus GetChildren(const std::string& dir, const IOOptions& opts,
                        std::vector<std::string>* result,
                        IODebugContext* /*dbg*/) override {
     result->clear();
+
     DIR* d = opendir(dir.c_str());
     if (d == nullptr) {
       switch (errno) {
@@ -618,11 +591,44 @@ class PosixFileSystem : public FileSystem {
           return IOError("While opendir", dir, errno);
       }
     }
+
+    // reset errno before calling readdir()
+    errno = 0;
     struct dirent* entry;
+
     while ((entry = readdir(d)) != nullptr) {
-      result->push_back(entry->d_name);
+      // filter out '.' and '..' directory entries
+      // which appear only on some platforms
+      const bool ignore =
+          entry->d_type == DT_DIR &&
+          (strcmp(entry->d_name, ".") == 0 ||
+           strcmp(entry->d_name, "..") == 0
+#ifndef ASSERT_STATUS_CHECKED
+           // In case of ASSERT_STATUS_CHECKED, GetChildren support older
+           // version of API for debugging purpose.
+           || opts.do_not_recurse
+#endif
+          );
+      if (!ignore) {
+        result->push_back(entry->d_name);
+      }
+      errno = 0;  // reset errno if readdir() success
     }
-    closedir(d);
+
+    // always attempt to close the dir
+    const auto pre_close_errno = errno;  // errno may be modified by closedir
+    const int close_result = closedir(d);
+
+    if (pre_close_errno != 0) {
+      // error occurred during readdir
+      return IOError("While readdir", dir, pre_close_errno);
+    }
+
+    if (close_result != 0) {
+      // error occurred during closedir
+      return IOError("While closedir", dir, errno);
+    }
+
     return IOStatus::OK();
   }
 
@@ -704,8 +710,10 @@ class PosixFileSystem : public FileSystem {
                     const IOOptions& /*opts*/,
                     IODebugContext* /*dbg*/) override {
     if (link(src.c_str(), target.c_str()) != 0) {
-      if (errno == EXDEV) {
-        return IOStatus::NotSupported("No cross FS links allowed");
+      if (errno == EXDEV || errno == ENOTSUP) {
+        return IOStatus::NotSupported(errno == EXDEV
+                                          ? "No cross FS links allowed"
+                                          : "Links not supported by FS");
       }
       return IOError("while link file to " + target, src, errno);
     }
@@ -750,7 +758,9 @@ class PosixFileSystem : public FileSystem {
     LockHoldingInfo lhi;
     int64_t current_time = 0;
     // Ignore status code as the time is only used for error message.
-    Env::Default()->GetCurrentTime(&current_time).PermitUncheckedError();
+    SystemClock::Default()
+        ->GetCurrentTime(&current_time)
+        .PermitUncheckedError();
     lhi.acquire_time = current_time;
     lhi.acquiring_thread = Env::Default()->GetThreadID();
 
@@ -766,15 +776,15 @@ class PosixFileSystem : public FileSystem {
     // closed, all locks the process holds for that *file* are released
     const auto it_success = locked_files.insert({fname, lhi});
     if (it_success.second == false) {
+      LockHoldingInfo prev_info = it_success.first->second;
       mutex_locked_files.Unlock();
       errno = ENOLCK;
-      LockHoldingInfo& prev_info = it_success.first->second;
       // Note that the thread ID printed is the same one as the one in
       // posix logger, but posix logger prints it hex format.
       return IOError("lock hold by current process, acquire time " +
-                         ToString(prev_info.acquire_time) +
+                         std::to_string(prev_info.acquire_time) +
                          " acquiring thread " +
-                         ToString(prev_info.acquiring_thread),
+                         std::to_string(prev_info.acquiring_thread),
                      fname, errno);
     }
 
@@ -789,9 +799,6 @@ class PosixFileSystem : public FileSystem {
     if (fd < 0) {
       result = IOError("while open a file for lock", fname, errno);
     } else if (LockOrUnlock(fd, true) == -1) {
-      // if there is an error in locking, then remove the pathname from
-      // lockedfiles
-      locked_files.erase(fname);
       result = IOError("While lock file", fname, errno);
       close(fd);
     } else {
@@ -801,6 +808,12 @@ class PosixFileSystem : public FileSystem {
       my_lock->filename = fname;
       *lock = my_lock;
     }
+    if (!result.ok()) {
+      // If there is an error in locking, then remove the pathname from
+      // locked_files. (If we got this far, it did not exist in locked_files
+      // before this call.)
+      locked_files.erase(fname);
+    }
 
     mutex_locked_files.Unlock();
     return result;
@@ -820,6 +833,7 @@ class PosixFileSystem : public FileSystem {
       result = IOError("unlock", my_lock->filename, errno);
     }
     close(my_lock->fd_);
+    my_lock->Clear();
     delete my_lock;
     mutex_locked_files.Unlock();
     return result;
@@ -833,10 +847,10 @@ class PosixFileSystem : public FileSystem {
       return IOStatus::OK();
     }
 
-    char the_path[256];
-    char* ret = getcwd(the_path, 256);
+    char the_path[4096];
+    char* ret = getcwd(the_path, 4096);
     if (ret == nullptr) {
-      return IOStatus::IOError(strerror(errno));
+      return IOStatus::IOError(errnoStr(errno).c_str());
     }
 
     *output_path = ret;
@@ -870,7 +884,17 @@ class PosixFileSystem : public FileSystem {
       return IOError("While doing statvfs", fname, errno);
     }
 
-    *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
+    // sbuf.bfree is total free space available to root
+    // sbuf.bavail is total free space available to unprivileged user
+    //  sbuf.bavail <= sbuf.bfree ... pick correct based upon effective user id
+    if (geteuid()) {
+      // non-zero user is unprivileged, or -1 if error.  take more conservative
+      // size
+      *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bavail);
+    } else {
+      // root user can access all disk space
+      *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
+    }
     return IOStatus::OK();
   }
 
@@ -899,7 +923,7 @@ class PosixFileSystem : public FileSystem {
   }
 
   FileOptions OptimizeForLogWrite(const FileOptions& file_options,
-                                 const DBOptions& db_options) const override {
+                                  const DBOptions& db_options) const override {
     FileOptions optimized = file_options;
     optimized.use_mmap_writes = false;
     optimized.use_direct_writes = false;
@@ -931,8 +955,7 @@ class PosixFileSystem : public FileSystem {
   }
 #endif
  private:
-  bool checkedDiskForMmap_;
-  bool forceMmapOff_;  // do we override Env options?
+  bool forceMmapOff_ = false;  // do we override Env options?
 
   // Returns true iff the named directory exists and is a directory.
   virtual bool DirExists(const std::string& dname) {
@@ -943,10 +966,10 @@ class PosixFileSystem : public FileSystem {
     return false;  // stat() failed return false
   }
 
-  bool SupportsFastAllocate(const std::string& path) {
+  bool SupportsFastAllocate(int fd) {
 #ifdef ROCKSDB_FALLOCATE_PRESENT
     struct statfs s;
-    if (statfs(path.c_str(), &s)) {
+    if (fstatfs(fd, &s)) {
       return false;
     }
     switch (s.f_type) {
@@ -960,11 +983,222 @@ class PosixFileSystem : public FileSystem {
         return false;
     }
 #else
-    (void)path;
+    (void)fd;
     return false;
 #endif
   }
 
+  void MaybeForceDisableMmap(int fd) {
+    static std::once_flag s_check_disk_for_mmap_once;
+    assert(this == FileSystem::Default().get());
+    std::call_once(
+        s_check_disk_for_mmap_once,
+        [this](int fdesc) {
+          // this will be executed once in the program's lifetime.
+          // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
+          if (!SupportsFastAllocate(fdesc)) {
+            forceMmapOff_ = true;
+          }
+        },
+        fd);
+  }
+
+#ifdef ROCKSDB_IOURING_PRESENT
+  bool IsIOUringEnabled() {
+    if (RocksDbIOUringEnable && RocksDbIOUringEnable()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+#endif  // ROCKSDB_IOURING_PRESENT
+
+  // EXPERIMENTAL
+  //
+  // TODO akankshamahajan:
+  // 1. Update Poll API to take into account min_completions
+  // and returns if number of handles in io_handles (any order) completed is
+  // equal to atleast min_completions.
+  // 2. Currently in case of direct_io, Read API is called because of which call
+  // to Poll API fails as it expects IOHandle to be populated.
+  virtual IOStatus Poll(std::vector<void*>& io_handles,
+                        size_t /*min_completions*/) override {
+#if defined(ROCKSDB_IOURING_PRESENT)
+    // io_uring_queue_init.
+    struct io_uring* iu = nullptr;
+    if (thread_local_io_urings_) {
+      iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+    }
+
+    // Init failed, platform doesn't support io_uring.
+    if (iu == nullptr) {
+      return IOStatus::NotSupported("Poll");
+    }
+
+    for (size_t i = 0; i < io_handles.size(); i++) {
+      // The request has been completed in earlier runs.
+      if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
+        continue;
+      }
+      // Loop until IO for io_handles[i] is completed.
+      while (true) {
+        // io_uring_wait_cqe.
+        struct io_uring_cqe* cqe = nullptr;
+        ssize_t ret = io_uring_wait_cqe(iu, &cqe);
+        if (ret) {
+          // abort as it shouldn't be in indeterminate state and there is no
+          // good way currently to handle this error.
+          abort();
+        }
+
+        // Step 3: Populate the request.
+        assert(cqe != nullptr);
+        Posix_IOHandle* posix_handle =
+            static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
+        assert(posix_handle->iu == iu);
+        if (posix_handle->iu != iu) {
+          return IOStatus::IOError("");
+        }
+        // Reset cqe data to catch any stray reuse of it
+        static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+
+        FSReadRequest req;
+        req.scratch = posix_handle->scratch;
+        req.offset = posix_handle->offset;
+        req.len = posix_handle->len;
+
+        size_t finished_len = 0;
+        size_t bytes_read = 0;
+        bool read_again = false;
+        UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
+                     true /*async_read*/, posix_handle->use_direct_io,
+                     posix_handle->alignment, finished_len, &req, bytes_read,
+                     read_again);
+        posix_handle->is_finished = true;
+        io_uring_cqe_seen(iu, cqe);
+        posix_handle->cb(req, posix_handle->cb_arg);
+
+        (void)finished_len;
+        (void)bytes_read;
+        (void)read_again;
+
+        if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
+          break;
+        }
+      }
+    }
+    return IOStatus::OK();
+#else
+    (void)io_handles;
+    return IOStatus::NotSupported("Poll");
+#endif
+  }
+
+  virtual IOStatus AbortIO(std::vector<void*>& io_handles) override {
+#if defined(ROCKSDB_IOURING_PRESENT)
+    // io_uring_queue_init.
+    struct io_uring* iu = nullptr;
+    if (thread_local_io_urings_) {
+      iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
+    }
+
+    // Init failed, platform doesn't support io_uring.
+    // If Poll is not supported then it didn't submit any request and it should
+    // return OK.
+    if (iu == nullptr) {
+      return IOStatus::OK();
+    }
+
+    for (size_t i = 0; i < io_handles.size(); i++) {
+      Posix_IOHandle* posix_handle =
+          static_cast<Posix_IOHandle*>(io_handles[i]);
+      if (posix_handle->is_finished == true) {
+        continue;
+      }
+      assert(posix_handle->iu == iu);
+      if (posix_handle->iu != iu) {
+        return IOStatus::IOError("");
+      }
+
+      // Prepare the cancel request.
+      struct io_uring_sqe* sqe;
+      sqe = io_uring_get_sqe(iu);
+
+      // In order to cancel the request, sqe->addr of cancel request should
+      // match with the read request submitted which is posix_handle->iov.
+      io_uring_prep_cancel(sqe, &posix_handle->iov, 0);
+      // Sets sqe->user_data to posix_handle.
+      io_uring_sqe_set_data(sqe, posix_handle);
+
+      // submit the request.
+      ssize_t ret = io_uring_submit(iu);
+      if (ret < 0) {
+        fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
+        return IOStatus::IOError("io_uring_submit() requested but returned " +
+                                 std::to_string(ret));
+      }
+    }
+
+    // After submitting the requests, wait for the requests.
+    for (size_t i = 0; i < io_handles.size(); i++) {
+      if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
+        continue;
+      }
+
+      while (true) {
+        struct io_uring_cqe* cqe = nullptr;
+        ssize_t ret = io_uring_wait_cqe(iu, &cqe);
+        if (ret) {
+          // abort as it shouldn't be in indeterminate state and there is no
+          // good way currently to handle this error.
+          abort();
+        }
+        assert(cqe != nullptr);
+
+        // Returns cqe->user_data.
+        Posix_IOHandle* posix_handle =
+            static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
+        assert(posix_handle->iu == iu);
+        if (posix_handle->iu != iu) {
+          return IOStatus::IOError("");
+        }
+        posix_handle->req_count++;
+
+        // Reset cqe data to catch any stray reuse of it
+        static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
+        io_uring_cqe_seen(iu, cqe);
+
+        // - If the request is cancelled successfully, the original request is
+        //   completed with -ECANCELED and the cancel request is completed with
+        //   a result of 0.
+        // - If the request was already running, the original may or
+        //   may not complete in error. The cancel request will complete with
+        //  -EALREADY for that case.
+        // - And finally, if the request to cancel wasn't
+        //   found, the cancel request is completed with -ENOENT.
+        //
+        // Every handle has to wait for 2 requests completion: original one and
+        // the cancel request which is tracked by PosixHandle::req_count.
+        if (posix_handle->req_count == 2 &&
+            static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
+          posix_handle->is_finished = true;
+          FSReadRequest req;
+          req.status = IOStatus::Aborted();
+          posix_handle->cb(req, posix_handle->cb_arg);
+
+          break;
+        }
+      }
+    }
+    return IOStatus::OK();
+#else
+    // If Poll is not supported then it didn't submit any request and it should
+    // return OK.
+    (void)io_handles;
+    return IOStatus::OK();
+#endif
+  }
+
 #if defined(ROCKSDB_IOURING_PRESENT)
   // io_uring instance
   std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
@@ -1018,8 +1252,7 @@ size_t PosixFileSystem::GetLogicalBlockSizeForWriteIfNeeded(
 }
 
 PosixFileSystem::PosixFileSystem()
-    : checkedDiskForMmap_(false),
-      forceMmapOff_(false),
+    : forceMmapOff_(false),
       page_size_(getpagesize()),
       allow_non_owner_access_(true) {
 #if defined(ROCKSDB_IOURING_PRESENT)
@@ -1040,16 +1273,15 @@ PosixFileSystem::PosixFileSystem()
 // Default Posix FileSystem
 //
 std::shared_ptr<FileSystem> FileSystem::Default() {
-  static PosixFileSystem default_fs;
-  static std::shared_ptr<PosixFileSystem> default_fs_ptr(
-      &default_fs, [](PosixFileSystem*) {});
-  return default_fs_ptr;
+  STATIC_AVOID_DESTRUCTION(std::shared_ptr<FileSystem>, instance)
+  (std::make_shared<PosixFileSystem>());
+  return instance;
 }
 
 #ifndef ROCKSDB_LITE
 static FactoryFunc<FileSystem> posix_filesystem_reg =
-    ObjectLibrary::Default()->Register<FileSystem>(
-        "posix://.*",
+    ObjectLibrary::Default()->AddFactory<FileSystem>(
+        ObjectLibrary::PatternEntry("posix").AddSeparator("://", false),
         [](const std::string& /* uri */, std::unique_ptr<FileSystem>* f,
            std::string* /* errmsg */) {
           f->reset(new PosixFileSystem());