]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/reactor_backend.hh
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / core / reactor_backend.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2019 ScyllaDB
20 */
21
22 #pragma once
23
24 #include <seastar/core/future.hh>
25 #include <seastar/core/posix.hh>
26 #include <seastar/core/internal/pollable_fd.hh>
27 #include <seastar/core/internal/poll.hh>
28 #include <seastar/core/linux-aio.hh>
29 #include <seastar/core/cacheline.hh>
30 #include <sys/time.h>
31 #include <signal.h>
32 #include <thread>
33 #include <stack>
34 #include <boost/any.hpp>
35 #include <boost/program_options.hpp>
36 #include <boost/container/static_vector.hpp>
37
38 #ifdef HAVE_OSV
39 #include <osv/newpoll.hh>
40 #endif
41
42 namespace seastar {
43
44 class reactor;
45
46 // FIXME: merge it with storage context below. At this point the
47 // main thing to do is unify the iocb list
48 struct aio_general_context {
49 explicit aio_general_context(size_t nr);
50 ~aio_general_context();
51 internal::linux_abi::aio_context_t io_context{};
52 std::unique_ptr<internal::linux_abi::iocb*[]> iocbs;
53 internal::linux_abi::iocb** last = iocbs.get();
54 void queue(internal::linux_abi::iocb* iocb);
55 size_t flush();
56 };
57
58 class aio_storage_context {
59 static constexpr unsigned max_aio = 1024;
60
61 class iocb_pool {
62 alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
63 std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
64 public:
65 iocb_pool();
66 internal::linux_abi::iocb& get_one();
67 void put_one(internal::linux_abi::iocb* io);
68 unsigned outstanding() const;
69 bool has_capacity() const;
70 };
71
72 reactor* _r;
73 internal::linux_abi::aio_context_t _io_context;
74 boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue;
75 iocb_pool _iocb_pool;
76 size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
77 using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>;
78 pending_aio_retry_t _pending_aio_retry;
79 internal::linux_abi::io_event _ev_buffer[max_aio];
80 public:
81 explicit aio_storage_context(reactor* r);
82 ~aio_storage_context();
83
84 bool reap_completions();
85 void schedule_retry();
86 bool submit_work();
87 bool can_sleep() const;
88 };
89
90 class completion_with_iocb {
91 bool _in_context = false;
92 internal::linux_abi::iocb _iocb;
93 protected:
94 completion_with_iocb(int fd, int events, void* user_data);
95 void completed() {
96 _in_context = false;
97 }
98 public:
99 void maybe_queue(aio_general_context& context);
100 };
101
102 class fd_kernel_completion : public kernel_completion {
103 protected:
104 reactor* _r;
105 file_desc& _fd;
106 fd_kernel_completion(reactor* r, file_desc& fd) : _r(r), _fd(fd) {}
107 public:
108 file_desc& fd() {
109 return _fd;
110 }
111 };
112
113 struct hrtimer_aio_completion : public fd_kernel_completion,
114 public completion_with_iocb {
115 hrtimer_aio_completion(reactor* r, file_desc& fd);
116 virtual void complete_with(ssize_t value) override;
117 };
118
119 struct task_quota_aio_completion : public fd_kernel_completion,
120 public completion_with_iocb {
121 task_quota_aio_completion(reactor* r, file_desc& fd);
122 virtual void complete_with(ssize_t value) override;
123 };
124
125 struct smp_wakeup_aio_completion : public fd_kernel_completion,
126 public completion_with_iocb {
127 smp_wakeup_aio_completion(reactor* r, file_desc& fd);
128 virtual void complete_with(ssize_t value) override;
129 };
130
131 // Common aio-based Implementation of the task quota and hrtimer.
132 class preempt_io_context {
133 reactor* _r;
134 aio_general_context _context{2};
135
136 task_quota_aio_completion _task_quota_aio_completion;
137 hrtimer_aio_completion _hrtimer_aio_completion;
138 public:
139 preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer);
140 bool service_preempting_io();
141
142 size_t flush() {
143 return _context.flush();
144 }
145
146 void reset_preemption_monitor();
147 void request_preemption();
148 void start_tick();
149 void stop_tick();
150 };
151
152 // The "reactor_backend" interface provides a method of waiting for various
153 // basic events on one thread. We have one implementation based on epoll and
154 // file-descriptors (reactor_backend_epoll) and one implementation based on
155 // OSv-specific file-descriptor-less mechanisms (reactor_backend_osv).
156 class reactor_backend {
157 public:
158 virtual ~reactor_backend() {};
159 // The methods below are used to communicate with the kernel.
160 // reap_kernel_completions() will complete any previous async
161 // work that is ready to consume.
162 // kernel_submit_work() submit new events that were produced.
163 // Both of those methods are asynchronous and will never block.
164 //
165 // wait_and_process_events on the other hand may block, and is called when
166 // we are about to go to sleep.
167 virtual bool reap_kernel_completions() = 0;
168 virtual bool kernel_submit_work() = 0;
169 virtual bool kernel_events_can_sleep() const = 0;
170 virtual void wait_and_process_events(const sigset_t* active_sigmask = nullptr) = 0;
171
172 // Methods that allow polling on file descriptors. This will only work on
173 // reactor_backend_epoll. Other reactor_backend will probably abort if
174 // they are called (which is fine if no file descriptors are waited on):
175 virtual future<> readable(pollable_fd_state& fd) = 0;
176 virtual future<> writeable(pollable_fd_state& fd) = 0;
177 virtual future<> readable_or_writeable(pollable_fd_state& fd) = 0;
178 virtual void forget(pollable_fd_state& fd) noexcept = 0;
179
180 virtual future<std::tuple<pollable_fd, socket_address>>
181 accept(pollable_fd_state& listenfd) = 0;
182 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) = 0;
183 virtual void shutdown(pollable_fd_state& fd, int how) = 0;
184 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) = 0;
185 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) = 0;
186 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) = 0;
187 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) = 0;
188 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) = 0;
189
190 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) = 0;
191 virtual void start_tick() = 0;
192 virtual void stop_tick() = 0;
193 virtual void arm_highres_timer(const ::itimerspec& ts) = 0;
194 virtual void reset_preemption_monitor() = 0;
195 virtual void request_preemption() = 0;
196 virtual void start_handling_signal() = 0;
197
198 virtual pollable_fd_state_ptr make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) = 0;
199 };
200
201 // reactor backend using file-descriptor & epoll, suitable for running on
202 // Linux. Can wait on multiple file descriptors, and converts other events
203 // (such as timers, signals, inter-thread notifications) into file descriptors
204 // using mechanisms like timerfd, signalfd and eventfd respectively.
205 class reactor_backend_epoll : public reactor_backend {
206 reactor* _r;
207 std::thread _task_quota_timer_thread;
208 timer_t _steady_clock_timer = {};
209 private:
210 file_desc _epollfd;
211 future<> get_epoll_future(pollable_fd_state& fd, int event);
212 void complete_epoll_event(pollable_fd_state& fd, int events, int event);
213 aio_storage_context _storage_context;
214 bool wait_and_process(int timeout, const sigset_t* active_sigmask);
215 bool _need_epoll_events = false;
216 public:
217 explicit reactor_backend_epoll(reactor* r);
218 virtual ~reactor_backend_epoll() override;
219
220 virtual bool reap_kernel_completions() override;
221 virtual bool kernel_submit_work() override;
222 virtual bool kernel_events_can_sleep() const override;
223 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
224 virtual future<> readable(pollable_fd_state& fd) override;
225 virtual future<> writeable(pollable_fd_state& fd) override;
226 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
227 virtual void forget(pollable_fd_state& fd) noexcept override;
228
229 virtual future<std::tuple<pollable_fd, socket_address>>
230 accept(pollable_fd_state& listenfd) override;
231 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
232 virtual void shutdown(pollable_fd_state& fd, int how) override;
233 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
234 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
235 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
236 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
237 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
238
239 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
240 virtual void start_tick() override;
241 virtual void stop_tick() override;
242 virtual void arm_highres_timer(const ::itimerspec& ts) override;
243 virtual void reset_preemption_monitor() override;
244 virtual void request_preemption() override;
245 virtual void start_handling_signal() override;
246
247 virtual pollable_fd_state_ptr
248 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
249 };
250
251 class reactor_backend_aio : public reactor_backend {
252 static constexpr size_t max_polls = 10000;
253 reactor* _r;
254 file_desc _hrtimer_timerfd;
255 aio_storage_context _storage_context;
256 // We use two aio contexts, one for preempting events (the timer tick and
257 // signals), the other for non-preempting events (fd poll).
258 preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer
259 aio_general_context _polling_io{max_polls}; // FIXME: unify with disk aio_context
260 hrtimer_aio_completion _hrtimer_poll_completion;
261 smp_wakeup_aio_completion _smp_wakeup_aio_completion;
262 static file_desc make_timerfd();
263 bool await_events(int timeout, const sigset_t* active_sigmask);
264 public:
265 explicit reactor_backend_aio(reactor* r);
266
267 virtual bool reap_kernel_completions() override;
268 virtual bool kernel_submit_work() override;
269 virtual bool kernel_events_can_sleep() const override;
270 virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
271 future<> poll(pollable_fd_state& fd, int events);
272 virtual future<> readable(pollable_fd_state& fd) override;
273 virtual future<> writeable(pollable_fd_state& fd) override;
274 virtual future<> readable_or_writeable(pollable_fd_state& fd) override;
275 virtual void forget(pollable_fd_state& fd) noexcept override;
276
277 virtual future<std::tuple<pollable_fd, socket_address>>
278 accept(pollable_fd_state& listenfd) override;
279 virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
280 virtual void shutdown(pollable_fd_state& fd, int how) override;
281 virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
282 virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
283 virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
284 virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
285 virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;
286
287 virtual void signal_received(int signo, siginfo_t* siginfo, void* ignore) override;
288 virtual void start_tick() override;
289 virtual void stop_tick() override;
290 virtual void arm_highres_timer(const ::itimerspec& its) override;
291 virtual void reset_preemption_monitor() override;
292 virtual void request_preemption() override;
293 virtual void start_handling_signal() override;
294
295 virtual pollable_fd_state_ptr
296 make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
297 };
298
#ifdef HAVE_OSV
// reactor_backend using OSv-specific features, without any file descriptors.
// This implementation cannot currently wait on file descriptors, but unlike
// reactor_backend_epoll it doesn't need file descriptors for waiting on a
// timer, for example, so file descriptors are not necessary.
//
// NOTE(review): this class still declares no overrides for
// readable_or_writeable, signal_received, start_tick, stop_tick,
// arm_highres_timer, reset_preemption_monitor, request_preemption or
// start_handling_signal, so it remains abstract. It also names
// reactor_notifier_osv, which is not declared in this header.
class reactor_backend_osv : public reactor_backend {
private:
    osv::newpoll::poller _poller;
    future<> get_poller_future(reactor_notifier_osv *n);
    promise<> _timer_promise;
public:
    reactor_backend_osv();
    virtual ~reactor_backend_osv() override { }

