#include <vector>
#include <tuple>
#include <seastar/core/internal/io_desc.hh>
+#include <seastar/util/bool_class.hh>
#include <boost/intrusive_ptr.hpp>
namespace seastar {
};
pollable_fd_state(const pollable_fd_state&) = delete;
void operator=(const pollable_fd_state&) = delete;
+ /// Set the speculation of specified I/O events
+ ///
+ /// We try to speculate. If an I/O is completed successfully without being
+ /// blocked and it didn't return the short read/write. We anticipate that
+ /// the next I/O will also be non-blocking and will not return EAGAIN.
+ /// But the speculation is invalidated once it is "used" by
+ /// \c take_speculation()
void speculate_epoll(int events) { events_known |= events; }
+ /// Check whether we speculate specified I/O is possible on the fd,
+ /// invalidate the speculation if it matches with all specified \c events.
+ ///
+ /// \return true if the current speculation includes all specified events
+ bool take_speculation(int events) {
+ // invalidate the speculation set by the last speculate_epoll() call,
+ if (events_known & events) {
+ events_known &= ~events;
+ return true;
+ }
+ return false;
+ }
file_desc fd;
bool events_rw = false; // single consumer for both read and write (accept())
- bool no_more_recv = false; // For udp, there is no shutdown indication from the kernel
- bool no_more_send = false; // For udp, there is no shutdown indication from the kernel
+ unsigned shutdown_mask = 0; // For udp, there is no shutdown indication from the kernel
int events_requested = 0; // wanted by pollin/pollout promises
int events_epoll = 0; // installed in epoll
int events_known = 0; // returned from epoll
friend class reactor;
friend class pollable_fd;
+ friend class reactor_backend_uring;
future<size_t> read_some(char* buffer, size_t size);
future<size_t> read_some(uint8_t* buffer, size_t size);
future<> readable();
future<> writeable();
future<> readable_or_writeable();
- void abort_reader();
- void abort_writer();
future<std::tuple<pollable_fd, socket_address>> accept();
future<> connect(socket_address& sa);
+ future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba);
future<size_t> sendmsg(struct msghdr *msg);
future<size_t> recvmsg(struct msghdr *msg);
future<size_t> sendto(socket_address addr, const void* buf, size_t len);
+ future<> poll_rdhup();
protected:
explicit pollable_fd_state(file_desc fd, speculation speculate = speculation())
future<> readable_or_writeable() {
return _s->readable_or_writeable();
}
- void abort_reader() {
- return _s->abort_reader();
- }
- void abort_writer() {
- return _s->abort_writer();
- }
future<std::tuple<pollable_fd, socket_address>> accept() {
return _s->accept();
}
future<> connect(socket_address& sa) {
return _s->connect(sa);
}
+ future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba) {
+ return _s->recv_some(ba);
+ }
future<size_t> sendmsg(struct msghdr *msg) {
return _s->sendmsg(msg);
}
return _s->sendto(addr, buf, len);
}
file_desc& get_file_desc() const { return _s->fd; }
- void shutdown(int how);
+ using shutdown_kernel_only = bool_class<struct shutdown_kernel_only_tag>;
+ void shutdown(int how, shutdown_kernel_only kernel_only = shutdown_kernel_only::yes);
void close() { _s.reset(); }
explicit operator bool() const noexcept {
return bool(_s);
}
+ future<> poll_rdhup() {
+ return _s->poll_rdhup();
+ }
protected:
int get_fd() const { return _s->fd.get(); }
void maybe_no_more_recv() { return _s->maybe_no_more_recv(); }