]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/src/core/reactor_backend.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / core / reactor_backend.hh
CommitLineData
9f95a23c
TL
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/posix.hh>
26#include <seastar/core/internal/pollable_fd.hh>
27#include <seastar/core/internal/poll.hh>
28#include <seastar/core/linux-aio.hh>
29#include <seastar/core/cacheline.hh>
30#include <sys/time.h>
31#include <signal.h>
32#include <thread>
33#include <stack>
34#include <boost/any.hpp>
35#include <boost/program_options.hpp>
36#include <boost/container/static_vector.hpp>
37
38#ifdef HAVE_OSV
39#include <osv/newpoll.hh>
40#endif
41
42namespace seastar {
43
44class reactor;
45
46// FIXME: merge it with storage context below. At this point the
47// main thing to do is unify the iocb list
48struct aio_general_context {
49 explicit aio_general_context(size_t nr);
50 ~aio_general_context();
51 internal::linux_abi::aio_context_t io_context{};
52 std::unique_ptr<internal::linux_abi::iocb*[]> iocbs;
20effc67
TL
53 internal::linux_abi::iocb** last;
54 internal::linux_abi::iocb** const end;
9f95a23c 55 void queue(internal::linux_abi::iocb* iocb);
20effc67 56 // submit all queued iocbs and return their count.
9f95a23c
TL
57 size_t flush();
58};
59
60class aio_storage_context {
61 static constexpr unsigned max_aio = 1024;
62
63 class iocb_pool {
64 alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
65 std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
66 public:
67 iocb_pool();
68 internal::linux_abi::iocb& get_one();
69 void put_one(internal::linux_abi::iocb* io);
70 unsigned outstanding() const;
71 bool has_capacity() const;
72 };
73
20effc67 74 reactor& _r;
9f95a23c
TL
75 internal::linux_abi::aio_context_t _io_context;
76 boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue;
77 iocb_pool _iocb_pool;
78 size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
f67539c2 79 using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>;
20effc67
TL
80 pending_aio_retry_t _pending_aio_retry; // Pending retries iocbs
81 pending_aio_retry_t _aio_retries; // Currently retried iocbs
82 future<> _pending_aio_retry_fut = make_ready_future<>();
f67539c2 83 internal::linux_abi::io_event _ev_buffer[max_aio];
20effc67
TL
84
85 bool need_to_retry() const noexcept {
86 return !_pending_aio_retry.empty() || !_aio_retries.empty();
87 }
88
89 bool retry_in_progress() const noexcept {
90 return !_pending_aio_retry_fut.available();
91 }
92
9f95a23c 93public:
20effc67 94 explicit aio_storage_context(reactor& r);
9f95a23c
TL
95 ~aio_storage_context();
96
20effc67 97 bool reap_completions(bool allow_retry = true);
f67539c2 98 void schedule_retry();
9f95a23c
TL
99 bool submit_work();
100 bool can_sleep() const;
20effc67 101 future<> stop() noexcept;
9f95a23c
TL
102};
103
104class completion_with_iocb {
105 bool _in_context = false;
106 internal::linux_abi::iocb _iocb;
107protected:
108 completion_with_iocb(int fd, int events, void* user_data);
109 void completed() {
110 _in_context = false;
111 }
112public:
113 void maybe_queue(aio_general_context& context);
114};
115
116class fd_kernel_completion : public kernel_completion {
117protected:
9f95a23c 118 file_desc& _fd;
20effc67 119 fd_kernel_completion(file_desc& fd) : _fd(fd) {}
9f95a23c
TL
120public:
121 file_desc& fd() {
122 return _fd;
123 }
124};
125
126struct hrtimer_aio_completion : public fd_kernel_completion,
127 public completion_with_iocb {
20effc67
TL
128private:
129 reactor& _r;
130public:
131 hrtimer_aio_completion(reactor& r, file_desc& fd);
9f95a23c
TL
132 virtual void complete_with(ssize_t value) override;
133};
134
135struct task_quota_aio_completion : public fd_kernel_completion,
136 public completion_with_iocb {
20effc67 137 task_quota_aio_completion(file_desc& fd);
9f95a23c
TL
138 virtual void complete_with(ssize_t value) override;
139};
140
141struct smp_wakeup_aio_completion : public fd_kernel_completion,
142 public completion_with_iocb {
20effc67 143 smp_wakeup_aio_completion(file_desc& fd);
9f95a23c
TL
144 virtual void complete_with(ssize_t value) override;
145};
146
147// Common aio-based Implementation of the task quota and hrtimer.
148class preempt_io_context {
20effc67 149 reactor& _r;
9f95a23c
TL
150 aio_general_context _context{2};
151
152 task_quota_aio_completion _task_quota_aio_completion;
153 hrtimer_aio_completion _hrtimer_aio_completion;
154public:
20effc67 155 preempt_io_context(reactor& r, file_desc& task_quota, file_desc& hrtimer);
9f95a23c
TL
156 bool service_preempting_io();
157
158 size_t flush() {
159 return _context.flush();
160 }
161
162 void reset_preemption_monitor();
163 void request_preemption();
164 void start_tick();
165 void stop_tick();
166};
167
168// The "reactor_backend" interface provides a method of waiting for various
169// basic events on one thread. We have one implementation based on epoll and
170// file-descriptors (reactor_backend_epoll) and one implementation based on
171// OSv-specific file-descriptor-less mechanisms (reactor_backend_osv).
172class reactor_backend {
173public:
174 virtual ~reactor_backend() {};
175 // The methods below are used to communicate with the kernel.
176 // reap_kernel_completions() will complete any previous async
177 // work that is ready to consume.
178 // kernel_submit_work() submit new events that were produced.
179 // Both of those methods are asynchronous and will never block.
180 //
181 // wait_and_process_events on the other hand may block, and is called when
182 // we are about to go to sleep.
183 virtual bool reap_kernel_completions() = 0;
184 virtual bool kernel_submit_work() = 0;
185 virtual bool kernel_events_can_sleep() const = 0;
186 virtual void wait_and_process_events(const sigset_t* active_sigmask = nullptr) = 0;
187
188 // Methods that allow polling on file descriptors. This will only work on
189 // reactor_backend_epoll. Other reactor_backend will probably abort if
190 // they are called (which is fine if no file descriptors are waited on):
191 virtual future<> readable(pollable_fd_state& fd) = 0;
192 virtual future<> writeable(pollable_fd_state& fd) = 0;
193 virtual future<> readable_or_writeable(pollable_fd_state& fd) = 0;
194 virtual void forget(pollable_fd_state& fd) noexcept = 0;
195
196 virtual future<std::tuple<pollable_fd, socket_address>>
197 accept(pollable_fd_state& listenfd) = 0;
198 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) = 0;
199 virtual void shutdown(pollable_fd_state& fd, int how) = 0;
200 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) = 0;
201 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) = 0;
f67539c2 202 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) = 0;
9f95a23c
TL
203 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) = 0;
204 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) = 0;
205
206 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) = 0;
207 virtual void start_tick() = 0;
208 virtual void stop_tick() = 0;
209 virtual void arm_highres_timer(const ::itimerspec& ts) = 0;
210 virtual void reset_preemption_monitor() = 0;
211 virtual void request_preemption() = 0;
212 virtual void start_handling_signal() = 0;
213
214 virtual pollable_fd_state_ptr make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) = 0;
215};
216
217// reactor backend using file-descriptor & epoll, suitable for running on
218// Linux. Can wait on multiple file descriptors, and converts other events
219// (such as timers, signals, inter-thread notifications) into file descriptors
220// using mechanisms like timerfd, signalfd and eventfd respectively.
221class reactor_backend_epoll : public reactor_backend {
20effc67
TL
222 reactor& _r;
223 std::atomic<bool> _highres_timer_pending;
9f95a23c 224 std::thread _task_quota_timer_thread;
20effc67
TL
225 ::itimerspec _steady_clock_timer_deadline = {};
226 // These two timers are used for high resolution timer<>s, one for
227 // the reactor thread (when sleeping) and one for the timer thread
228 // (when awake). We can't use one timer because of races between the
229 // timer thread and reactor thread.
230 //
231 // Only one of the two is active at any time.
232 file_desc _steady_clock_timer_reactor_thread;
233 file_desc _steady_clock_timer_timer_thread;
9f95a23c
TL
234private:
235 file_desc _epollfd;
20effc67 236 void task_quota_timer_thread_fn();
9f95a23c
TL
237 future<> get_epoll_future(pollable_fd_state& fd, int event);
238 void complete_epoll_event(pollable_fd_state& fd, int events, int event);
239 aio_storage_context _storage_context;
20effc67
TL
240 void switch_steady_clock_timers(file_desc& from, file_desc& to);
241 void maybe_switch_steady_clock_timers(int timeout, file_desc& from, file_desc& to);
9f95a23c 242 bool wait_and_process(int timeout, const sigset_t* active_sigmask);
20effc67 243 bool complete_hrtimer();
9f95a23c
TL
244 bool _need_epoll_events = false;
245public:
20effc67 246 explicit reactor_backend_epoll(reactor& r);
9f95a23c
TL
247 virtual ~reactor_backend_epoll() override;
248
249 virtual bool reap_kernel_completions() override;
250 virtual bool kernel_submit_work() override;
251 virtual bool kernel_events_can_sleep() const override;
252 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
253 virtual future<> readable(pollable_fd_state& fd) override;
254 virtual future<> writeable(pollable_fd_state& fd) override;
255 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
256 virtual void forget(pollable_fd_state& fd) noexcept override;
257
258 virtual future<std::tuple<pollable_fd, socket_address>>
259 accept(pollable_fd_state& listenfd) override;
260 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
261 virtual void shutdown(pollable_fd_state& fd, int how) override;
262 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
263 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
f67539c2 264 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
9f95a23c
TL
265 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
266 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
267
268 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
269 virtual void start_tick() override;
270 virtual void stop_tick() override;
271 virtual void arm_highres_timer(const ::itimerspec& ts) override;
272 virtual void reset_preemption_monitor() override;
273 virtual void request_preemption() override;
274 virtual void start_handling_signal() override;
275
276 virtual pollable_fd_state_ptr
277 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
278};
279
280class reactor_backend_aio : public reactor_backend {
20effc67
TL
281 reactor& _r;
282 unsigned max_polls() const;
9f95a23c
TL
283 file_desc _hrtimer_timerfd;
284 aio_storage_context _storage_context;
285 // We use two aio contexts, one for preempting events (the timer tick and
286 // signals), the other for non-preempting events (fd poll).
287 preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer
20effc67 288 aio_general_context _polling_io{max_polls()}; // FIXME: unify with disk aio_context
9f95a23c
TL
289 hrtimer_aio_completion _hrtimer_poll_completion;
290 smp_wakeup_aio_completion _smp_wakeup_aio_completion;
291 static file_desc make_timerfd();
292 bool await_events(int timeout, const sigset_t* active_sigmask);
293public:
20effc67 294 explicit reactor_backend_aio(reactor& r);
9f95a23c
TL
295
296 virtual bool reap_kernel_completions() override;
297 virtual bool kernel_submit_work() override;
298 virtual bool kernel_events_can_sleep() const override;
299 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
300 future<> poll(pollable_fd_state& fd, int events);
301 virtual future<> readable(pollable_fd_state& fd) override;
302 virtual future<> writeable(pollable_fd_state& fd) override;
303 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
304 virtual void forget(pollable_fd_state& fd) noexcept override;
305
306 virtual future<std::tuple<pollable_fd, socket_address>>
307 accept(pollable_fd_state& listenfd) override;
308 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
309 virtual void shutdown(pollable_fd_state& fd, int how) override;
310 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
311 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
f67539c2 312 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
9f95a23c
TL
313 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
314 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
315
316 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
317 virtual void start_tick() override;
318 virtual void stop_tick() override;
319 virtual void arm_highres_timer(const ::itimerspec& its) override;
320 virtual void reset_preemption_monitor() override;
321 virtual void request_preemption() override;
322 virtual void start_handling_signal() override;
323
324 virtual pollable_fd_state_ptr
325 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
326};
327
#ifdef HAVE_OSV
// reactor_backend using OSv-specific features, without any file descriptors.
// This implementation cannot currently wait on file descriptors, but unlike
// reactor_backend_epoll it doesn't need file descriptors for waiting on a
// timer, for example, so file descriptors are not necessary.
//
// NOTE(review): this class still does not override every pure virtual of
// reactor_backend (readable_or_writeable, signal_received, start_tick,
// stop_tick, arm_highres_timer, reset_preemption_monitor, request_preemption,
// start_handling_signal), so it remains abstract. reactor_notifier_osv is not
// declared in this header either. Confirm whether this HAVE_OSV-only backend
// is still maintained.
class reactor_backend_osv : public reactor_backend {
private:
    osv::newpoll::poller _poller;
    future<> get_poller_future(reactor_notifier_osv *n);
    promise<> _timer_promise;
public:
    reactor_backend_osv();
    ~reactor_backend_osv() override { }