    virtual bool reap_kernel_completions() override;
    virtual bool kernel_submit_work() override;
    virtual bool kernel_events_can_sleep() const override;
    virtual void wait_and_process_events(const sigset_t* active_sigmask) override;
    virtual future<> readable(pollable_fd_state& fd) override;
    virtual future<> writeable(pollable_fd_state& fd) override;
    virtual void forget(pollable_fd_state& fd) noexcept override;

    virtual future<std::tuple<pollable_fd, socket_address>>
    accept(pollable_fd_state& listenfd) override;
    virtual future<> connect(pollable_fd_state& fd, socket_address& sa) override;
    virtual void shutdown(pollable_fd_state& fd, int how) override;
    virtual future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t len) override;
    virtual future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) override;
    virtual future<temporary_buffer<char>> read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) override;
    // Must match reactor_backend::write_some(pollable_fd_state&, net::packet&)
    // exactly; otherwise `override` makes the declaration ill-formed.
    virtual future<size_t> write_some(pollable_fd_state& fd, net::packet& p) override;
    virtual future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t len) override;

    void enable_timer(steady_clock_type::time_point when);
    virtual pollable_fd_state_ptr
    make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) override;
};
#endif /* HAVE_OSV */
336
337 class reactor_backend_selector {
338 std::string _name;
339 private:
340 static bool has_enough_aio_nr();
341 explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {}
342 public:
343 std::unique_ptr<reactor_backend> create(reactor* r);
344 static reactor_backend_selector default_backend();
345 static std::vector<reactor_backend_selector> available();
346 friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) {
347 return os << rbs._name;
348 }
349 friend void validate(boost::any& v, const std::vector<std::string> values, reactor_backend_selector* rbs, int) {
350 namespace bpo = boost::program_options;
351 bpo::validators::check_first_occurrence(v);
352 auto s = bpo::validators::get_single_string(values);
353 for (auto&& x : available()) {
354 if (s == x._name) {
355 v = std::move(x);
356 return;
357 }
358 }
359 throw bpo::validation_error(bpo::validation_error::invalid_option_value);
360 }
361 };
362
363 }