git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/reactor_backend.hh
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / src / core / reactor_backend.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2019 ScyllaDB
20 */
21
22 #pragma once
23
24 #include <seastar/core/future.hh>
25 #include <seastar/core/posix.hh>
26 #include <seastar/core/internal/pollable_fd.hh>
27 #include <seastar/core/internal/poll.hh>
28 #include <seastar/core/linux-aio.hh>
29 #include <seastar/core/cacheline.hh>
30 #include <sys/time.h>
31 #include <signal.h>
32 #include <thread>
33 #include <stack>
34 #include <boost/any.hpp>
35 #include <boost/program_options.hpp>
36 #include <boost/container/static_vector.hpp>
37
38 #ifdef HAVE_OSV
39 #include <osv/newpoll.hh>
40 #endif
41
42 namespace seastar {
43
44 class reactor;
45
46 // FIXME: merge it with storage context below. At this point the
47 // main thing to do is unify the iocb list
48 struct aio_general_context {
49 explicit aio_general_context(size_t nr);
50 ~aio_general_context();
51 internal::linux_abi::aio_context_t io_context{};
52 std::unique_ptr<internal::linux_abi::iocb*[]> iocbs;
53 internal::linux_abi::iocb** last = iocbs.get();
54 void queue(internal::linux_abi::iocb* iocb);
55 size_t flush();
56 };
57
58 class aio_storage_context {
59 static constexpr unsigned max_aio = 1024;
60
61 class iocb_pool {
62 alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
63 std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
64 public:
65 iocb_pool();
66 internal::linux_abi::iocb& get_one();
67 void put_one(internal::linux_abi::iocb* io);
68 unsigned outstanding() const;
69 bool has_capacity() const;
70 };
71
72 reactor* _r;
73 internal::linux_abi::aio_context_t _io_context;
74 boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue;
75 iocb_pool _iocb_pool;
76 size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
77 boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry;
78 public:
79 explicit aio_storage_context(reactor* r);
80 ~aio_storage_context();
81
82 bool reap_completions();
83 bool submit_work();
84 bool can_sleep() const;
85 };
86
87 class completion_with_iocb {
88 bool _in_context = false;
89 internal::linux_abi::iocb _iocb;
90 protected:
91 completion_with_iocb(int fd, int events, void* user_data);
92 void completed() {
93 _in_context = false;
94 }
95 public:
96 void maybe_queue(aio_general_context& context);
97 };
98
99 class fd_kernel_completion : public kernel_completion {
100 protected:
101 reactor* _r;
102 file_desc& _fd;
103 fd_kernel_completion(reactor* r, file_desc& fd) : _r(r), _fd(fd) {}
104 public:
105 file_desc& fd() {
106 return _fd;
107 }
108 };
109
110 struct hrtimer_aio_completion : public fd_kernel_completion,
111 public completion_with_iocb {
112 hrtimer_aio_completion(reactor* r, file_desc& fd);
113 virtual void complete_with(ssize_t value) override;
114 };
115
116 struct task_quota_aio_completion : public fd_kernel_completion,
117 public completion_with_iocb {
118 task_quota_aio_completion(reactor* r, file_desc& fd);
119 virtual void complete_with(ssize_t value) override;
120 };
121
122 struct smp_wakeup_aio_completion : public fd_kernel_completion,
123 public completion_with_iocb {
124 smp_wakeup_aio_completion(reactor* r, file_desc& fd);
125 virtual void complete_with(ssize_t value) override;
126 };
127
128 // Common aio-based Implementation of the task quota and hrtimer.
129 class preempt_io_context {
130 reactor* _r;
131 aio_general_context _context{2};
132
133 task_quota_aio_completion _task_quota_aio_completion;
134 hrtimer_aio_completion _hrtimer_aio_completion;
135 public:
136 preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer);
137 bool service_preempting_io();
138
139 size_t flush() {
140 return _context.flush();
141 }
142
143 void reset_preemption_monitor();
144 void request_preemption();
145 void start_tick();
146 void stop_tick();
147 };
148
149 // The "reactor_backend" interface provides a method of waiting for various
150 // basic events on one thread. We have one implementation based on epoll and
151 // file-descriptors (reactor_backend_epoll) and one implementation based on
152 // OSv-specific file-descriptor-less mechanisms (reactor_backend_osv).
153 class reactor_backend {
154 public:
155 virtual ~reactor_backend() {};
156 // The methods below are used to communicate with the kernel.
157 // reap_kernel_completions() will complete any previous async
158 // work that is ready to consume.
159 // kernel_submit_work() submit new events that were produced.
160 // Both of those methods are asynchronous and will never block.
161 //
162 // wait_and_process_events on the other hand may block, and is called when
163 // we are about to go to sleep.
164 virtual bool reap_kernel_completions() = 0;
165 virtual bool kernel_submit_work() = 0;
166 virtual bool kernel_events_can_sleep() const = 0;
167 virtual void wait_and_process_events(const sigset_t* active_sigmask = nullptr) = 0;
168
169 // Methods that allow polling on file descriptors. This will only work on
170 // reactor_backend_epoll. Other reactor_backend will probably abort if
171 // they are called (which is fine if no file descriptors are waited on):
172 virtual future<> readable(pollable_fd_state& fd) = 0;
173 virtual future<> writeable(pollable_fd_state& fd) = 0;
174 virtual future<> readable_or_writeable(pollable_fd_state& fd) = 0;
175 virtual void forget(pollable_fd_state& fd) noexcept = 0;
176
177 virtual future<std::tuple<pollable_fd, socket_address>>
178 accept(pollable_fd_state& listenfd) = 0;
179 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) = 0;
180 virtual void shutdown(pollable_fd_state& fd, int how) = 0;
181 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) = 0;
182 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) = 0;
183 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) = 0;
184 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) = 0;
185
186 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) = 0;
187 virtual void start_tick() = 0;
188 virtual void stop_tick() = 0;
189 virtual void arm_highres_timer(const ::itimerspec& ts) = 0;
190 virtual void reset_preemption_monitor() = 0;
191 virtual void request_preemption() = 0;
192 virtual void start_handling_signal() = 0;
193
194 virtual pollable_fd_state_ptr make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) = 0;
195 };
196
197 // reactor backend using file-descriptor & epoll, suitable for running on
198 // Linux. Can wait on multiple file descriptors, and converts other events
199 // (such as timers, signals, inter-thread notifications) into file descriptors
200 // using mechanisms like timerfd, signalfd and eventfd respectively.
201 class reactor_backend_epoll : public reactor_backend {
202 reactor* _r;
203 std::thread _task_quota_timer_thread;
204 timer_t _steady_clock_timer = {};
205 private:
206 file_desc _epollfd;
207 future<> get_epoll_future(pollable_fd_state& fd, int event);
208 void complete_epoll_event(pollable_fd_state& fd, int events, int event);
209 aio_storage_context _storage_context;
210 bool wait_and_process(int timeout, const sigset_t* active_sigmask);
211 bool _need_epoll_events = false;
212 public:
213 explicit reactor_backend_epoll(reactor* r);
214 virtual ~reactor_backend_epoll() override;
215
216 virtual bool reap_kernel_completions() override;
217 virtual bool kernel_submit_work() override;
218 virtual bool kernel_events_can_sleep() const override;
219 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
220 virtual future<> readable(pollable_fd_state& fd) override;
221 virtual future<> writeable(pollable_fd_state& fd) override;
222 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
223 virtual void forget(pollable_fd_state& fd) noexcept override;
224
225 virtual future<std::tuple<pollable_fd, socket_address>>
226 accept(pollable_fd_state& listenfd) override;
227 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
228 virtual void shutdown(pollable_fd_state& fd, int how) override;
229 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
230 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
231 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
232 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
233
234 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
235 virtual void start_tick() override;
236 virtual void stop_tick() override;
237 virtual void arm_highres_timer(const ::itimerspec& ts) override;
238 virtual void reset_preemption_monitor() override;
239 virtual void request_preemption() override;
240 virtual void start_handling_signal() override;
241
242 virtual pollable_fd_state_ptr
243 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
244 };
245
246 class reactor_backend_aio : public reactor_backend {
247 static constexpr size_t max_polls = 10000;
248 reactor* _r;
249 file_desc _hrtimer_timerfd;
250 aio_storage_context _storage_context;
251 // We use two aio contexts, one for preempting events (the timer tick and
252 // signals), the other for non-preempting events (fd poll).
253 preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer
254 aio_general_context _polling_io{max_polls}; // FIXME: unify with disk aio_context
255 hrtimer_aio_completion _hrtimer_poll_completion;
256 smp_wakeup_aio_completion _smp_wakeup_aio_completion;
257 static file_desc make_timerfd();
258 bool await_events(int timeout, const sigset_t* active_sigmask);
259 public:
260 explicit reactor_backend_aio(reactor* r);
261
262 virtual bool reap_kernel_completions() override;
263 virtual bool kernel_submit_work() override;
264 virtual bool kernel_events_can_sleep() const override;
265 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
266 future<> poll(pollable_fd_state& fd, int events);
267 virtual future<> readable(pollable_fd_state& fd) override;
268 virtual future<> writeable(pollable_fd_state& fd) override;
269 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
270 virtual void forget(pollable_fd_state& fd) noexcept override;
271
272 virtual future<std::tuple<pollable_fd, socket_address>>
273 accept(pollable_fd_state& listenfd) override;
274 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
275 virtual void shutdown(pollable_fd_state& fd, int how) override;
276 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
277 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
278 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
279 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
280
281 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
282 virtual void start_tick() override;
283 virtual void stop_tick() override;
284 virtual void arm_highres_timer(const ::itimerspec& its) override;
285 virtual void reset_preemption_monitor() override;
286 virtual void request_preemption() override;
287 virtual void start_handling_signal() override;
288
289 virtual pollable_fd_state_ptr
290 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
291 };
292
#ifdef HAVE_OSV
// reactor_backend using OSv-specific features, without any file descriptors.
// This implementation cannot currently wait on file descriptors. Unlike
// reactor_backend_epoll, though, it does not need file descriptors to wait
// on things such as a timer.
//
// NOTE(review): this backend has fallen behind the reactor_backend
// interface. It declares no overrides for readable_or_writeable(),
// signal_received(), start_tick(), stop_tick(), arm_highres_timer(),
// reset_preemption_monitor(), request_preemption() or
// start_handling_signal(), so it is still abstract and cannot be
// instantiated until those are added.
class reactor_backend_osv : public reactor_backend {
private:
    osv::newpoll::poller _poller;
    future<> get_poller_future(reactor_notifier_osv *n);
    promise<> _timer_promise;
public:
    reactor_backend_osv();
    virtual ~reactor_backend_osv() override { }

