#include <sys/uio.h>
#endif
#include <unistd.h>
+
#include <atomic>
#include <functional>
#include <map>
#include <string>
+
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
+#include "test_util/sync_point.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"
#define POSIX_FADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
#define POSIX_FADV_WILLNEED 3 /* [MC1] will need these pages */
#define POSIX_FADV_DONTNEED 4 /* [MC1] don't need these pages */
+
+#define POSIX_MADV_NORMAL 0 /* [MC1] no further special treatment */
+#define POSIX_MADV_RANDOM 1 /* [MC1] expect random page refs */
+#define POSIX_MADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
+#define POSIX_MADV_WILLNEED 3 /* [MC1] will need these pages */
+#define POSIX_MADV_DONTNEED 4 /* [MC1] don't need these pages */
#endif
namespace ROCKSDB_NAMESPACE {
size_t* size);
};
+/*
+ * DirectIOHelper
+ */
+inline bool IsSectorAligned(const size_t off, size_t sector_size) {
+ assert((sector_size & (sector_size - 1)) == 0);
+ return (off & (sector_size - 1)) == 0;
+}
+
+#ifndef NDEBUG
+inline bool IsSectorAligned(const void* ptr, size_t sector_size) {
+ return uintptr_t(ptr) % sector_size == 0;
+}
+#endif
+
+#if defined(ROCKSDB_IOURING_PRESENT)
+struct Posix_IOHandle {
+ Posix_IOHandle(struct io_uring* _iu,
+ std::function<void(const FSReadRequest&, void*)> _cb,
+ void* _cb_arg, uint64_t _offset, size_t _len, char* _scratch,
+ bool _use_direct_io, size_t _alignment)
+ : iu(_iu),
+ cb(_cb),
+ cb_arg(_cb_arg),
+ offset(_offset),
+ len(_len),
+ scratch(_scratch),
+ use_direct_io(_use_direct_io),
+ alignment(_alignment),
+ is_finished(false),
+ req_count(0) {}
+
+ struct iovec iov;
+ struct io_uring* iu;
+ std::function<void(const FSReadRequest&, void*)> cb;
+ void* cb_arg;
+ uint64_t offset;
+ size_t len;
+ char* scratch;
+ bool use_direct_io;
+ size_t alignment;
+ bool is_finished;
+ // req_count is used by AbortIO API to keep track of number of requests.
+ uint32_t req_count;
+};
+
+inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
+ size_t len, size_t iov_len, bool async_read,
+ bool use_direct_io, size_t alignment,
+ size_t& finished_len, FSReadRequest* req,
+ size_t& bytes_read, bool& read_again) {
+ read_again = false;
+ if (cqe->res < 0) {
+ req->result = Slice(req->scratch, 0);
+ req->status = IOError("Req failed", file_name, cqe->res);
+ } else {
+ bytes_read = static_cast<size_t>(cqe->res);
+ TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
+ if (bytes_read == iov_len) {
+ req->result = Slice(req->scratch, req->len);
+ req->status = IOStatus::OK();
+ } else if (bytes_read == 0) {
+ /// cqe->res == 0 can means EOF, or can mean partial results. See
+ // comment
+ // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
+ // Fall back to pread in this case.
+ if (use_direct_io && !IsSectorAligned(finished_len, alignment)) {
+ // Bytes reads don't fill sectors. Should only happen at the end
+ // of the file.
+ req->result = Slice(req->scratch, finished_len);
+ req->status = IOStatus::OK();
+ } else {
+ if (async_read) {
+ // No bytes read. It can means EOF. In case of partial results, it's
+ // caller responsibility to call read/readasync again.
+ req->result = Slice(req->scratch, 0);
+ req->status = IOStatus::OK();
+ } else {
+ read_again = true;
+ }
+ }
+ } else if (bytes_read < iov_len) {
+ assert(bytes_read > 0);
+ if (async_read) {
+ req->result = Slice(req->scratch, bytes_read);
+ req->status = IOStatus::OK();
+ } else {
+ assert(bytes_read + finished_len < len);
+ finished_len += bytes_read;
+ }
+ } else {
+ req->result = Slice(req->scratch, 0);
+ req->status = IOError("Req returned more bytes than requested", file_name,
+ cqe->res);
+ }
+ }
+#ifdef NDEBUG
+ (void)len;
+#endif
+}
+#endif
+
#ifdef OS_LINUX
// Files under a specific directory have the same logical block size.
// This class caches the logical block size for the specified directories to
public:
PosixSequentialFile(const std::string& fname, FILE* file, int fd,
- size_t logical_block_size,
- const EnvOptions& options);
+ size_t logical_block_size, const EnvOptions& options);
virtual ~PosixSequentialFile();
virtual IOStatus Read(size_t n, const IOOptions& opts, Slice* result,
public:
PosixRandomAccessFile(const std::string& fname, int fd,
- size_t logical_block_size,
- const EnvOptions& options
+ size_t logical_block_size, const EnvOptions& options
#if defined(ROCKSDB_IOURING_PRESENT)
,
ThreadLocalPtr* thread_local_io_urings
virtual size_t GetRequiredBufferAlignment() const override {
return logical_sector_size_;
}
+ // EXPERIMENTAL
+ virtual IOStatus ReadAsync(
+ FSReadRequest& req, const IOOptions& opts,
+ std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
+ void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) override;
};
class PosixWritableFile : public FSWritableFile {
PosixMmapReadableFile(const int fd, const std::string& fname, void* base,
size_t length, const EnvOptions& options);
virtual ~PosixMmapReadableFile();
- virtual IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts,
- Slice* result, char* scratch,
- IODebugContext* dbg) const override;
- virtual IOStatus InvalidateCache(size_t offset, size_t length) override;
+ IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts, Slice* result,
+ char* scratch, IODebugContext* dbg) const override;
+ void Hint(AccessPattern pattern) override;
+ IOStatus InvalidateCache(size_t offset, size_t length) override;
};
class PosixMmapFile : public FSWritableFile {
class PosixDirectory : public FSDirectory {
public:
- explicit PosixDirectory(int fd) : fd_(fd) {}
+ explicit PosixDirectory(int fd, const std::string& directory_name);
~PosixDirectory();
virtual IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
+ virtual IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
+
+ virtual IOStatus FsyncWithDirOptions(
+ const IOOptions&, IODebugContext*,
+ const DirFsyncOptions& dir_fsync_options) override;
+
private:
int fd_;
+ bool is_btrfs_;
+ const std::string directory_name_;
};
} // namespace ROCKSDB_NAMESPACE