1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2019 ScyllaDB
20 */
21 #include "core/reactor_backend.hh"
22 #include "core/thread_pool.hh"
23 #include "core/syscall_result.hh"
24 #include <seastar/core/print.hh>
25 #include <seastar/core/reactor.hh>
26 #include <seastar/core/internal/buffer_allocator.hh>
27 #include <seastar/util/defer.hh>
28 #include <seastar/util/read_first_line.hh>
29 #include <chrono>
30 #include <sys/poll.h>
31 #include <sys/syscall.h>
32
33 #ifdef HAVE_OSV
34 #include <osv/newpoll.hh>
35 #endif
36
37 namespace seastar {
38
39 using namespace std::chrono_literals;
40 using namespace internal;
41 using namespace internal::linux_abi;
42 namespace fs = std::filesystem;
43
44 class pollable_fd_state_completion : public kernel_completion {
45 promise<> _pr;
46 public:
47 virtual void complete_with(ssize_t res) override {
48 _pr.set_value();
49 }
50 future<> get_future() {
51 return _pr.get_future();
52 }
53 };
54
55 void prepare_iocb(io_request& req, io_completion* desc, iocb& iocb) {
56 switch (req.opcode()) {
57 case io_request::operation::fdatasync:
58 iocb = make_fdsync_iocb(req.fd());
59 break;
60 case io_request::operation::write:
61 iocb = make_write_iocb(req.fd(), req.pos(), req.address(), req.size());
62 set_nowait(iocb, req.nowait_works());
63 break;
64 case io_request::operation::writev:
65 iocb = make_writev_iocb(req.fd(), req.pos(), req.iov(), req.size());
66 set_nowait(iocb, req.nowait_works());
67 break;
68 case io_request::operation::read:
69 iocb = make_read_iocb(req.fd(), req.pos(), req.address(), req.size());
70 set_nowait(iocb, req.nowait_works());
71 break;
72 case io_request::operation::readv:
73 iocb = make_readv_iocb(req.fd(), req.pos(), req.iov(), req.size());
74 set_nowait(iocb, req.nowait_works());
75 break;
76 default:
77 seastar_logger.error("Invalid operation for iocb: {}", req.opname());
78 std::abort();
79 }
80 set_user_data(iocb, desc);
81 }
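// A minimal sketch of the submit/reap cycle that prepare_iocb() feeds into,
// using the same linux_abi helpers as this file; `fd`, `buf` and `desc` are
// hypothetical stand-ins for a real file descriptor, buffer and io_completion:
//
//     aio_context_t ctx{};
//     internal::setup_aio_context(1, &ctx);
//     iocb io = make_read_iocb(fd, 0 /* pos */, buf, 4096 /* bytes */);
//     set_user_data(io, desc);                  // recovered by the reaper later
//     iocb* batch[1] = { &io };
//     io_submit(ctx, 1, batch);
//     io_event ev[1];
//     io_getevents(ctx, 1, 1, ev, nullptr);     // ev[0].data == desc, ev[0].res == bytes read
//     io_destroy(ctx);
//
// aio_storage_context below does the same thing in batches, drawing iocbs from
// a fixed pool and completing each request through the kernel_completion stored
// in user_data.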
82
83 aio_storage_context::iocb_pool::iocb_pool() {
84 for (unsigned i = 0; i != max_aio; ++i) {
85 _free_iocbs.push(&_iocb_pool[i]);
86 }
87 }
88
89 aio_storage_context::aio_storage_context(reactor& r)
90 : _r(r)
91 , _io_context(0) {
92 static_assert(max_aio >= reactor::max_queues * reactor::max_queues,
93 "Mismatch between maximum allowed io and what the IO queues can produce");
94 internal::setup_aio_context(max_aio, &_io_context);
95 _r.at_exit([this] { return stop(); });
96 }
97
98 aio_storage_context::~aio_storage_context() {
99 internal::io_destroy(_io_context);
100 }
101
102 future<> aio_storage_context::stop() noexcept {
103 return std::exchange(_pending_aio_retry_fut, make_ready_future<>()).finally([this] {
104 return do_until([this] { return !_iocb_pool.outstanding(); }, [this] {
105 reap_completions(false);
106 return make_ready_future<>();
107 });
108 });
109 }
110
111 inline
112 internal::linux_abi::iocb&
113 aio_storage_context::iocb_pool::get_one() {
114 auto io = _free_iocbs.top();
115 _free_iocbs.pop();
116 return *io;
117 }
118
119 inline
120 void
121 aio_storage_context::iocb_pool::put_one(internal::linux_abi::iocb* io) {
122 _free_iocbs.push(io);
123 }
124
125 inline
126 unsigned
127 aio_storage_context::iocb_pool::outstanding() const {
128 return max_aio - _free_iocbs.size();
129 }
130
131 inline
132 bool
133 aio_storage_context::iocb_pool::has_capacity() const {
134 return !_free_iocbs.empty();
135 }
136
137 // Returns: number of iocbs consumed (0 or 1)
138 size_t
139 aio_storage_context::handle_aio_error(linux_abi::iocb* iocb, int ec) {
140 switch (ec) {
141 case EAGAIN:
142 return 0;
143 case EBADF: {
144 auto desc = reinterpret_cast<kernel_completion*>(get_user_data(*iocb));
145 _iocb_pool.put_one(iocb);
146 desc->complete_with(-EBADF);
147 // if EBADF, it means that the first request has a bad fd, so
148 // we will only remove it from _pending_io and try again.
149 return 1;
150 }
151 default:
152 ++_r._io_stats.aio_errors;
153 throw_system_error_on(true, "io_submit");
154 abort();
155 }
156 }
157
158 extern bool aio_nowait_supported;
159
160 bool
161 aio_storage_context::submit_work() {
162 bool did_work = false;
163
164 _submission_queue.resize(0);
165 size_t to_submit = _r._io_sink.drain([this] (internal::io_request& req, io_completion* desc) -> bool {
166 if (!_iocb_pool.has_capacity()) {
167 return false;
168 }
169
170 auto& io = _iocb_pool.get_one();
171 prepare_iocb(req, desc, io);
172
173 if (_r._aio_eventfd) {
174 set_eventfd_notification(io, _r._aio_eventfd->get_fd());
175 }
176 _submission_queue.push_back(&io);
177 return true;
178 });
179
180 if (__builtin_expect(_r._kernel_page_cache, false)) {
181         // linux-aio is not asynchronous when the page cache is used,
182         // so we don't want to call io_submit() from the reactor thread.
183 //
184 // Pretend that all aio failed with EAGAIN and submit them
185 // via schedule_retry(), below.
186 did_work = !_submission_queue.empty();
187 for (auto& iocbp : _submission_queue) {
188 set_nowait(*iocbp, false);
189 _pending_aio_retry.push_back(iocbp);
190 }
191 to_submit = 0;
192 }
193
194 size_t submitted = 0;
195 while (to_submit > submitted) {
196 auto nr = to_submit - submitted;
197 auto iocbs = _submission_queue.data() + submitted;
198 auto r = io_submit(_io_context, nr, iocbs);
199 size_t nr_consumed;
200 if (r == -1) {
201 nr_consumed = handle_aio_error(iocbs[0], errno);
202 } else {
203 nr_consumed = size_t(r);
204 }
205 did_work = true;
206 submitted += nr_consumed;
207 }
208
209 if (need_to_retry() && !retry_in_progress()) {
210 schedule_retry();
211 }
212
213 return did_work;
214 }
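// A note on the submission loop above: io_submit() may accept only part of a
// batch. For example (hypothetical numbers), if 10 iocbs were queued and the
// first call returns 4, the next iteration retries from iocbs[4] with nr = 6.
// A failure with EAGAIN consumes nothing, so the same batch is retried
// immediately; EBADF completes the offending iocb with an error and skips past
// it; any other error aborts via handle_aio_error().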
215
216 void aio_storage_context::schedule_retry() {
217 // loop until both _pending_aio_retry and _aio_retries are empty.
218 // While retrying _aio_retries, new retries may be queued onto _pending_aio_retry.
219 _pending_aio_retry_fut = do_until([this] {
220 if (_aio_retries.empty()) {
221 if (_pending_aio_retry.empty()) {
222 return true;
223 }
224 // _pending_aio_retry, holding a batch of new iocbs to retry,
225 // is swapped with the empty _aio_retries.
226 std::swap(_aio_retries, _pending_aio_retry);
227 }
228 return false;
229 }, [this] {
230 return _r._thread_pool->submit<syscall_result<int>>([this] () mutable {
231 auto r = io_submit(_io_context, _aio_retries.size(), _aio_retries.data());
232 return wrap_syscall<int>(r);
233 }).then_wrapped([this] (future<syscall_result<int>> f) {
234 // If submit failed, just log the error and exit the loop.
235 // The next call to submit_work will call schedule_retry again.
236 if (f.failed()) {
237 auto ex = f.get_exception();
238 seastar_logger.warn("aio_storage_context::schedule_retry failed: {}", std::move(ex));
239 return;
240 }
241 auto result = f.get0();
242 auto iocbs = _aio_retries.data();
243 size_t nr_consumed = 0;
244 if (result.result == -1) {
245 try {
246 nr_consumed = handle_aio_error(iocbs[0], result.error);
247 } catch (...) {
248 seastar_logger.error("aio retry failed: {}. Aborting.", std::current_exception());
249 abort();
250 }
251 } else {
252 nr_consumed = result.result;
253 }
254 _aio_retries.erase(_aio_retries.begin(), _aio_retries.begin() + nr_consumed);
255 });
256 });
257 }
258
259 bool aio_storage_context::reap_completions(bool allow_retry)
260 {
261 struct timespec timeout = {0, 0};
262 auto n = io_getevents(_io_context, 1, max_aio, _ev_buffer, &timeout, _r._force_io_getevents_syscall);
263 if (n == -1 && errno == EINTR) {
264 n = 0;
265 }
266 assert(n >= 0);
267 for (size_t i = 0; i < size_t(n); ++i) {
268 auto iocb = get_iocb(_ev_buffer[i]);
269 if (_ev_buffer[i].res == -EAGAIN && allow_retry) {
270 set_nowait(*iocb, false);
271 _pending_aio_retry.push_back(iocb);
272 continue;
273 }
274 _iocb_pool.put_one(iocb);
275 auto desc = reinterpret_cast<kernel_completion*>(_ev_buffer[i].data);
276 desc->complete_with(_ev_buffer[i].res);
277 }
278 return n;
279 }
280
281 bool aio_storage_context::can_sleep() const {
282     // Because aio depends on polling, it cannot generate events to wake us up. Therefore, sleep
283 // is only possible if there are no in-flight aios. If there are, we need to keep polling.
284 //
285     // Alternatively, if we enabled _aio_eventfd, we can always sleep, since completions will notify the eventfd and wake us up.
286 unsigned executing = _iocb_pool.outstanding();
287 return executing == 0 || _r._aio_eventfd;
288 }
289
290 aio_general_context::aio_general_context(size_t nr)
291 : iocbs(new iocb*[nr])
292 , last(iocbs.get())
293 , end(iocbs.get() + nr)
294 {
295 setup_aio_context(nr, &io_context);
296 }
297
298 aio_general_context::~aio_general_context() {
299 io_destroy(io_context);
300 }
301
302 void aio_general_context::queue(linux_abi::iocb* iocb) {
303 assert(last < end);
304 *last++ = iocb;
305 }
306
307 size_t aio_general_context::flush() {
308 auto begin = iocbs.get();
309 auto retried = last;
310 while (begin != last) {
311 auto r = io_submit(io_context, last - begin, begin);
312 if (__builtin_expect(r > 0, true)) {
313 begin += r;
314 continue;
315 }
316         // errno == EAGAIN is expected here. We don't assert on it explicitly,
317         // since the assert below already requires that some progress is
318         // made between preemption checks, preventing an endless loop whatever the cause.
319 if (need_preempt()) {
320 assert(retried != begin);
321 retried = begin;
322 }
323 }
324 auto nr = last - iocbs.get();
325 last = iocbs.get();
326 return nr;
327 }
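// Typical use of aio_general_context, as in reactor_backend_aio further down
// (a sketch; `poll_iocb` is a hypothetical already-prepared poll iocb):
//
//     aio_general_context ctx(128 /* nr */);   // one-time ring setup
//     ctx.queue(&poll_iocb);                   // stage only, nothing submitted yet
//     ctx.flush();                             // io_submit() everything staged
//
// queue() only records pointers; the kernel sees nothing until flush(), which
// also resets the staging cursor so the same storage is reused for the next batch.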
328
329 completion_with_iocb::completion_with_iocb(int fd, int events, void* user_data)
330 : _iocb(make_poll_iocb(fd, events)) {
331 set_user_data(_iocb, user_data);
332 }
333
334 void completion_with_iocb::maybe_queue(aio_general_context& context) {
335 if (!_in_context) {
336 _in_context = true;
337 context.queue(&_iocb);
338 }
339 }
340
341 hrtimer_aio_completion::hrtimer_aio_completion(reactor& r, file_desc& fd)
342 : fd_kernel_completion(fd)
343 , completion_with_iocb(fd.get(), POLLIN, this)
344 , _r(r) {}
345
346 task_quota_aio_completion::task_quota_aio_completion(file_desc& fd)
347 : fd_kernel_completion(fd)
348 , completion_with_iocb(fd.get(), POLLIN, this) {}
349
350 smp_wakeup_aio_completion::smp_wakeup_aio_completion(file_desc& fd)
351 : fd_kernel_completion(fd)
352 , completion_with_iocb(fd.get(), POLLIN, this) {}
353
354 void
355 hrtimer_aio_completion::complete_with(ssize_t ret) {
356 uint64_t expirations = 0;
357 (void)_fd.read(&expirations, 8);
358 if (expirations) {
359 _r.service_highres_timer();
360 }
361 completion_with_iocb::completed();
362 }
363
364 void
365 task_quota_aio_completion::complete_with(ssize_t ret) {
366 uint64_t v;
367 (void)_fd.read(&v, 8);
368 completion_with_iocb::completed();
369 }
370
371 void
372 smp_wakeup_aio_completion::complete_with(ssize_t ret) {
373 uint64_t ignore = 0;
374 (void)_fd.read(&ignore, 8);
375 completion_with_iocb::completed();
376 }
377
378 preempt_io_context::preempt_io_context(reactor& r, file_desc& task_quota, file_desc& hrtimer)
379 : _r(r)
380 , _task_quota_aio_completion(task_quota)
381 , _hrtimer_aio_completion(r, hrtimer)
382 {}
383
384 void preempt_io_context::start_tick() {
385 // Preempt whenever an event (timer tick or signal) is available on the
386 // _preempting_io ring
387 set_need_preempt_var(reinterpret_cast<const preemption_monitor*>(_context.io_context + 8));
388 // preempt_io_context::request_preemption() will write to reactor::_preemption_monitor, which is now ignored
389 }
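// Why io_context + 8: the kernel maps an aio ring into user space at the address
// stored in the aio_context_t. Assuming the usual struct aio_ring header from
// the kernel's fs/aio.c,
//
//     struct aio_ring {
//         unsigned id;      // offset 0
//         unsigned nr;      // offset 4
//         unsigned head;    // offset 8   <-- the preemption monitor points here
//         unsigned tail;    // offset 12
//         // ...
//     };
//
// the cast above aliases the ring's head/tail pair as the preemption monitor.
// When the kernel appends a completion it advances tail, head != tail becomes
// true, and need_preempt() observes the preemption request without a system call.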
390
391 void preempt_io_context::stop_tick() {
392 set_need_preempt_var(&_r._preemption_monitor);
393 }
394
395 void preempt_io_context::request_preemption() {
396 ::itimerspec expired = {};
397 expired.it_value.tv_nsec = 1;
398     // an absolute expiration of 1ns is already in the past, so the timer fires immediately, tripping the preemption monitor
399 _hrtimer_aio_completion.fd().timerfd_settime(TFD_TIMER_ABSTIME, expired);
400
401 // This might have been called from poll_once. If that is the case, we cannot assume that timerfd is being
402 // monitored.
403 _hrtimer_aio_completion.maybe_queue(_context);
404 _context.flush();
405
406 // The kernel is not obliged to deliver the completion immediately, so wait for it
407 while (!need_preempt()) {
408 std::atomic_signal_fence(std::memory_order_seq_cst);
409 }
410 }
411
412 void preempt_io_context::reset_preemption_monitor() {
413 service_preempting_io();
414 _hrtimer_aio_completion.maybe_queue(_context);
415 _task_quota_aio_completion.maybe_queue(_context);
416 flush();
417 }
418
419 bool preempt_io_context::service_preempting_io() {
420 linux_abi::io_event a[2];
421 auto r = io_getevents(_context.io_context, 0, 2, a, 0);
422 assert(r != -1);
423 bool did_work = r > 0;
424 for (unsigned i = 0; i != unsigned(r); ++i) {
425 auto desc = reinterpret_cast<kernel_completion*>(a[i].data);
426 desc->complete_with(a[i].res);
427 }
428 return did_work;
429 }
430
431 file_desc reactor_backend_aio::make_timerfd() {
432 return file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK);
433 }
434
435 unsigned
436 reactor_backend_aio::max_polls() const {
437 return _r._cfg.max_networking_aio_io_control_blocks;
438 }
439
440 bool reactor_backend_aio::await_events(int timeout, const sigset_t* active_sigmask) {
441 ::timespec ts = {};
442 ::timespec* tsp = [&] () -> ::timespec* {
443 if (timeout == 0) {
444 return &ts;
445 } else if (timeout == -1) {
446 return nullptr;
447 } else {
448 ts = posix::to_timespec(timeout * 1ms);
449 return &ts;
450 }
451 }();
452 constexpr size_t batch_size = 128;
453 io_event batch[batch_size];
454 bool did_work = false;
455 int r;
456 do {
457 r = io_pgetevents(_polling_io.io_context, 1, batch_size, batch, tsp, active_sigmask);
458 if (r == -1 && errno == EINTR) {
459 return true;
460 }
461 assert(r != -1);
462 for (unsigned i = 0; i != unsigned(r); ++i) {
463 did_work = true;
464 auto& event = batch[i];
465 auto* desc = reinterpret_cast<kernel_completion*>(uintptr_t(event.data));
466 desc->complete_with(event.res);
467 }
468 // For the next iteration, don't use a timeout, since we may have waited already
469 ts = {};
470 tsp = &ts;
471 } while (r == batch_size);
472 return did_work;
473 }
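// The timeout convention used above (and by the callers below): 0 means poll
// without blocking, -1 means wait indefinitely, and any other value is a
// duration in milliseconds converted to a timespec for io_pgetevents().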
474
475 void reactor_backend_aio::signal_received(int signo, siginfo_t* siginfo, void* ignore) {
476 engine()._signals.action(signo, siginfo, ignore);
477 }
478
479 reactor_backend_aio::reactor_backend_aio(reactor& r)
480 : _r(r)
481 , _hrtimer_timerfd(make_timerfd())
482 , _storage_context(_r)
483 , _preempting_io(_r, _r._task_quota_timer, _hrtimer_timerfd)
484 , _hrtimer_poll_completion(_r, _hrtimer_timerfd)
485 , _smp_wakeup_aio_completion(_r._notify_eventfd)
486 {
487 // Protect against spurious wakeups - if we get notified that the timer has
488 // expired when it really hasn't, we don't want to block in read(tfd, ...).
489 auto tfd = _r._task_quota_timer.get();
490 ::fcntl(tfd, F_SETFL, ::fcntl(tfd, F_GETFL) | O_NONBLOCK);
491
492 sigset_t mask = make_sigset_mask(hrtimer_signal());
493 auto e = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
494 assert(e == 0);
495 }
496
497 bool reactor_backend_aio::reap_kernel_completions() {
498 bool did_work = await_events(0, nullptr);
499 did_work |= _storage_context.reap_completions();
500 return did_work;
501 }
502
503 bool reactor_backend_aio::kernel_submit_work() {
504 _hrtimer_poll_completion.maybe_queue(_polling_io);
505 bool did_work = _polling_io.flush();
506 did_work |= _storage_context.submit_work();
507 return did_work;
508 }
509
510 bool reactor_backend_aio::kernel_events_can_sleep() const {
511 return _storage_context.can_sleep();
512 }
513
514 void reactor_backend_aio::wait_and_process_events(const sigset_t* active_sigmask) {
515 int timeout = -1;
516 bool did_work = _preempting_io.service_preempting_io();
517 if (did_work) {
518 timeout = 0;
519 }
520
521 _hrtimer_poll_completion.maybe_queue(_polling_io);
522 _smp_wakeup_aio_completion.maybe_queue(_polling_io);
523 _polling_io.flush();
524 await_events(timeout, active_sigmask);
525 _preempting_io.service_preempting_io(); // clear task quota timer
526 }
527
528 class aio_pollable_fd_state : public pollable_fd_state {
529 internal::linux_abi::iocb _iocb_pollin;
530 pollable_fd_state_completion _completion_pollin;
531
532 internal::linux_abi::iocb _iocb_pollout;
533 pollable_fd_state_completion _completion_pollout;
534 public:
535 pollable_fd_state_completion* get_desc(int events) {
536 if (events & POLLIN) {
537 return &_completion_pollin;
538 }
539 return &_completion_pollout;
540 }
541 internal::linux_abi::iocb* get_iocb(int events) {
542 if (events & POLLIN) {
543 return &_iocb_pollin;
544 }
545 return &_iocb_pollout;
546 }
547 explicit aio_pollable_fd_state(file_desc fd, speculation speculate)
548 : pollable_fd_state(std::move(fd), std::move(speculate))
549 {}
550 future<> get_completion_future(int events) {
551 return get_desc(events)->get_future();
552 }
553 };
554
555 future<> reactor_backend_aio::poll(pollable_fd_state& fd, int events) {
556 try {
557 if (events & fd.events_known) {
558 fd.events_known &= ~events;
559 return make_ready_future<>();
560 }
561
562 fd.events_rw = events == (POLLIN|POLLOUT);
563
564 auto* pfd = static_cast<aio_pollable_fd_state*>(&fd);
565 auto* iocb = pfd->get_iocb(events);
566 auto* desc = pfd->get_desc(events);
567 *iocb = make_poll_iocb(fd.fd.get(), events);
568 *desc = pollable_fd_state_completion{};
569 set_user_data(*iocb, desc);
570 _polling_io.queue(iocb);
571 return pfd->get_completion_future(events);
572 } catch (...) {
573 return make_exception_future<>(std::current_exception());
574 }
575 }
576
577 future<> reactor_backend_aio::readable(pollable_fd_state& fd) {
578 return poll(fd, POLLIN);
579 }
580
581 future<> reactor_backend_aio::writeable(pollable_fd_state& fd) {
582 return poll(fd, POLLOUT);
583 }
584
585 future<> reactor_backend_aio::readable_or_writeable(pollable_fd_state& fd) {
586 return poll(fd, POLLIN|POLLOUT);
587 }
588
589 void reactor_backend_aio::forget(pollable_fd_state& fd) noexcept {
590 auto* pfd = static_cast<aio_pollable_fd_state*>(&fd);
591 delete pfd;
592 // ?
593 }
594
595 future<std::tuple<pollable_fd, socket_address>>
596 reactor_backend_aio::accept(pollable_fd_state& listenfd) {
597 return engine().do_accept(listenfd);
598 }
599
600 future<> reactor_backend_aio::connect(pollable_fd_state& fd, socket_address& sa) {
601 return engine().do_connect(fd, sa);
602 }
603
604 void reactor_backend_aio::shutdown(pollable_fd_state& fd, int how) {
605 fd.fd.shutdown(how);
606 }
607
608 future<size_t>
609 reactor_backend_aio::read_some(pollable_fd_state& fd, void* buffer, size_t len) {
610 return engine().do_read_some(fd, buffer, len);
611 }
612
613 future<size_t>
614 reactor_backend_aio::read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) {
615 return engine().do_read_some(fd, iov);
616 }
617
618 future<temporary_buffer<char>>
619 reactor_backend_aio::read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) {
620 return engine().do_read_some(fd, ba);
621 }
622
623 future<size_t>
624 reactor_backend_aio::write_some(pollable_fd_state& fd, const void* buffer, size_t len) {
625 return engine().do_write_some(fd, buffer, len);
626 }
627
628 future<size_t>
629 reactor_backend_aio::write_some(pollable_fd_state& fd, net::packet& p) {
630 return engine().do_write_some(fd, p);
631 }
632
633 void reactor_backend_aio::start_tick() {
634 _preempting_io.start_tick();
635 }
636
637 void reactor_backend_aio::stop_tick() {
638 _preempting_io.stop_tick();
639 }
640
641 void reactor_backend_aio::arm_highres_timer(const ::itimerspec& its) {
642 _hrtimer_timerfd.timerfd_settime(TFD_TIMER_ABSTIME, its);
643 }
644
645 void reactor_backend_aio::reset_preemption_monitor() {
646 _preempting_io.reset_preemption_monitor();
647 }
648
649 void reactor_backend_aio::request_preemption() {
650 _preempting_io.request_preemption();
651 }
652
653 void reactor_backend_aio::start_handling_signal() {
654 // The aio backend only uses SIGHUP/SIGTERM/SIGINT. We don't need to handle them right away and our
655 // implementation of request_preemption is not signal safe, so do nothing.
656 }
657
658 pollable_fd_state_ptr
659 reactor_backend_aio::make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) {
660 return pollable_fd_state_ptr(new aio_pollable_fd_state(std::move(fd), std::move(speculate)));
661 }
662
663 reactor_backend_epoll::reactor_backend_epoll(reactor& r)
664 : _r(r)
665 , _steady_clock_timer_reactor_thread(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC))
666 , _steady_clock_timer_timer_thread(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC))
667 , _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC))
668 , _storage_context(_r) {
669 ::epoll_event event;
670 event.events = EPOLLIN;
671 event.data.ptr = nullptr;
672 auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r._notify_eventfd.get(), &event);
673 throw_system_error_on(ret == -1);
674 event.events = EPOLLIN;
675 event.data.ptr = &_steady_clock_timer_reactor_thread;
676 ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _steady_clock_timer_reactor_thread.get(), &event);
677 throw_system_error_on(ret == -1);
678 }
679
680 void
681 reactor_backend_epoll::task_quota_timer_thread_fn() {
682 auto thread_name = seastar::format("timer-{}", _r._id);
683 pthread_setname_np(pthread_self(), thread_name.c_str());
684
685 sigset_t mask;
686 sigfillset(&mask);
687 for (auto sig : { SIGSEGV }) {
688 sigdelset(&mask, sig);
689 }
690 auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
691 if (r) {
692 seastar_logger.error("Thread {}: failed to block signals. Aborting.", thread_name.c_str());
693 abort();
694 }
695
696     // We need to wait until the task quota is set before we can calculate how many ticks there are
697     // in a minute. Technically task_quota is used from many threads, but since it is read-only here
698 // and only used during initialization we will avoid complicating the code.
699 {
700 uint64_t events;
701 _r._task_quota_timer.read(&events, 8);
702 _r.request_preemption();
703 }
704
705 while (!_r._dying.load(std::memory_order_relaxed)) {
706 // Wait for either the task quota timer, or the high resolution timer, or both,
707 // to expire.
708 struct pollfd pfds[2] = {};
709 pfds[0].fd = _r._task_quota_timer.get();
710 pfds[0].events = POLL_IN;
711 pfds[1].fd = _steady_clock_timer_timer_thread.get();
712 pfds[1].events = POLL_IN;
713 int r = poll(pfds, 2, -1);
714 assert(r != -1);
715
716 uint64_t events;
717 if (pfds[0].revents & POLL_IN) {
718 _r._task_quota_timer.read(&events, 8);
719 }
720 if (pfds[1].revents & POLL_IN) {
721 _steady_clock_timer_timer_thread.read(&events, 8);
722 _highres_timer_pending.store(true, std::memory_order_relaxed);
723 }
724 _r.request_preemption();
725
726 // We're in a different thread, but guaranteed to be on the same core, so even
727 // a signal fence is overdoing it
728 std::atomic_signal_fence(std::memory_order_seq_cst);
729 }
730 }
731
732 reactor_backend_epoll::~reactor_backend_epoll() = default;
733
734 void reactor_backend_epoll::start_tick() {
735 _task_quota_timer_thread = std::thread(&reactor_backend_epoll::task_quota_timer_thread_fn, this);
736
737 ::sched_param sp;
738 sp.sched_priority = 1;
739 auto sched_ok = pthread_setschedparam(_task_quota_timer_thread.native_handle(), SCHED_FIFO, &sp);
740 if (sched_ok != 0 && _r._id == 0) {
741 seastar_logger.warn("Unable to set SCHED_FIFO scheduling policy for timer thread; latency impact possible. Try adding CAP_SYS_NICE");
742 }
743 }
744
745 void reactor_backend_epoll::stop_tick() {
746 _r._dying.store(true, std::memory_order_relaxed);
747 _r._task_quota_timer.timerfd_settime(0, seastar::posix::to_relative_itimerspec(1ns, 1ms)); // Make the timer fire soon
748 _task_quota_timer_thread.join();
749 }
750
751 void reactor_backend_epoll::arm_highres_timer(const ::itimerspec& its) {
752 _steady_clock_timer_deadline = its;
753 _steady_clock_timer_timer_thread.timerfd_settime(TFD_TIMER_ABSTIME, its);
754 }
755
756 void
757 reactor_backend_epoll::switch_steady_clock_timers(file_desc& from, file_desc& to) {
758 auto& deadline = _steady_clock_timer_deadline;
759 if (deadline.it_value.tv_sec == 0 && deadline.it_value.tv_nsec == 0) {
760 return;
761 }
762 // Enable-then-disable, so the hardware timer doesn't have to be reprogrammed. Probably pointless.
763 to.timerfd_settime(TFD_TIMER_ABSTIME, _steady_clock_timer_deadline);
764 from.timerfd_settime(TFD_TIMER_ABSTIME, {});
765 }
766
767 void reactor_backend_epoll::maybe_switch_steady_clock_timers(int timeout, file_desc& from, file_desc& to) {
768 if (timeout != 0) {
769 switch_steady_clock_timers(from, to);
770 }
771 }
772
773 bool
774 reactor_backend_epoll::wait_and_process(int timeout, const sigset_t* active_sigmask) {
775 // If we plan to sleep, disable the timer thread steady clock timer (since it won't
776 // wake us up from sleep, and timer thread wakeup will just waste CPU time) and enable
777     // the reactor thread steady clock timer.
778 maybe_switch_steady_clock_timers(timeout, _steady_clock_timer_timer_thread, _steady_clock_timer_reactor_thread);
779 auto undo_timer_switch = defer([&] () noexcept {
780 try {
781 maybe_switch_steady_clock_timers(timeout, _steady_clock_timer_reactor_thread, _steady_clock_timer_timer_thread);
782 } catch (...) {
783 seastar_logger.error("Switching steady_clock timers back failed: {}. Aborting...", std::current_exception());
784 abort();
785 }
786 });
787 std::array<epoll_event, 128> eevt;
788 int nr = ::epoll_pwait(_epollfd.get(), eevt.data(), eevt.size(), timeout, active_sigmask);
789 if (nr == -1 && errno == EINTR) {
790 return false; // gdb can cause this
791 }
792 assert(nr != -1);
793 for (int i = 0; i < nr; ++i) {
794 auto& evt = eevt[i];
795 auto pfd = reinterpret_cast<pollable_fd_state*>(evt.data.ptr);
796 if (!pfd) {
797 char dummy[8];
798 _r._notify_eventfd.read(dummy, 8);
799 continue;
800 }
801 if (evt.data.ptr == &_steady_clock_timer_reactor_thread) {
802 char dummy[8];
803 _steady_clock_timer_reactor_thread.read(dummy, 8);
804 _highres_timer_pending.store(true, std::memory_order_relaxed);
805 _steady_clock_timer_deadline = {};
806 continue;
807 }
808 if (evt.events & (EPOLLHUP | EPOLLERR)) {
809             // treat the events as the requested events when an error occurs, and let
810             // send/recv/accept/connect handle the specific error.
811 evt.events = pfd->events_requested;
812 }
813 auto events = evt.events & (EPOLLIN | EPOLLOUT);
814 auto events_to_remove = events & ~pfd->events_requested;
815 if (pfd->events_rw) {
816 // accept() signals normal completions via EPOLLIN, but errors (due to shutdown())
817 // via EPOLLOUT|EPOLLHUP, so we have to wait for both EPOLLIN and EPOLLOUT with the
818 // same future
819 complete_epoll_event(*pfd, events, EPOLLIN|EPOLLOUT);
820 } else {
821 // Normal processing where EPOLLIN and EPOLLOUT are waited for via different
822 // futures.
823 complete_epoll_event(*pfd, events, EPOLLIN);
824 complete_epoll_event(*pfd, events, EPOLLOUT);
825 }
826 if (events_to_remove) {
827 pfd->events_epoll &= ~events_to_remove;
828 evt.events = pfd->events_epoll;
829 auto op = evt.events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
830 ::epoll_ctl(_epollfd.get(), op, pfd->fd.get(), &evt);
831 }
832 }
833 return nr;
834 }
835
836 class epoll_pollable_fd_state : public pollable_fd_state {
837 pollable_fd_state_completion _pollin;
838 pollable_fd_state_completion _pollout;
839
840 pollable_fd_state_completion* get_desc(int events) {
841 if (events & EPOLLIN) {
842 return &_pollin;
843 }
844 return &_pollout;
845 }
846 public:
847 explicit epoll_pollable_fd_state(file_desc fd, speculation speculate)
848 : pollable_fd_state(std::move(fd), std::move(speculate))
849 {}
850 future<> get_completion_future(int event) {
851 auto desc = get_desc(event);
852 *desc = pollable_fd_state_completion{};
853 return desc->get_future();
854 }
855
856 void complete_with(int event) {
857 get_desc(event)->complete_with(event);
858 }
859 };
860
861 bool reactor_backend_epoll::reap_kernel_completions() {
862 // epoll does not have a separate submission stage, and just
863     // calls epoll_ctl every time it needs to, so this method and
864     // kernel_submit_work are essentially the same. Ordering also
865     // doesn't matter much. wait_and_process actually handles completions,
866     // but we prefer to call it from kernel_submit_work because the
867     // reactor registers two pollers for completions and one for submission,
868 // since completion is cheaper for other backends like aio. This avoids
869 // calling epoll_wait twice.
870 //
871     // Here we only reap the storage io completions.
872 return _storage_context.reap_completions();
873 }
874
875 bool reactor_backend_epoll::kernel_submit_work() {
876 bool result = false;
877 _storage_context.submit_work();
878 if (_need_epoll_events) {
879 result |= wait_and_process(0, nullptr);
880 }
881
882 result |= complete_hrtimer();
883
884 return result;
885 }
886
887 bool reactor_backend_epoll::complete_hrtimer() {
888 // This can be set from either the task quota timer thread, or
889 // wait_and_process(), above.
890 if (_highres_timer_pending.load(std::memory_order_relaxed)) {
891 _highres_timer_pending.store(false, std::memory_order_relaxed);
892 _r.service_highres_timer();
893 return true;
894 }
895 return false;
896 }
897
898 bool reactor_backend_epoll::kernel_events_can_sleep() const {
899 return _storage_context.can_sleep();
900 }
901
902 void reactor_backend_epoll::wait_and_process_events(const sigset_t* active_sigmask) {
903 wait_and_process(-1 , active_sigmask);
904 complete_hrtimer();
905 }
906
907 void reactor_backend_epoll::complete_epoll_event(pollable_fd_state& pfd, int events, int event) {
908 if (pfd.events_requested & events & event) {
909 pfd.events_requested &= ~event;
910 pfd.events_known &= ~event;
911 auto* fd = static_cast<epoll_pollable_fd_state*>(&pfd);
912 return fd->complete_with(event);
913 }
914 }
915
916 void reactor_backend_epoll::signal_received(int signo, siginfo_t* siginfo, void* ignore) {
917 if (engine_is_ready()) {
918 engine()._signals.action(signo, siginfo, ignore);
919 } else {
920 reactor::signals::failed_to_handle(signo);
921 }
922 }
923
924 future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd, int event) {
925 if (pfd.events_known & event) {
926 pfd.events_known &= ~event;
927 return make_ready_future();
928 }
929 pfd.events_rw = event == (EPOLLIN | EPOLLOUT);
930 pfd.events_requested |= event;
931 if ((pfd.events_epoll & event) != event) {
932 auto ctl = pfd.events_epoll ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
933 pfd.events_epoll |= event;
934 ::epoll_event eevt;
935 eevt.events = pfd.events_epoll;
936 eevt.data.ptr = &pfd;
937 int r = ::epoll_ctl(_epollfd.get(), ctl, pfd.fd.get(), &eevt);
938 assert(r == 0);
939 _need_epoll_events = true;
940 }
941
942 auto* fd = static_cast<epoll_pollable_fd_state*>(&pfd);
943 return fd->get_completion_future(event);
944 }
945
946 future<> reactor_backend_epoll::readable(pollable_fd_state& fd) {
947 return get_epoll_future(fd, EPOLLIN);
948 }
949
950 future<> reactor_backend_epoll::writeable(pollable_fd_state& fd) {
951 return get_epoll_future(fd, EPOLLOUT);
952 }
953
954 future<> reactor_backend_epoll::readable_or_writeable(pollable_fd_state& fd) {
955 return get_epoll_future(fd, EPOLLIN | EPOLLOUT);
956 }
957
958 void reactor_backend_epoll::forget(pollable_fd_state& fd) noexcept {
959 if (fd.events_epoll) {
960 ::epoll_ctl(_epollfd.get(), EPOLL_CTL_DEL, fd.fd.get(), nullptr);
961 }
962 auto* efd = static_cast<epoll_pollable_fd_state*>(&fd);
963 delete efd;
964 }
965
966 future<std::tuple<pollable_fd, socket_address>>
967 reactor_backend_epoll::accept(pollable_fd_state& listenfd) {
968 return engine().do_accept(listenfd);
969 }
970
971 future<> reactor_backend_epoll::connect(pollable_fd_state& fd, socket_address& sa) {
972 return engine().do_connect(fd, sa);
973 }
974
975 void reactor_backend_epoll::shutdown(pollable_fd_state& fd, int how) {
976 fd.fd.shutdown(how);
977 }
978
979 future<size_t>
980 reactor_backend_epoll::read_some(pollable_fd_state& fd, void* buffer, size_t len) {
981 return engine().do_read_some(fd, buffer, len);
982 }
983
984 future<size_t>
985 reactor_backend_epoll::read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) {
986 return engine().do_read_some(fd, iov);
987 }
988
989 future<temporary_buffer<char>>
990 reactor_backend_epoll::read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) {
991 return engine().do_read_some(fd, ba);
992 }
993
994 future<size_t>
995 reactor_backend_epoll::write_some(pollable_fd_state& fd, const void* buffer, size_t len) {
996 return engine().do_write_some(fd, buffer, len);
997 }
998
999 future<size_t>
1000 reactor_backend_epoll::write_some(pollable_fd_state& fd, net::packet& p) {
1001 return engine().do_write_some(fd, p);
1002 }
1003
1004 void
1005 reactor_backend_epoll::request_preemption() {
1006 _r._preemption_monitor.head.store(1, std::memory_order_relaxed);
1007 }
1008
1009 void reactor_backend_epoll::start_handling_signal() {
1010 // The epoll backend uses signals for the high resolution timer. That is used for thread_scheduling_group, so we
1011     // request preemption when we receive a signal.
1012 request_preemption();
1013 }
1014
1015 pollable_fd_state_ptr
1016 reactor_backend_epoll::make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) {
1017 return pollable_fd_state_ptr(new epoll_pollable_fd_state(std::move(fd), std::move(speculate)));
1018 }
1019
1020 void reactor_backend_epoll::reset_preemption_monitor() {
1021 _r._preemption_monitor.head.store(0, std::memory_order_relaxed);
1022 }
1023
1024 #ifdef HAVE_OSV
1025 reactor_backend_osv::reactor_backend_osv() {
1026 }
1027
1028 bool
1029 reactor_backend_osv::reap_kernel_completions() {
1030 _poller.process();
1031 // osv::poller::process runs pollable's callbacks, but does not currently
1032     // have a timer expiration callback - instead it gives us an expired()
1033 // function we need to check:
1034 if (_poller.expired()) {
1035 _timer_promise.set_value();
1036 _timer_promise = promise<>();
1037 }
1038 return true;
1039 }
1040
1041 bool reactor_backend_osv::kernel_submit_work() {
1042     return false; // assumed bool-returning like the other backends; OSv has no separate submission stage
1043 }
1043
1044 void
1045 reactor_backend_osv::wait_and_process_events(const sigset_t* sigset) {
1046 return process_events_nowait();
1047 }
1048
1049 future<>
1050 reactor_backend_osv::readable(pollable_fd_state& fd) {
1051 std::cerr << "reactor_backend_osv does not support file descriptors - readable() shouldn't have been called!\n";
1052 abort();
1053 }
1054
1055 future<>
1056 reactor_backend_osv::writeable(pollable_fd_state& fd) {
1057 std::cerr << "reactor_backend_osv does not support file descriptors - writeable() shouldn't have been called!\n";
1058 abort();
1059 }
1060
1061 void
1062 reactor_backend_osv::forget(pollable_fd_state& fd) noexcept {
1063 std::cerr << "reactor_backend_osv does not support file descriptors - forget() shouldn't have been called!\n";
1064 abort();
1065 }
1066
1067 future<std::tuple<pollable_fd, socket_address>>
1068 reactor_backend_osv::accept(pollable_fd_state& listenfd) {
1069 return engine().do_accept(listenfd);
1070 }
1071
1072 future<> reactor_backend_osv::connect(pollable_fd_state& fd, socket_address& sa) {
1073 return engine().do_connect(fd, sa);
1074 }
1075
1076 void reactor_backend_osv::shutdown(pollable_fd_state& fd, int how) {
1077 fd.fd.shutdown(how);
1078 }
1079
1080 future<size_t>
1081 reactor_backend_osv::read_some(pollable_fd_state& fd, void* buffer, size_t len) {
1082 return engine().do_read_some(fd, buffer, len);
1083 }
1084
1085 future<size_t>
1086 reactor_backend_osv::read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) {
1087 return engine().do_read_some(fd, iov);
1088 }
1089
1090 future<temporary_buffer<char>>
1091 reactor_backend_osv::read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) {
1092 return engine().do_read_some(fd, ba);
1093 }
1094
1095 future<size_t>
1096 reactor_backend_osv::write_some(pollable_fd_state& fd, const void* buffer, size_t len) {
1097 return engine().do_write_some(fd, buffer, len);
1098 }
1099
1100 future<size_t>
1101 reactor_backend_osv::write_some(pollable_fd_state& fd, net::packet& p) {
1102 return engine().do_write_some(fd, p);
1103 }
1104
1105 void
1106 reactor_backend_osv::enable_timer(steady_clock_type::time_point when) {
1107 _poller.set_timer(when);
1108 }
1109
1110 pollable_fd_state_ptr
1111 reactor_backend_osv::make_pollable_fd_state(file_desc fd, pollable_fd::speculation speculate) {
1112 std::cerr << "reactor_backend_osv does not support file descriptors - make_pollable_fd_state() shouldn't have been called!\n";
1113 abort();
1114 }
1115 #endif
1116
1117 static bool detect_aio_poll() {
1118 auto fd = file_desc::eventfd(0, 0);
1119 aio_context_t ioc{};
1120 setup_aio_context(1, &ioc);
1121 auto cleanup = defer([&] () noexcept { io_destroy(ioc); });
1122 linux_abi::iocb iocb = internal::make_poll_iocb(fd.get(), POLLIN|POLLOUT);
1123 linux_abi::iocb* a[1] = { &iocb };
1124 auto r = io_submit(ioc, 1, a);
1125 if (r != 1) {
1126 return false;
1127 }
1128 uint64_t one = 1;
1129 fd.write(&one, 8);
1130 io_event ev[1];
1131 // We set force_syscall = true (the last parameter) to ensure
1132 // the system call exists and is usable. If IOCB_CMD_POLL exists then
1133 // io_pgetevents() will also exist, but some versions of docker
1134 // have a syscall whitelist that does not include io_pgetevents(),
1135 // which causes it to fail with -EPERM. See
1136 // https://github.com/moby/moby/issues/38894.
1137 r = io_pgetevents(ioc, 1, 1, ev, nullptr, nullptr, true);
1138 return r == 1;
1139 }
1140
1141 bool reactor_backend_selector::has_enough_aio_nr() {
1142 auto aio_max_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-max-nr");
1143 auto aio_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-nr");
1144     /* reactor_backend_selector::available() is executed at an early stage,
1145      * before any io_setup() has been issued, and not on a per-cpu basis.
1146      * So this method checks that the AIO slots still available on the system
1147      * cover the per-cpu request: aio-max-nr - aio-nr >= reactor::max_aio * ncpus.
1148 */
1149 if (aio_max_nr - aio_nr < reactor::max_aio * smp::count) {
1150 return false;
1151 }
1152 return true;
1153 }
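// A worked example with hypothetical numbers: with fs.aio-max-nr = 1048576,
// fs.aio-nr = 65536 already in use by other processes, and assuming
// reactor::max_aio were 1024 per cpu on a 64-cpu machine, Seastar would need
// 1024 * 64 = 65536 more slots; 1048576 - 65536 = 983040 >= 65536, so the
// linux-aio backend remains available. If the free slots dropped below the
// requirement, available() below would fall back to the epoll backend.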
1154
1155 std::unique_ptr<reactor_backend> reactor_backend_selector::create(reactor& r) {
1156 if (_name == "linux-aio") {
1157 return std::make_unique<reactor_backend_aio>(r);
1158 } else if (_name == "epoll") {
1159 return std::make_unique<reactor_backend_epoll>(r);
1160 }
1161 throw std::logic_error("bad reactor backend");
1162 }
1163
1164 reactor_backend_selector reactor_backend_selector::default_backend() {
1165 return available()[0];
1166 }
1167
1168 std::vector<reactor_backend_selector> reactor_backend_selector::available() {
1169 std::vector<reactor_backend_selector> ret;
1170 if (detect_aio_poll() && has_enough_aio_nr()) {
1171 ret.push_back(reactor_backend_selector("linux-aio"));
1172 }
1173 ret.push_back(reactor_backend_selector("epoll"));
1174 return ret;
1175 }
1176
1177 }