]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/src/core/reactor_backend.hh
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / core / reactor_backend.hh
CommitLineData
9f95a23c
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2019 ScyllaDB
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/posix.hh>
26#include <seastar/core/internal/pollable_fd.hh>
27#include <seastar/core/internal/poll.hh>
28#include <seastar/core/linux-aio.hh>
29#include <seastar/core/cacheline.hh>
30#include <sys/time.h>
31#include <signal.h>
32#include <thread>
33#include <stack>
34#include <boost/any.hpp>
35#include <boost/program_options.hpp>
36#include <boost/container/static_vector.hpp>
37
38#ifdef HAVE_OSV
39#include <osv/newpoll.hh>
40#endif
41
42namespace seastar {
43
44class reactor;
45
46// FIXME: merge it with storage context below. At this point the
47// main thing to do is unify the iocb list
48struct aio_general_context {
49 explicit aio_general_context(size_t nr);
50 ~aio_general_context();
51 internal::linux_abi::aio_context_t io_context{};
52 std::unique_ptr<internal::linux_abi::iocb*[]> iocbs;
53 internal::linux_abi::iocb** last = iocbs.get();
54 void queue(internal::linux_abi::iocb* iocb);
55 size_t flush();
56};
57
58class aio_storage_context {
59 static constexpr unsigned max_aio = 1024;
60
61 class iocb_pool {
62 alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
63 std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
64 public:
65 iocb_pool();
66 internal::linux_abi::iocb& get_one();
67 void put_one(internal::linux_abi::iocb* io);
68 unsigned outstanding() const;
69 bool has_capacity() const;
70 };
71
72 reactor* _r;
73 internal::linux_abi::aio_context_t _io_context;
74 boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue;
75 iocb_pool _iocb_pool;
76 size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
f67539c2
TL
77 using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>;
78 pending_aio_retry_t _pending_aio_retry;
79 internal::linux_abi::io_event _ev_buffer[max_aio];
9f95a23c
TL
80public:
81 explicit aio_storage_context(reactor* r);
82 ~aio_storage_context();
83
84 bool reap_completions();
f67539c2 85 void schedule_retry();
9f95a23c
TL
86 bool submit_work();
87 bool can_sleep() const;
88};
89
90class completion_with_iocb {
91 bool _in_context = false;
92 internal::linux_abi::iocb _iocb;
93protected:
94 completion_with_iocb(int fd, int events, void* user_data);
95 void completed() {
96 _in_context = false;
97 }
98public:
99 void maybe_queue(aio_general_context& context);
100};
101
102class fd_kernel_completion : public kernel_completion {
103protected:
104 reactor* _r;
105 file_desc& _fd;
106 fd_kernel_completion(reactor* r, file_desc& fd) : _r(r), _fd(fd) {}
107public:
108 file_desc& fd() {
109 return _fd;
110 }
111};
112
113struct hrtimer_aio_completion : public fd_kernel_completion,
114 public completion_with_iocb {
115 hrtimer_aio_completion(reactor* r, file_desc& fd);
116 virtual void complete_with(ssize_t value) override;
117};
118
119struct task_quota_aio_completion : public fd_kernel_completion,
120 public completion_with_iocb {
121 task_quota_aio_completion(reactor* r, file_desc& fd);
122 virtual void complete_with(ssize_t value) override;
123};
124
125struct smp_wakeup_aio_completion : public fd_kernel_completion,
126 public completion_with_iocb {
127 smp_wakeup_aio_completion(reactor* r, file_desc& fd);
128 virtual void complete_with(ssize_t value) override;
129};
130
131// Common aio-based Implementation of the task quota and hrtimer.
132class preempt_io_context {
133 reactor* _r;
134 aio_general_context _context{2};
135
136 task_quota_aio_completion _task_quota_aio_completion;
137 hrtimer_aio_completion _hrtimer_aio_completion;
138public:
139 preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer);
140 bool service_preempting_io();
141
142 size_t flush() {
143 return _context.flush();
144 }
145
146 void reset_preemption_monitor();
147 void request_preemption();
148 void start_tick();
149 void stop_tick();
150};
151
// The "reactor_backend" interface provides a method of waiting for various
// basic events on one thread. This file declares implementations based on
// epoll and file descriptors (reactor_backend_epoll), on Linux AIO
// (reactor_backend_aio), and on OSv-specific file-descriptor-less
// mechanisms (reactor_backend_osv, only when HAVE_OSV is defined).
156class reactor_backend {
157public:
158 virtual ~reactor_backend() {};
159 // The methods below are used to communicate with the kernel.
160 // reap_kernel_completions() will complete any previous async
161 // work that is ready to consume.
162 // kernel_submit_work() submit new events that were produced.
163 // Both of those methods are asynchronous and will never block.
164 //
165 // wait_and_process_events on the other hand may block, and is called when
166 // we are about to go to sleep.
167 virtual bool reap_kernel_completions() = 0;
168 virtual bool kernel_submit_work() = 0;
169 virtual bool kernel_events_can_sleep() const = 0;
170 virtual void wait_and_process_events(const sigset_t* active_sigmask = nullptr) = 0;
171
172 // Methods that allow polling on file descriptors. This will only work on
173 // reactor_backend_epoll. Other reactor_backend will probably abort if
174 // they are called (which is fine if no file descriptors are waited on):
175 virtual future<> readable(pollable_fd_state& fd) = 0;
176 virtual future<> writeable(pollable_fd_state& fd) = 0;
177 virtual future<> readable_or_writeable(pollable_fd_state& fd) = 0;
178 virtual void forget(pollable_fd_state& fd) noexcept = 0;
179
180 virtual future<std::tuple<pollable_fd, socket_address>>
181 accept(pollable_fd_state& listenfd) = 0;
182 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) = 0;
183 virtual void shutdown(pollable_fd_state& fd, int how) = 0;
184 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) = 0;
185 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) = 0;
f67539c2 186 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) = 0;
9f95a23c
TL
187 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) = 0;
188 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) = 0;
189
190 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) = 0;
191 virtual void start_tick() = 0;
192 virtual void stop_tick() = 0;
193 virtual void arm_highres_timer(const ::itimerspec& ts) = 0;
194 virtual void reset_preemption_monitor() = 0;
195 virtual void request_preemption() = 0;
196 virtual void start_handling_signal() = 0;
197
198 virtual pollable_fd_state_ptr make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) = 0;
199};
200
201// reactor backend using file-descriptor & epoll, suitable for running on
202// Linux. Can wait on multiple file descriptors, and converts other events
203// (such as timers, signals, inter-thread notifications) into file descriptors
204// using mechanisms like timerfd, signalfd and eventfd respectively.
205class reactor_backend_epoll : public reactor_backend {
206 reactor* _r;
207 std::thread _task_quota_timer_thread;
208 timer_t _steady_clock_timer = {};
209private:
210 file_desc _epollfd;
211 future<> get_epoll_future(pollable_fd_state& fd, int event);
212 void complete_epoll_event(pollable_fd_state& fd, int events, int event);
213 aio_storage_context _storage_context;
214 bool wait_and_process(int timeout, const sigset_t* active_sigmask);
215 bool _need_epoll_events = false;
216public:
217 explicit reactor_backend_epoll(reactor* r);
218 virtual ~reactor_backend_epoll() override;
219
220 virtual bool reap_kernel_completions() override;
221 virtual bool kernel_submit_work() override;
222 virtual bool kernel_events_can_sleep() const override;
223 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
224 virtual future<> readable(pollable_fd_state& fd) override;
225 virtual future<> writeable(pollable_fd_state& fd) override;
226 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
227 virtual void forget(pollable_fd_state& fd) noexcept override;
228
229 virtual future<std::tuple<pollable_fd, socket_address>>
230 accept(pollable_fd_state& listenfd) override;
231 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
232 virtual void shutdown(pollable_fd_state& fd, int how) override;
233 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
234 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
f67539c2 235 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
9f95a23c
TL
236 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
237 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
238
239 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
240 virtual void start_tick() override;
241 virtual void stop_tick() override;
242 virtual void arm_highres_timer(const ::itimerspec& ts) override;
243 virtual void reset_preemption_monitor() override;
244 virtual void request_preemption() override;
245 virtual void start_handling_signal() override;
246
247 virtual pollable_fd_state_ptr
248 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
249};
250
251class reactor_backend_aio : public reactor_backend {
252 static constexpr size_t max_polls = 10000;
253 reactor* _r;
254 file_desc _hrtimer_timerfd;
255 aio_storage_context _storage_context;
256 // We use two aio contexts, one for preempting events (the timer tick and
257 // signals), the other for non-preempting events (fd poll).
258 preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer
259 aio_general_context _polling_io{max_polls}; // FIXME: unify with disk aio_context
260 hrtimer_aio_completion _hrtimer_poll_completion;
261 smp_wakeup_aio_completion _smp_wakeup_aio_completion;
262 static file_desc make_timerfd();
263 bool await_events(int timeout, const sigset_t* active_sigmask);
264public:
265 explicit reactor_backend_aio(reactor* r);
266
267 virtual bool reap_kernel_completions() override;
268 virtual bool kernel_submit_work() override;
269 virtual bool kernel_events_can_sleep() const override;
270 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
271 future<> poll(pollable_fd_state& fd, int events);
272 virtual future<> readable(pollable_fd_state& fd) override;
273 virtual future<> writeable(pollable_fd_state& fd) override;
274 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
275 virtual void forget(pollable_fd_state& fd) noexcept override;
276
277 virtual future<std::tuple<pollable_fd, socket_address>>
278 accept(pollable_fd_state& listenfd) override;
279 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
280 virtual void shutdown(pollable_fd_state& fd, int how) override;
281 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
282 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
f67539c2 283 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
9f95a23c
TL
284 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
285 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
286
287 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
288 virtual void start_tick() override;
289 virtual void stop_tick() override;
290 virtual void arm_highres_timer(const ::itimerspec& its) override;
291 virtual void reset_preemption_monitor() override;
292 virtual void request_preemption() override;
293 virtual void start_handling_signal() override;
294
295 virtual pollable_fd_state_ptr
296 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
297};
298
299#ifdef HAVE_OSV
300// reactor_backend using OSv-specific features, without any file descriptors.
301// This implementation cannot currently wait on file descriptors, but unlike
302// reactor_backend_epoll it doesn't need file descriptors for waiting on a
303// timer, for example, so file descriptors are not necessary.
304class reactor_backend_osv : public reactor_backend {
305private:
306 osv::newpoll::poller _poller;
307 future<> get_poller_future(reactor_notifier_osv *n);
308 promise<> _timer_promise;
309public:
310 reactor_backend_osv();
311 virtual ~reactor_backend_osv() override { }
312
313 virtual bool reap_kernel_completions() override;
314 virtual bool kernel_submit_work() override;
315 virtual bool kernel_events_can_sleep() const override;
316 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
317 virtual future<> readable(pollable_fd_state& fd) override;
318 virtual future<> writeable(pollable_fd_state& fd) override;
319 virtual void forget(pollable_fd_state& fd) noexcept override;
320
321 virtual future<std::tuple<pollable_fd, socket_address>>
322 accept(pollable_fd_state& listenfd) override;
323 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
324 virtual void shutdown(pollable_fd_state& fd, int how) override;
325 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
326 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
f67539c2 327 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
9f95a23c
TL
328 virtual future<size_t> write_some(net::packet& p) override;
329 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
330
331 void enable_timer(steady_clock_type::time_point when);
332 virtual pollable_fd_state_ptr
333 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
334};
335#endif /* HAVE_OSV */
336
337class reactor_backend_selector {
338 std::string _name;
339private:
340 static bool has_enough_aio_nr();
341 explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {}
342public:
343 std::unique_ptr<reactor_backend> create(reactor* r);
344 static reactor_backend_selector default_backend();
345 static std::vector<reactor_backend_selector> available();
346 friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) {
347 return os << rbs._name;
348 }
349 friend void validate(boost::any& v, const std::vector<std::string> values, reactor_backend_selector* rbs, int) {
350 namespace bpo = boost::program_options;
351 bpo::validators::check_first_occurrence(v);
352 auto s = bpo::validators::get_single_string(values);
353 for (auto&& x : available()) {
354 if (s == x._name) {
355 v = std::move(x);
356 return;
357 }
358 }
359 throw bpo::validation_error(bpo::validation_error::invalid_option_value);
360 }
361};
362
363}