enum class operation { read, readv, write, writev, fdatasync, recv, recvmsg, send, sendmsg, accept, connect, poll_add, poll_remove, cancel };
private:
operation _op;
- int _fd;
- union {
- uint64_t pos;
- int flags;
- int events;
- } _attr;
// the upper layers give us void pointers, but storing void pointers here is just
// dangerous. The constructors seem to be happy to convert other pointers to void*,
// even if they are marked as explicit, and then you end up losing approximately 3 hours
// and 15 minutes (hypothetically, of course), trying to chase the weirdest bug.
// Let's store a char* for safety, and cast it back to void* in the accessor.
- union {
+ struct read_op {
+ int fd;
+ uint64_t pos;
char* addr;
+ size_t size;
+ bool nowait_works;
+ };
+ struct readv_op {
+ int fd;
+ uint64_t pos;
::iovec* iovec;
+ size_t iov_len;
+ bool nowait_works;
+ };
+ struct recv_op {
+ int fd;
+ char* addr;
+ size_t size;
+ int flags;
+ };
+ struct recvmsg_op {
+ int fd;
::msghdr* msghdr;
+ int flags;
+ };
+ using send_op = recv_op;
+ using sendmsg_op = recvmsg_op;
+ using write_op = read_op;
+ using writev_op = readv_op;
+ struct fdatasync_op {
+ int fd;
+ };
+ struct accept_op {
+ int fd;
::sockaddr* sockaddr;
- } _ptr;
-
- // accept wants a socklen_t*, connect wants a socklen_t
- union {
- size_t len;
socklen_t* socklen_ptr;
+ int flags;
+ };
+ struct connect_op {
+ int fd;
+ ::sockaddr* sockaddr;
socklen_t socklen;
- } _size;
+ };
+ struct poll_add_op {
+ int fd;
+ int events;
+ };
+ struct poll_remove_op {
+ int fd;
+ char* addr;
+ };
+ struct cancel_op {
+ int fd;
+ char* addr;
+ };
+ union {
+ read_op _read;
+ readv_op _readv;
+ recv_op _recv;
+ recvmsg_op _recvmsg;
+ send_op _send;
+ sendmsg_op _sendmsg;
+ write_op _write;
+ writev_op _writev;
+ fdatasync_op _fdatasync;
+ accept_op _accept;
+ connect_op _connect;
+ poll_add_op _poll_add;
+ poll_remove_op _poll_remove;
+ cancel_op _cancel;
+ };
- bool _nowait_works;
+public:
+ static io_request make_read(int fd, uint64_t pos, void* address, size_t size, bool nowait_works) {
+ io_request req;
+ req._op = operation::read;
+ req._read = {
+ .fd = fd,
+ .pos = pos,
+ .addr = reinterpret_cast<char*>(address),
+ .size = size,
+ .nowait_works = nowait_works,
+ };
+ return req;
+ }
- explicit io_request(operation op, int fd, int flags, ::msghdr* msg)
- : _op(op)
- , _fd(fd)
- {
- _attr.flags = flags;
- _ptr.msghdr = msg;
+ static io_request make_readv(int fd, uint64_t pos, std::vector<iovec>& iov, bool nowait_works) {
+ io_request req;
+ req._op = operation::readv;
+ req._readv = {
+ .fd = fd,
+ .pos = pos,
+ .iovec = iov.data(),
+ .iov_len = iov.size(),
+ .nowait_works = nowait_works,
+ };
+ return req;
}
- explicit io_request(operation op, int fd, sockaddr* sa, socklen_t sl)
- : _op(op)
- , _fd(fd)
- {
- _ptr.sockaddr = sa;
- _size.socklen = sl;
+ static io_request make_recv(int fd, void* address, size_t size, int flags) {
+ io_request req;
+ req._op = operation::recv;
+ req._recv = {
+ .fd = fd,
+ .addr = reinterpret_cast<char*>(address),
+ .size = size,
+ .flags = flags,
+ };
+ return req;
}
- explicit io_request(operation op, int fd, int flags, sockaddr* sa, socklen_t* sl)
- : _op(op)
- , _fd(fd)
- {
- _attr.flags = flags;
- _ptr.sockaddr = sa;
- _size.socklen_ptr = sl;
+ static io_request make_recvmsg(int fd, ::msghdr* msg, int flags) {
+ io_request req;
+ req._op = operation::recvmsg;
+ req._recvmsg = {
+ .fd = fd,
+ .msghdr = msg,
+ .flags = flags,
+ };
+ return req;
}
- explicit io_request(operation op, int fd, uint64_t pos, char* ptr, size_t size)
- : _op(op)
- , _fd(fd)
- {
- _attr.pos = pos;
- _ptr.addr = ptr;
- _size.len = size;
+
+ static io_request make_send(int fd, const void* address, size_t size, int flags) {
+ io_request req;
+ req._op = operation::send;
+ req._send = {
+ .fd = fd,
+ .addr = const_cast<char*>(reinterpret_cast<const char*>(address)),
+ .size = size,
+ .flags = flags,
+ };
+ return req;
}
- explicit io_request(operation op, int fd, uint64_t pos, char* ptr, size_t size, bool nowait_works)
- : _op(op)
- , _fd(fd)
- , _nowait_works(nowait_works)
- {
- _attr.pos = pos;
- _ptr.addr = ptr;
- _size.len = size;
+ static io_request make_sendmsg(int fd, ::msghdr* msg, int flags) {
+ io_request req;
+ req._op = operation::sendmsg;
+ req._sendmsg = {
+ .fd = fd,
+ .msghdr = msg,
+ .flags = flags,
+ };
+ return req;
}
- explicit io_request(operation op, int fd, uint64_t pos, iovec* ptr, size_t size, bool nowait_works)
- : _op(op)
- , _fd(fd)
- , _nowait_works(nowait_works)
- {
- _attr.pos = pos;
- _ptr.iovec = ptr;
- _size.len = size;
+ static io_request make_write(int fd, uint64_t pos, const void* address, size_t size, bool nowait_works) {
+ io_request req;
+ req._op = operation::write;
+ req._write = {
+ .fd = fd,
+ .pos = pos,
+ .addr = const_cast<char*>(reinterpret_cast<const char*>(address)),
+ .size = size,
+ .nowait_works = nowait_works,
+ };
+ return req;
}
- explicit io_request(operation op, int fd)
- : _op(op)
- , _fd(fd)
- {}
- explicit io_request(operation op, int fd, int events)
- : _op(op)
- , _fd(fd)
- {
- _attr.events = events;
+ static io_request make_writev(int fd, uint64_t pos, std::vector<iovec>& iov, bool nowait_works) {
+ io_request req;
+ req._op = operation::writev;
+ req._writev = {
+ .fd = fd,
+ .pos = pos,
+ .iovec = iov.data(),
+ .iov_len = iov.size(),
+ .nowait_works = nowait_works,
+ };
+ return req;
}
- explicit io_request(operation op, int fd, char *ptr)
- : _op(op)
- , _fd(fd)
- {
- _ptr.addr = ptr;
+ static io_request make_fdatasync(int fd) {
+ io_request req;
+ req._op = operation::fdatasync;
+ req._fdatasync = {
+ .fd = fd,
+ };
+ return req;
}
-public:
+
+ static io_request make_accept(int fd, struct sockaddr* addr, socklen_t* addrlen, int flags) {
+ io_request req;
+ req._op = operation::accept;
+ req._accept = {
+ .fd = fd,
+ .sockaddr = addr,
+ .socklen_ptr = addrlen,
+ .flags = flags,
+ };
+ return req;
+ }
+
+ static io_request make_connect(int fd, struct sockaddr* addr, socklen_t addrlen) {
+ io_request req;
+ req._op = operation::connect;
+ req._connect = {
+ .fd = fd,
+ .sockaddr = addr,
+ .socklen = addrlen,
+ };
+ return req;
+ }
+
+ static io_request make_poll_add(int fd, int events) {
+ io_request req;
+ req._op = operation::poll_add;
+ req._poll_add = {
+ .fd = fd,
+ .events = events,
+ };
+ return req;
+ }
+
+ static io_request make_poll_remove(int fd, void *addr) {
+ io_request req;
+ req._op = operation::poll_remove;
+ req._poll_remove = {
+ .fd = fd,
+ .addr = reinterpret_cast<char*>(addr),
+ };
+ return req;
+ }
+
+ static io_request make_cancel(int fd, void *addr) {
+ io_request req;
+ req._op = operation::cancel;
+ req._cancel = {
+ .fd = fd,
+ .addr = reinterpret_cast<char*>(addr),
+ };
+ return req;
+ }
+
bool is_read() const {
switch (_op) {
case operation::read:
return _op;
}
- int fd() const {
- return _fd;
- }
-
- uint64_t pos() const {
- return _attr.pos;
- }
-
- int flags() const {
- return _attr.flags;
- }
-
- int events() const {
- return _attr.events;
- }
-
- void* address() const {
- return reinterpret_cast<void*>(_ptr.addr);
- }
-
- iovec* iov() const {
- return _ptr.iovec;
- }
-
- ::sockaddr* posix_sockaddr() const {
- return _ptr.sockaddr;
- }
-
- ::msghdr* msghdr() const {
- return _ptr.msghdr;
- }
-
- size_t size() const {
- return _size.len;
- }
-
- size_t iov_len() const {
- return _size.len;
- }
-
- socklen_t socklen() const {
- return _size.socklen;
- }
-
- socklen_t* socklen_ptr() const {
- return _size.socklen_ptr;
- }
-
- bool nowait_works() const {
- return _nowait_works;
- }
-
- static io_request make_read(int fd, uint64_t pos, void* address, size_t size, bool nowait_works) {
- return io_request(operation::read, fd, pos, reinterpret_cast<char*>(address), size, nowait_works);
- }
-
- static io_request make_readv(int fd, uint64_t pos, std::vector<iovec>& iov, bool nowait_works) {
- return io_request(operation::readv, fd, pos, iov.data(), iov.size(), nowait_works);
- }
-
- static io_request make_recv(int fd, void* address, size_t size, int flags) {
- return io_request(operation::recv, fd, flags, reinterpret_cast<char*>(address), size);
- }
-
- static io_request make_recvmsg(int fd, ::msghdr* msg, int flags) {
- return io_request(operation::recvmsg, fd, flags, msg);
- }
-
- static io_request make_send(int fd, const void* address, size_t size, int flags) {
- return io_request(operation::send, fd, flags, const_cast<char*>(reinterpret_cast<const char*>(address)), size);
- }
-
- static io_request make_sendmsg(int fd, ::msghdr* msg, int flags) {
- return io_request(operation::sendmsg, fd, flags, msg);
- }
-
- static io_request make_write(int fd, uint64_t pos, const void* address, size_t size, bool nowait_works) {
- return io_request(operation::write, fd, pos, const_cast<char*>(reinterpret_cast<const char*>(address)), size, nowait_works);
- }
-
- static io_request make_writev(int fd, uint64_t pos, std::vector<iovec>& iov, bool nowait_works) {
- return io_request(operation::writev, fd, pos, iov.data(), iov.size(), nowait_works);
- }
-
- static io_request make_fdatasync(int fd) {
- return io_request(operation::fdatasync, fd);
- }
-
- static io_request make_accept(int fd, struct sockaddr* addr, socklen_t* addrlen, int flags) {
- return io_request(operation::accept, fd, flags, addr, addrlen);
+ template <operation Op>
+ auto& as() const {
+ if constexpr (Op == operation::read) {
+ return _read;
+ }
+ if constexpr (Op == operation::readv) {
+ return _readv;
+ }
+ if constexpr (Op == operation::recv) {
+ return _recv;
+ }
+ if constexpr (Op == operation::recvmsg) {
+ return _recvmsg;
+ }
+ if constexpr (Op == operation::send) {
+ return _send;
+ }
+ if constexpr (Op == operation::sendmsg) {
+ return _sendmsg;
+ }
+ if constexpr (Op == operation::write) {
+ return _write;
+ }
+ if constexpr (Op == operation::writev) {
+ return _writev;
+ }
+ if constexpr (Op == operation::fdatasync) {
+ return _fdatasync;
+ }
+ if constexpr (Op == operation::accept) {
+ return _accept;
+ }
+ if constexpr (Op == operation::connect) {
+ return _connect;
+ }
+ if constexpr (Op == operation::poll_add) {
+ return _poll_add;
+ }
+ if constexpr (Op == operation::poll_remove) {
+ return _poll_remove;
+ }
+ if constexpr (Op == operation::cancel) {
+ return _cancel;
+ }
}
- static io_request make_connect(int fd, struct sockaddr* addr, socklen_t addrlen) {
- return io_request(operation::connect, fd, addr, addrlen);
- }
+ struct part;
+ std::vector<part> split(size_t max_length);
- static io_request make_poll_add(int fd, int events) {
- return io_request(operation::poll_add, fd, events);
- }
+private:
+ io_request sub_req_buffer(size_t pos, size_t len) const {
+ io_request sub_req;
+ sub_req._op = _op;
+ // read_op and write_op share the same layout, so we don't handle
+ // them separately
+ auto& op = _read;
+ auto& sub_op = sub_req._read;
+ sub_op = {
+ .fd = op.fd,
+ .pos = op.pos + pos,
+ .addr = op.addr + pos,
+ .size = len,
+ .nowait_works = op.nowait_works,
+ };
+ return sub_req;
+ }
+ std::vector<part> split_buffer(size_t max_length);
+
+ io_request sub_req_iovec(size_t pos, std::vector<iovec>& iov) const {
+ io_request sub_req;
+ sub_req._op = _op;
+ // readv_op and writev_op share the same layout, so we don't handle
+ // them separately
+ auto& op = _readv;
+ auto& sub_op = sub_req._readv;
+ sub_op = {
+ .fd = op.fd,
+ .pos = op.pos + pos,
+ .iovec = iov.data(),
+ .iov_len = iov.size(),
+ .nowait_works = op.nowait_works,
+ };
+ return sub_req;
+ }
+ std::vector<part> split_iovec(size_t max_length);
+};
- static io_request make_poll_remove(int fd, void *addr) {
- return io_request(operation::poll_remove, fd, reinterpret_cast<char*>(addr));
- }
- static io_request make_cancel(int fd, void *addr) {
- return io_request(operation::cancel, fd, reinterpret_cast<char*>(addr));
- }
+struct io_request::part {
+ io_request req;
+ size_t size;
+ std::vector<::iovec> iovecs;
};
// Helper pair of IO direction and length
size_t _directed_length; // bit 0 is R/W flag
public:
- io_direction_and_length(const io_request& rq, size_t val) {
- _directed_length = val << 1;
- if (rq.is_read()) {
- _directed_length |= 0x1;
- } else if (!rq.is_write()) {
- on_internal_error(io_log, fmt::format("Unrecognized request passing through I/O queue {}", rq.opname()));
- }
- }
-
- bool is_read() const noexcept { return (_directed_length & 0x1) == 1; }
- bool is_write() const noexcept { return (_directed_length & 0x1) == 0; }
size_t length() const noexcept { return _directed_length >> 1; }
int rw_idx() const noexcept { return _directed_length & 0x1; }
static constexpr int read_idx = 1;
static constexpr int write_idx = 0;
+
+ io_direction_and_length(int idx, size_t val) noexcept
+ : _directed_length((val << 1) | idx)
+ {
+ assert(idx == read_idx || idx == write_idx);
+ }
};
}