]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/env/io_posix.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / env / io_posix.h
index 2d8e83c9d79ce5421cc818f4cc2a0305ff97c1ed..f129668ea546bba31a4e255a8955b9d91ecb8ed0 100644 (file)
 #include <sys/uio.h>
 #endif
 #include <unistd.h>
+
 #include <atomic>
 #include <functional>
 #include <map>
 #include <string>
+
 #include "port/port.h"
 #include "rocksdb/env.h"
 #include "rocksdb/file_system.h"
 #include "rocksdb/io_status.h"
+#include "test_util/sync_point.h"
 #include "util/mutexlock.h"
 #include "util/thread_local.h"
 
 #define POSIX_FADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
 #define POSIX_FADV_WILLNEED 3   /* [MC1] will need these pages */
 #define POSIX_FADV_DONTNEED 4   /* [MC1] don't need these pages */
+
+#define POSIX_MADV_NORMAL 0     /* [MC1] no further special treatment */
+#define POSIX_MADV_RANDOM 1     /* [MC1] expect random page refs */
+#define POSIX_MADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
+#define POSIX_MADV_WILLNEED 3   /* [MC1] will need these pages */
+#define POSIX_MADV_DONTNEED 4   /* [MC1] don't need these pages */
 #endif
 
 namespace ROCKSDB_NAMESPACE {
@@ -49,6 +58,107 @@ class PosixHelper {
                                                size_t* size);
 };
 
+/*
+ * DirectIOHelper
+ */
+inline bool IsSectorAligned(const size_t off, size_t sector_size) {
+  assert((sector_size & (sector_size - 1)) == 0);
+  return (off & (sector_size - 1)) == 0;
+}
+
+#ifndef NDEBUG
+inline bool IsSectorAligned(const void* ptr, size_t sector_size) {
+  return uintptr_t(ptr) % sector_size == 0;
+}
+#endif
+
+#if defined(ROCKSDB_IOURING_PRESENT)
+struct Posix_IOHandle {
+  Posix_IOHandle(struct io_uring* _iu,
+                 std::function<void(const FSReadRequest&, void*)> _cb,
+                 void* _cb_arg, uint64_t _offset, size_t _len, char* _scratch,
+                 bool _use_direct_io, size_t _alignment)
+      : iu(_iu),
+        cb(_cb),
+        cb_arg(_cb_arg),
+        offset(_offset),
+        len(_len),
+        scratch(_scratch),
+        use_direct_io(_use_direct_io),
+        alignment(_alignment),
+        is_finished(false),
+        req_count(0) {}
+
+  struct iovec iov;
+  struct io_uring* iu;
+  std::function<void(const FSReadRequest&, void*)> cb;
+  void* cb_arg;
+  uint64_t offset;
+  size_t len;
+  char* scratch;
+  bool use_direct_io;
+  size_t alignment;
+  bool is_finished;
+  // req_count is used by AbortIO API to keep track of number of requests.
+  uint32_t req_count;
+};
+
+inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
+                         size_t len, size_t iov_len, bool async_read,
+                         bool use_direct_io, size_t alignment,
+                         size_t& finished_len, FSReadRequest* req,
+                         size_t& bytes_read, bool& read_again) {
+  read_again = false;
+  if (cqe->res < 0) {
+    req->result = Slice(req->scratch, 0);
+    req->status = IOError("Req failed", file_name, cqe->res);
+  } else {
+    bytes_read = static_cast<size_t>(cqe->res);
+    TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
+    if (bytes_read == iov_len) {
+      req->result = Slice(req->scratch, req->len);
+      req->status = IOStatus::OK();
+    } else if (bytes_read == 0) {
+      /// cqe->res == 0 can means EOF, or can mean partial results. See
+      // comment
+      // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
+      // Fall back to pread in this case.
+      if (use_direct_io && !IsSectorAligned(finished_len, alignment)) {
+        // Bytes reads don't fill sectors. Should only happen at the end
+        // of the file.
+        req->result = Slice(req->scratch, finished_len);
+        req->status = IOStatus::OK();
+      } else {
+        if (async_read) {
+          // No  bytes read. It can means EOF. In case of partial results, it's
+          // caller responsibility to call read/readasync again.
+          req->result = Slice(req->scratch, 0);
+          req->status = IOStatus::OK();
+        } else {
+          read_again = true;
+        }
+      }
+    } else if (bytes_read < iov_len) {
+      assert(bytes_read > 0);
+      if (async_read) {
+        req->result = Slice(req->scratch, bytes_read);
+        req->status = IOStatus::OK();
+      } else {
+        assert(bytes_read + finished_len < len);
+        finished_len += bytes_read;
+      }
+    } else {
+      req->result = Slice(req->scratch, 0);
+      req->status = IOError("Req returned more bytes than requested", file_name,
+                            cqe->res);
+    }
+  }
+#ifdef NDEBUG
+  (void)len;
+#endif
+}
+#endif
+
 #ifdef OS_LINUX
 // Files under a specific directory have the same logical block size.
 // This class caches the logical block size for the specified directories to
