]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/internal/pollable_fd.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / internal / pollable_fd.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2019 ScyllaDB
20 */
21
22 #pragma once
23
24 #include <seastar/core/future.hh>
25 #include <seastar/core/posix.hh>
26 #include <vector>
27 #include <tuple>
28 #include <seastar/core/internal/io_desc.hh>
29 #include <seastar/util/bool_class.hh>
30 #include <boost/intrusive_ptr.hpp>
31
32 namespace seastar {
33
34 class reactor;
35 class pollable_fd;
36 class pollable_fd_state;
37 class socket_address;
38
39 namespace internal {
40
41 class buffer_allocator;
42
43 }
44
45 namespace net {
46
47 class packet;
48
49 }
50
51 class pollable_fd_state;
52
53 using pollable_fd_state_ptr = boost::intrusive_ptr<pollable_fd_state>;
54
55 class pollable_fd_state {
56 unsigned _refs = 0;
57 public:
58 virtual ~pollable_fd_state() {}
59 struct speculation {
60 int events = 0;
61 explicit speculation(int epoll_events_guessed = 0) : events(epoll_events_guessed) {}
62 };
63 pollable_fd_state(const pollable_fd_state&) = delete;
64 void operator=(const pollable_fd_state&) = delete;
65 /// Set the speculation of specified I/O events
66 ///
67 /// We try to speculate. If an I/O is completed successfully without being
68 /// blocked and it didn't return the short read/write. We anticipate that
69 /// the next I/O will also be non-blocking and will not return EAGAIN.
70 /// But the speculation is invalidated once it is "used" by
71 /// \c take_speculation()
72 void speculate_epoll(int events) { events_known |= events; }
73 /// Check whether we speculate specified I/O is possible on the fd,
74 /// invalidate the speculation if it matches with all specified \c events.
75 ///
76 /// \return true if the current speculation includes all specified events
77 bool take_speculation(int events) {
78 // invalidate the speculation set by the last speculate_epoll() call,
79 if (events_known & events) {
80 events_known &= ~events;
81 return true;
82 }
83 return false;
84 }
85 file_desc fd;
86 bool events_rw = false; // single consumer for both read and write (accept())
87 unsigned shutdown_mask = 0; // For udp, there is no shutdown indication from the kernel
88 int events_requested = 0; // wanted by pollin/pollout promises
89 int events_epoll = 0; // installed in epoll
90 int events_known = 0; // returned from epoll
91
92 friend class reactor;
93 friend class pollable_fd;
94 friend class reactor_backend_uring;
95
96 future<size_t> read_some(char* buffer, size_t size);
97 future<size_t> read_some(uint8_t* buffer, size_t size);
98 future<size_t> read_some(const std::vector<iovec>& iov);
99 future<temporary_buffer<char>> read_some(internal::buffer_allocator* ba);
100 future<> write_all(const char* buffer, size_t size);
101 future<> write_all(const uint8_t* buffer, size_t size);
102 future<size_t> write_some(net::packet& p);
103 future<> write_all(net::packet& p);
104 future<> readable();
105 future<> writeable();
106 future<> readable_or_writeable();
107 future<std::tuple<pollable_fd, socket_address>> accept();
108 future<> connect(socket_address& sa);
109 future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba);
110 future<size_t> sendmsg(struct msghdr *msg);
111 future<size_t> recvmsg(struct msghdr *msg);
112 future<size_t> sendto(socket_address addr, const void* buf, size_t len);
113 future<> poll_rdhup();
114
115 protected:
116 explicit pollable_fd_state(file_desc fd, speculation speculate = speculation())
117 : fd(std::move(fd)), events_known(speculate.events) {}
118 private:
119 void maybe_no_more_recv();
120 void maybe_no_more_send();
121 void forget(); // called on end-of-life
122
123 friend void intrusive_ptr_add_ref(pollable_fd_state* fd) {
124 ++fd->_refs;
125 }
126 friend void intrusive_ptr_release(pollable_fd_state* fd);
127 };
128
129 class pollable_fd {
130 public:
131 using speculation = pollable_fd_state::speculation;
132 pollable_fd() = default;
133 pollable_fd(file_desc fd, speculation speculate = speculation());
134 public:
135 future<size_t> read_some(char* buffer, size_t size) {
136 return _s->read_some(buffer, size);
137 }
138 future<size_t> read_some(uint8_t* buffer, size_t size) {
139 return _s->read_some(buffer, size);
140 }
141 future<size_t> read_some(const std::vector<iovec>& iov) {
142 return _s->read_some(iov);
143 }
144 future<temporary_buffer<char>> read_some(internal::buffer_allocator* ba) {
145 return _s->read_some(ba);
146 }
147 future<> write_all(const char* buffer, size_t size) {
148 return _s->write_all(buffer, size);
149 }
150 future<> write_all(const uint8_t* buffer, size_t size) {
151 return _s->write_all(buffer, size);
152 }
153 future<size_t> write_some(net::packet& p) {
154 return _s->write_some(p);
155 }
156 future<> write_all(net::packet& p) {
157 return _s->write_all(p);
158 }
159 future<> readable() {
160 return _s->readable();
161 }
162 future<> writeable() {
163 return _s->writeable();
164 }
165 future<> readable_or_writeable() {
166 return _s->readable_or_writeable();
167 }
168 future<std::tuple<pollable_fd, socket_address>> accept() {
169 return _s->accept();
170 }
171 future<> connect(socket_address& sa) {
172 return _s->connect(sa);
173 }
174 future<temporary_buffer<char>> recv_some(internal::buffer_allocator* ba) {
175 return _s->recv_some(ba);
176 }
177 future<size_t> sendmsg(struct msghdr *msg) {
178 return _s->sendmsg(msg);
179 }
180 future<size_t> recvmsg(struct msghdr *msg) {
181 return _s->recvmsg(msg);
182 }
183 future<size_t> sendto(socket_address addr, const void* buf, size_t len) {
184 return _s->sendto(addr, buf, len);
185 }
186 file_desc& get_file_desc() const { return _s->fd; }
187 using shutdown_kernel_only = bool_class<struct shutdown_kernel_only_tag>;
188 void shutdown(int how, shutdown_kernel_only kernel_only = shutdown_kernel_only::yes);
189 void close() { _s.reset(); }
190 explicit operator bool() const noexcept {
191 return bool(_s);
192 }
193 future<> poll_rdhup() {
194 return _s->poll_rdhup();
195 }
196 protected:
197 int get_fd() const { return _s->fd.get(); }
198 void maybe_no_more_recv() { return _s->maybe_no_more_recv(); }
199 void maybe_no_more_send() { return _s->maybe_no_more_send(); }
200 friend class reactor;
201 friend class readable_eventfd;
202 friend class writeable_eventfd;
203 friend class aio_storage_context;
204 private:
205 pollable_fd_state_ptr _s;
206 };
207
208 class writeable_eventfd;
209
210 class readable_eventfd {
211 pollable_fd _fd;
212 public:
213 explicit readable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
214 readable_eventfd(readable_eventfd&&) = default;
215 writeable_eventfd write_side();
216 future<size_t> wait();
217 int get_write_fd() { return _fd.get_fd(); }
218 private:
219 explicit readable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
220 static file_desc try_create_eventfd(size_t initial);
221
222 friend class writeable_eventfd;
223 };
224
225 class writeable_eventfd {
226 file_desc _fd;
227 public:
228 explicit writeable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
229 writeable_eventfd(writeable_eventfd&&) = default;
230 readable_eventfd read_side();
231 void signal(size_t nr);
232 int get_read_fd() { return _fd.get(); }
233 private:
234 explicit writeable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
235 static file_desc try_create_eventfd(size_t initial);
236
237 friend class readable_eventfd;
238 };
239
240 }