2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright 2019 ScyllaDB
24 #include <seastar/core/future.hh>
25 #include <seastar/core/posix.hh>
28 #include <seastar/core/internal/io_desc.hh>
29 #include <seastar/util/bool_class.hh>
30 #include <boost/intrusive_ptr.hpp>
// Forward declarations needed by the declarations that follow.
// NOTE(review): pollable_fd_state appears to be forward-declared twice in this
// listing (here and two declarations below); harmless but redundant if both
// are in the same namespace — confirm.
36 class pollable_fd_state;
// NOTE(review): referenced below as internal::buffer_allocator, so this
// declaration presumably sits inside a `namespace internal` whose opening and
// closing lines are not visible in this listing — confirm.
41 class buffer_allocator;
51 class pollable_fd_state;
// Reference-counted handle to a pollable_fd_state. boost::intrusive_ptr finds
// its add-ref/release hooks through the friend functions
// intrusive_ptr_add_ref / intrusive_ptr_release that pollable_fd_state declares.
53 using pollable_fd_state_ptr = boost::intrusive_ptr<pollable_fd_state>;
// State shared by all handles to one pollable file descriptor: the descriptor
// itself (moved in by the constructor) plus epoll-event bookkeeping — which
// events are requested, which are installed in epoll, and which are known or
// speculated to be ready. Shared through pollable_fd_state_ptr; not copyable.
// NOTE(review): several lines of this class (access specifiers, the `fd`
// member, the speculation struct header, closing braces) are not visible in
// this listing.
55 class pollable_fd_state {
// Virtual so that a derived state object is destroyed correctly through a
// pollable_fd_state pointer.
58 virtual ~pollable_fd_state() {}
// Constructor of the nested `speculation` struct (its opening line is not
// visible here): records an initial guess of ready epoll events, none by default.
61 explicit speculation(int epoll_events_guessed = 0) : events(epoll_events_guessed) {}
// Non-copyable: the state is shared by reference count, never duplicated.
63 pollable_fd_state(const pollable_fd_state&) = delete;
64 void operator=(const pollable_fd_state&) = delete;
65 /// Set the speculation of specified I/O events
67 /// We try to speculate: if an I/O completed successfully without blocking
68 /// and without a short read/write, we anticipate that
69 /// the next I/O will also be non-blocking and will not return EAGAIN.
70 /// But the speculation is invalidated once it is "used" by
71 /// \c take_speculation()
///
/// \param events epoll event bits to OR into events_known; speculation from
///        earlier calls is kept, not replaced.
72 void speculate_epoll(int events) { events_known |= events; }
73 /// Check whether we speculate specified I/O is possible on the fd,
74 /// invalidate the speculation if it matches with all specified \c events.
76 /// \return true if the current speculation includes all specified events
/// NOTE(review): the condition below is `events_known & events`, which is
/// non-zero when ANY bit of \c events is speculated, not only when all are;
/// the "all" wording above holds only for a single-bit \c events. The
/// function's return statements are not visible in this listing — confirm
/// and align the documentation or the condition.
77 bool take_speculation(int events) {
78 // Consume the speculation: clear the \c events bits that speculate_epoll() (or the constructor's initial speculation) had set.
79 if (events_known & events) {
80 events_known &= ~events;
// Event bookkeeping; each field is a bitmask of epoll event bits.
86 bool events_rw = false; // single consumer for both read and write (accept())
87 unsigned shutdown_mask = 0; // For udp, there is no shutdown indication from the kernel
88 int events_requested = 0; // wanted by pollin/pollout promises
89 int events_epoll = 0; // installed in epoll
90 int events_known = 0; // returned from epoll, or speculated via speculate_epoll()/the constructor
93 friend class pollable_fd;
94 friend class reactor_backend_uring;
// Asynchronous I/O operations on the descriptor; pollable_fd forwards to
// these. Their definitions are not part of this listing.
96 future<size_t> read_some(char* buffer, size_t size);
97 future<size_t> read_some(uint8_t* buffer, size_t size);
98 future<size_t> read_some(const std::vector<iovec>& iov);
99 future<temporary_buffer<char>> read_some(internal::buffer_allocator* ba);
100 future<> write_all(const char* buffer, size_t size);
101 future<> write_all(const uint8_t* buffer, size_t size);
102 future<size_t> write_some(net::packet& p);
103 future<> write_all(net::packet& p);
105 future<> writeable();
106 future<> readable_or_writeable();
107 future<std::tuple<pollable_fd, socket_address>> accept();
108 future<> connect(socket_address& sa);
109 future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba);
110 future<size_t> sendmsg(struct msghdr *msg);
111 future<size_t> recvmsg(struct msghdr *msg);
112 future<size_t> sendto(socket_address addr, const void* buf, size_t len);
113 future<> poll_rdhup();
// Takes the descriptor by value and moves it into `fd`; seeds events_known
// with the initial speculation (none by default).
116 explicit pollable_fd_state(file_desc fd, speculation speculate = speculation())
117 : fd(std::move(fd)), events_known(speculate.events) {}
// Definitions not visible in this listing.
119 void maybe_no_more_recv();
120 void maybe_no_more_send();
121 void forget(); // called on end-of-life
// Reference-counting hooks located by boost::intrusive_ptr via ADL. The
// add-ref body and the release definition are not visible in this listing.
123 friend void intrusive_ptr_add_ref(pollable_fd_state* fd) {
126 friend void intrusive_ptr_release(pollable_fd_state* fd);
// Members of pollable_fd (its class header is not visible in this listing): a
// lightweight handle holding a pollable_fd_state_ptr `_s`; almost every I/O
// method simply forwards to the shared pollable_fd_state.
// NOTE(review): the closing braces of the forwarding methods are not visible
// in this listing.
131 using speculation = pollable_fd_state::speculation;
// Default-constructed handle: `_s` is a null intrusive_ptr.
132 pollable_fd() = default;
// Definition not visible; presumably creates the shared state for `fd` — confirm.
133 pollable_fd(file_desc fd, speculation speculate = speculation());
// NOTE(review): the forwarding methods below dereference `_s` without a null
// check, so calling them on a default-constructed or close()d handle
// dereferences a null pointer.
135 future<size_t> read_some(char* buffer, size_t size) {
136 return _s->read_some(buffer, size);
138 future<size_t> read_some(uint8_t* buffer, size_t size) {
139 return _s->read_some(buffer, size);
141 future<size_t> read_some(const std::vector<iovec>& iov) {
142 return _s->read_some(iov);
144 future<temporary_buffer<char>> read_some(internal::buffer_allocator* ba) {
145 return _s->read_some(ba);
147 future<> write_all(const char* buffer, size_t size) {
148 return _s->write_all(buffer, size);
150 future<> write_all(const uint8_t* buffer, size_t size) {
151 return _s->write_all(buffer, size);
153 future<size_t> write_some(net::packet& p) {
154 return _s->write_some(p);
156 future<> write_all(net::packet& p) {
157 return _s->write_all(p);
159 future<> readable() {
160 return _s->readable();
162 future<> writeable() {
163 return _s->writeable();
165 future<> readable_or_writeable() {
166 return _s->readable_or_writeable();
// accept(): its body is not visible in this listing.
168 future<std::tuple<pollable_fd, socket_address>> accept() {
171 future<> connect(socket_address& sa) {
172 return _s->connect(sa);
174 future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba) {
175 return _s->recv_some(ba);
177 future<size_t> sendmsg(struct msghdr *msg) {
178 return _s->sendmsg(msg);
180 future<size_t> recvmsg(struct msghdr *msg) {
181 return _s->recvmsg(msg);
183 future<size_t> sendto(socket_address addr, const void* buf, size_t len) {
184 return _s->sendto(addr, buf, len);
// Const member returning a mutable reference: constness of the handle does
// not propagate through `_s` to the shared state.
186 file_desc& get_file_desc() const { return _s->fd; }
187 using shutdown_kernel_only = bool_class<struct shutdown_kernel_only_tag>;
// Definition not visible; `kernel_only` defaults to yes.
188 void shutdown(int how, shutdown_kernel_only kernel_only = shutdown_kernel_only::yes);
// Drops this handle's reference (`_s` becomes null); the shared state is
// released when the last reference goes away.
189 void close() { _s.reset(); }
// Body not visible in this listing; presumably reports whether `_s` is
// non-null — confirm.
190 explicit operator bool() const noexcept {
193 future<> poll_rdhup() {
194 return _s->poll_rdhup();
// Raw descriptor number of the shared state's file_desc.
197 int get_fd() const { return _s->fd.get(); }
// `return` of a void expression: legal C++, the `return` is just redundant.
198 void maybe_no_more_recv() { return _s->maybe_no_more_recv(); }
199 void maybe_no_more_send() { return _s->maybe_no_more_send(); }
200 friend class reactor;
201 friend class readable_eventfd;
202 friend class writeable_eventfd;
203 friend class aio_storage_context;
// Shared state; null for a default-constructed or closed handle.
205 pollable_fd_state_ptr _s;
// Forward declaration: readable_eventfd and writeable_eventfd refer to each other.
208 class writeable_eventfd;
// Read side of an eventfd pair. The `_fd` member is declared on a line not
// visible in this listing; since it calls get_fd() and is built from a
// file_desc, it appears to be a pollable_fd — confirm.
210 class readable_eventfd {
// Builds the descriptor via try_create_eventfd(initial) (definition not
// visible); `initial` is presumably the initial eventfd counter — confirm.
213 explicit readable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
214 readable_eventfd(readable_eventfd&&) = default;
215 writeable_eventfd write_side();
// Asynchronously waits on the descriptor; definition not visible here.
216 future<size_t> wait();
// Returns the raw descriptor held by `_fd`. An eventfd is a single descriptor
// usable for both reading and writing, which presumably explains the name.
// NOTE(review): could be a const member function.
217 int get_write_fd() { return _fd.get_fd(); }
// Adopts an existing descriptor; presumably used by writeable_eventfd (a
// friend) to build the opposite side — confirm.
219 explicit readable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
220 static file_desc try_create_eventfd(size_t initial);
222 friend class writeable_eventfd;
// Write side of an eventfd pair. The `_fd` member is declared on a line not
// visible in this listing; since it calls get() and is built from a
// file_desc&&, it appears to be a plain file_desc — confirm.
225 class writeable_eventfd {
// Builds the descriptor via try_create_eventfd(initial) (definition not
// visible); `initial` is presumably the initial eventfd counter — confirm.
228 explicit writeable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
229 writeable_eventfd(writeable_eventfd&&) = default;
230 readable_eventfd read_side();
// Definition not visible; presumably adds `nr` to the eventfd counter — confirm.
231 void signal(size_t nr);
// Returns the raw descriptor held by `_fd`. An eventfd is a single descriptor
// usable for both reading and writing, which presumably explains the name.
// NOTE(review): could be a const member function.
232 int get_read_fd() { return _fd.get(); }
// Adopts an existing descriptor; presumably used by readable_eventfd (a
// friend) to build the opposite side — confirm.
234 explicit writeable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
235 static file_desc try_create_eventfd(size_t initial);
237 friend class readable_eventfd;