    virtual bool reap_kernel_completions() override;
    virtual bool kernel_submit_work() override;
    virtual bool kernel_events_can_sleep() const override;
    virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
    virtual future<> readable(pollable_fd_state& fd) override;
    virtual future<> writeable(pollable_fd_state& fd) override;
    virtual void forget(pollable_fd_state& fd) noexcept override;

    virtual future<std::tuple<pollable_fd, socket_address>>
    accept(pollable_fd_state& listenfd) override;
    virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
    virtual void shutdown(pollable_fd_state& fd, int how) override;
    virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
    virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
    // Must match reactor_backend::write_some(pollable_fd_state&, net::packet&).
    // The former write_some(net::packet&) overrode nothing and was ill-formed.
    // NOTE(review): the out-of-line definition must use this signature too.
    virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
    virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;

    void enable_timer(steady_clock_type::time_point when);
    virtual pollable_fd_state_ptr
    make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
};
#endif /* HAVE_OSV */
329
330 class reactor_backend_selector {
331 std::string _name;
332 private:
333 static bool has_enough_aio_nr();
334 explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {}
335 public:
336 std::unique_ptr<reactor_backend> create(reactor* r);
337 static reactor_backend_selector default_backend();
338 static std::vector<reactor_backend_selector> available();
339 friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) {
340 return os << rbs._name;
341 }
342 friend void validate(boost::any& v, const std::vector<std::string> values, reactor_backend_selector* rbs, int) {
343 namespace bpo = boost::program_options;
344 bpo::validators::check_first_occurrence(v);
345 auto s = bpo::validators::get_single_string(values);
346 for (auto&& x : available()) {
347 if (s == x._name) {
348 v = std::move(x);
349 return;
350 }
351 }
352 throw bpo::validation_error(bpo::validation_error::invalid_option_value);
353 }
354 };
355
356 }