@@ -132,8 +242,7 @@ class PosixSequentialFile : public FSSequentialFile {
 
  public:
   PosixSequentialFile(const std::string& fname, FILE* file, int fd,
-                      size_t logical_block_size,
-                      const EnvOptions& options);
+                      size_t logical_block_size, const EnvOptions& options);
   virtual ~PosixSequentialFile();
 
   virtual IOStatus Read(size_t n, const IOOptions& opts, Slice* result,
@@ -181,8 +290,7 @@ class PosixRandomAccessFile : public FSRandomAccessFile {
 
  public:
   PosixRandomAccessFile(const std::string& fname, int fd,
-                        size_t logical_block_size,
-                        const EnvOptions& options
+                        size_t logical_block_size, const EnvOptions& options
 #if defined(ROCKSDB_IOURING_PRESENT)
                         ,
                         ThreadLocalPtr* thread_local_io_urings
@@ -210,6 +318,11 @@ class PosixRandomAccessFile : public FSRandomAccessFile {
   virtual size_t GetRequiredBufferAlignment() const override {
     return logical_sector_size_;
   }
+  // EXPERIMENTAL
+  virtual IOStatus ReadAsync(
+      FSReadRequest& req, const IOOptions& opts,
+      std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
+      void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) override;
 };
 
 class PosixWritableFile : public FSWritableFile {
@@ -293,10 +406,10 @@ class PosixMmapReadableFile : public FSRandomAccessFile {
   PosixMmapReadableFile(const int fd, const std::string& fname, void* base,
                         size_t length, const EnvOptions& options);
   virtual ~PosixMmapReadableFile();
-  virtual IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts,
-                        Slice* result, char* scratch,
-                        IODebugContext* dbg) const override;
-  virtual IOStatus InvalidateCache(size_t offset, size_t length) override;
+  IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts, Slice* result,
+                char* scratch, IODebugContext* dbg) const override;
+  void Hint(AccessPattern pattern) override;
+  IOStatus InvalidateCache(size_t offset, size_t length) override;
 };
 
 class PosixMmapFile : public FSWritableFile {
@@ -391,12 +504,20 @@ struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer {
 
 class PosixDirectory : public FSDirectory {
  public:
-  explicit PosixDirectory(int fd) : fd_(fd) {}
+  explicit PosixDirectory(int fd, const std::string& directory_name);
   ~PosixDirectory();
   virtual IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
 
+  virtual IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
+
+  virtual IOStatus FsyncWithDirOptions(
+      const IOOptions&, IODebugContext*,
+      const DirFsyncOptions& dir_fsync_options) override;
+
  private:
   int fd_;
+  bool is_btrfs_;
+  const std::string directory_name_;
 };
 
 }  // namespace ROCKSDB_NAMESPACE