    bool reap_kernel_completions() override;
    bool kernel_submit_work() override;
    bool kernel_events_can_sleep() const override;
    void wait_and_process_events(const sigset_t* active_sigmask) override;
    future<> readable(pollable_fd_state& fd) override;
    future<> writeable(pollable_fd_state& fd) override;
    void forget(pollable_fd_state& fd) noexcept override;

    future<std::tuple<pollable_fd, socket_address>>
    accept(pollable_fd_state& listenfd) override;
    future<> connect(pollable_fd_state& fd, socket_address& sa) override;
    void shutdown(pollable_fd_state& fd, int how) override;
    future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
    future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
    future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
    // Signature must match reactor_backend::write_some(pollable_fd_state&, net::packet&).
    // A packet-only overload overrides nothing, which makes `override` ill-formed.
    future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
    future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;

    void enable_timer(steady_clock_type::time_point when);
    pollable_fd_state_ptr
    make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
};
#endif /* HAVE_OSV */

366class reactor_backend_selector {
367 std::string _name;
368private:
369 static bool has_enough_aio_nr();
370 explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {}
371public:
20effc67
TL
372 const std::string& name() const { return _name; }
373 std::unique_ptr<reactor_backend> create(reactor& r);
9f95a23c
TL
374 static reactor_backend_selector default_backend();
375 static std::vector<reactor_backend_selector> available();
376 friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) {
377 return os << rbs._name;
378 }
9f95a23c
TL
379};
380
} // namespace seastar