/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */
21 #include "core/reactor_backend.hh"
22 #include "core/thread_pool.hh"
23 #include "core/syscall_result.hh"
24 #include <seastar/core/print.hh>
25 #include <seastar/core/reactor.hh>
26 #include <seastar/core/internal/buffer_allocator.hh>
27 #include <seastar/util/defer.hh>
28 #include <seastar/util/read_first_line.hh>
31 #include <sys/syscall.h>
34 #include <osv/newpoll.hh>
39 using namespace std::chrono_literals
;
40 using namespace internal
;
41 using namespace internal::linux_abi
;
42 namespace fs
= std::filesystem
;
44 class pollable_fd_state_completion
: public kernel_completion
{
47 virtual void complete_with(ssize_t res
) override
{
50 future
<> get_future() {
51 return _pr
.get_future();
55 void prepare_iocb(io_request
& req
, io_completion
* desc
, iocb
& iocb
) {
56 switch (req
.opcode()) {
57 case io_request::operation::fdatasync
:
58 iocb
= make_fdsync_iocb(req
.fd());
60 case io_request::operation::write
:
61 iocb
= make_write_iocb(req
.fd(), req
.pos(), req
.address(), req
.size());
62 set_nowait(iocb
, req
.nowait_works());
64 case io_request::operation::writev
:
65 iocb
= make_writev_iocb(req
.fd(), req
.pos(), req
.iov(), req
.size());
66 set_nowait(iocb
, req
.nowait_works());
68 case io_request::operation::read
:
69 iocb
= make_read_iocb(req
.fd(), req
.pos(), req
.address(), req
.size());
70 set_nowait(iocb
, req
.nowait_works());
72 case io_request::operation::readv
:
73 iocb
= make_readv_iocb(req
.fd(), req
.pos(), req
.iov(), req
.size());
74 set_nowait(iocb
, req
.nowait_works());
77 seastar_logger
.error("Invalid operation for iocb: {}", req
.opname());
80 set_user_data(iocb
, desc
);
83 aio_storage_context::iocb_pool::iocb_pool() {
84 for (unsigned i
= 0; i
!= max_aio
; ++i
) {
85 _free_iocbs
.push(&_iocb_pool
[i
]);
89 aio_storage_context::aio_storage_context(reactor
& r
)
92 static_assert(max_aio
>= reactor::max_queues
* reactor::max_queues
,
93 "Mismatch between maximum allowed io and what the IO queues can produce");
94 internal::setup_aio_context(max_aio
, &_io_context
);
95 _r
.at_exit([this] { return stop(); });
98 aio_storage_context::~aio_storage_context() {
99 internal::io_destroy(_io_context
);
102 future
<> aio_storage_context::stop() noexcept
{
103 return std::exchange(_pending_aio_retry_fut
, make_ready_future
<>()).finally([this] {
104 return do_until([this] { return !_iocb_pool
.outstanding(); }, [this] {
105 reap_completions(false);
106 return make_ready_future
<>();
112 internal::linux_abi::iocb
&
113 aio_storage_context::iocb_pool::get_one() {
114 auto io
= _free_iocbs
.top();
121 aio_storage_context::iocb_pool::put_one(internal::linux_abi::iocb
* io
) {
122 _free_iocbs
.push(io
);
127 aio_storage_context::iocb_pool::outstanding() const {
128 return max_aio
- _free_iocbs
.size();
133 aio_storage_context::iocb_pool::has_capacity() const {
134 return !_free_iocbs
.empty();
137 // Returns: number of iocbs consumed (0 or 1)
139 aio_storage_context::handle_aio_error(linux_abi::iocb
* iocb
, int ec
) {
144 auto desc
= reinterpret_cast<kernel_completion
*>(get_user_data(*iocb
));
145 _iocb_pool
.put_one(iocb
);
146 desc
->complete_with(-EBADF
);
147 // if EBADF, it means that the first request has a bad fd, so
148 // we will only remove it from _pending_io and try again.
152 ++_r
._io_stats
.aio_errors
;
153 throw_system_error_on(true, "io_submit");
158 extern bool aio_nowait_supported
;
161 aio_storage_context::submit_work() {
162 bool did_work
= false;
164 _submission_queue
.resize(0);
165 size_t to_submit
= _r
._io_sink
.drain([this] (internal::io_request
& req
, io_completion
* desc
) -> bool {
166 if (!_iocb_pool
.has_capacity()) {
170 auto& io
= _iocb_pool
.get_one();
171 prepare_iocb(req
, desc
, io
);
173 if (_r
._aio_eventfd
) {
174 set_eventfd_notification(io
, _r
._aio_eventfd
->get_fd());
176 _submission_queue
.push_back(&io
);
180 if (__builtin_expect(_r
._kernel_page_cache
, false)) {
181 // linux-aio is not asynchrous when the page cache is used,
182 // so we don't want to call io_submit() from the reactor thread.
184 // Pretend that all aio failed with EAGAIN and submit them
185 // via schedule_retry(), below.
186 did_work
= !_submission_queue
.empty();
187 for (auto& iocbp
: _submission_queue
) {
188 set_nowait(*iocbp
, false);
189 _pending_aio_retry
.push_back(iocbp
);
194 size_t submitted
= 0;
195 while (to_submit
> submitted
) {
196 auto nr
= to_submit
- submitted
;
197 auto iocbs
= _submission_queue
.data() + submitted
;
198 auto r
= io_submit(_io_context
, nr
, iocbs
);
201 nr_consumed
= handle_aio_error(iocbs
[0], errno
);
203 nr_consumed
= size_t(r
);
206 submitted
+= nr_consumed
;
209 if (need_to_retry() && !retry_in_progress()) {
216 void aio_storage_context::schedule_retry() {
217 // loop until both _pending_aio_retry and _aio_retries are empty.
218 // While retrying _aio_retries, new retries may be queued onto _pending_aio_retry.
219 _pending_aio_retry_fut
= do_until([this] {
220 if (_aio_retries
.empty()) {
221 if (_pending_aio_retry
.empty()) {
224 // _pending_aio_retry, holding a batch of new iocbs to retry,
225 // is swapped with the empty _aio_retries.
226 std::swap(_aio_retries
, _pending_aio_retry
);
230 return _r
._thread_pool
->submit
<syscall_result
<int>>([this] () mutable {
231 auto r
= io_submit(_io_context
, _aio_retries
.size(), _aio_retries
.data());
232 return wrap_syscall
<int>(r
);
233 }).then_wrapped([this] (future
<syscall_result
<int>> f
) {
234 // If submit failed, just log the error and exit the loop.
235 // The next call to submit_work will call schedule_retry again.
237 auto ex
= f
.get_exception();
238 seastar_logger
.warn("aio_storage_context::schedule_retry failed: {}", std::move(ex
));
241 auto result
= f
.get0();
242 auto iocbs
= _aio_retries
.data();
243 size_t nr_consumed
= 0;
244 if (result
.result
== -1) {
246 nr_consumed
= handle_aio_error(iocbs
[0], result
.error
);
248 seastar_logger
.error("aio retry failed: {}. Aborting.", std::current_exception());
252 nr_consumed
= result
.result
;
254 _aio_retries
.erase(_aio_retries
.begin(), _aio_retries
.begin() + nr_consumed
);
259 bool aio_storage_context::reap_completions(bool allow_retry
)
261 struct timespec timeout
= {0, 0};
262 auto n
= io_getevents(_io_context
, 1, max_aio
, _ev_buffer
, &timeout
, _r
._force_io_getevents_syscall
);
263 if (n
== -1 && errno
== EINTR
) {
267 for (size_t i
= 0; i
< size_t(n
); ++i
) {
268 auto iocb
= get_iocb(_ev_buffer
[i
]);
269 if (_ev_buffer
[i
].res
== -EAGAIN
&& allow_retry
) {
270 set_nowait(*iocb
, false);
271 _pending_aio_retry
.push_back(iocb
);
274 _iocb_pool
.put_one(iocb
);
275 auto desc
= reinterpret_cast<kernel_completion
*>(_ev_buffer
[i
].data
);
276 desc
->complete_with(_ev_buffer
[i
].res
);
281 bool aio_storage_context::can_sleep() const {
282 // Because aio depends on polling, it cannot generate events to wake us up, Therefore, sleep
283 // is only possible if there are no in-flight aios. If there are, we need to keep polling.
285 // Alternatively, if we enabled _aio_eventfd, we can always enter
286 unsigned executing
= _iocb_pool
.outstanding();
287 return executing
== 0 || _r
._aio_eventfd
;
290 aio_general_context::aio_general_context(size_t nr
)
291 : iocbs(new iocb
*[nr
])
293 , end(iocbs
.get() + nr
)
295 setup_aio_context(nr
, &io_context
);
298 aio_general_context::~aio_general_context() {
299 io_destroy(io_context
);
302 void aio_general_context::queue(linux_abi::iocb
* iocb
) {
307 size_t aio_general_context::flush() {
308 auto begin
= iocbs
.get();
310 while (begin
!= last
) {
311 auto r
= io_submit(io_context
, last
- begin
, begin
);
312 if (__builtin_expect(r
> 0, true)) {
316 // errno == EAGAIN is expected here. We don't explicitly assert that
317 // since the assert below requires that some progress will be
318 // made, preventing an endless loop for any reason.
319 if (need_preempt()) {
320 assert(retried
!= begin
);
324 auto nr
= last
- iocbs
.get();
329 completion_with_iocb::completion_with_iocb(int fd
, int events
, void* user_data
)
330 : _iocb(make_poll_iocb(fd
, events
)) {
331 set_user_data(_iocb
, user_data
);
334 void completion_with_iocb::maybe_queue(aio_general_context
& context
) {
337 context
.queue(&_iocb
);
341 hrtimer_aio_completion::hrtimer_aio_completion(reactor
& r
, file_desc
& fd
)
342 : fd_kernel_completion(fd
)
343 , completion_with_iocb(fd
.get(), POLLIN
, this)
346 task_quota_aio_completion::task_quota_aio_completion(file_desc
& fd
)
347 : fd_kernel_completion(fd
)
348 , completion_with_iocb(fd
.get(), POLLIN
, this) {}
350 smp_wakeup_aio_completion::smp_wakeup_aio_completion(file_desc
& fd
)
351 : fd_kernel_completion(fd
)
352 , completion_with_iocb(fd
.get(), POLLIN
, this) {}
355 hrtimer_aio_completion::complete_with(ssize_t ret
) {
356 uint64_t expirations
= 0;
357 (void)_fd
.read(&expirations
, 8);
359 _r
.service_highres_timer();
361 completion_with_iocb::completed();
365 task_quota_aio_completion::complete_with(ssize_t ret
) {
367 (void)_fd
.read(&v
, 8);
368 completion_with_iocb::completed();
372 smp_wakeup_aio_completion::complete_with(ssize_t ret
) {
374 (void)_fd
.read(&ignore
, 8);
375 completion_with_iocb::completed();
378 preempt_io_context::preempt_io_context(reactor
& r
, file_desc
& task_quota
, file_desc
& hrtimer
)
380 , _task_quota_aio_completion(task_quota
)
381 , _hrtimer_aio_completion(r
, hrtimer
)
384 void preempt_io_context::start_tick() {
385 // Preempt whenever an event (timer tick or signal) is available on the
386 // _preempting_io ring
387 set_need_preempt_var(reinterpret_cast<const preemption_monitor
*>(_context
.io_context
+ 8));
388 // preempt_io_context::request_preemption() will write to reactor::_preemption_monitor, which is now ignored
391 void preempt_io_context::stop_tick() {
392 set_need_preempt_var(&_r
._preemption_monitor
);
395 void preempt_io_context::request_preemption() {
396 ::itimerspec expired
= {};
397 expired
.it_value
.tv_nsec
= 1;
398 // will trigger immediately, triggering the preemption monitor
399 _hrtimer_aio_completion
.fd().timerfd_settime(TFD_TIMER_ABSTIME
, expired
);
401 // This might have been called from poll_once. If that is the case, we cannot assume that timerfd is being
403 _hrtimer_aio_completion
.maybe_queue(_context
);
406 // The kernel is not obliged to deliver the completion immediately, so wait for it
407 while (!need_preempt()) {
408 std::atomic_signal_fence(std::memory_order_seq_cst
);
412 void preempt_io_context::reset_preemption_monitor() {
413 service_preempting_io();
414 _hrtimer_aio_completion
.maybe_queue(_context
);
415 _task_quota_aio_completion
.maybe_queue(_context
);
419 bool preempt_io_context::service_preempting_io() {
420 linux_abi::io_event a
[2];
421 auto r
= io_getevents(_context
.io_context
, 0, 2, a
, 0);
423 bool did_work
= r
> 0;
424 for (unsigned i
= 0; i
!= unsigned(r
); ++i
) {
425 auto desc
= reinterpret_cast<kernel_completion
*>(a
[i
].data
);
426 desc
->complete_with(a
[i
].res
);
431 file_desc
reactor_backend_aio::make_timerfd() {
432 return file_desc::timerfd_create(CLOCK_MONOTONIC
, TFD_CLOEXEC
|TFD_NONBLOCK
);
436 reactor_backend_aio::max_polls() const {
437 return _r
._cfg
.max_networking_aio_io_control_blocks
;
440 bool reactor_backend_aio::await_events(int timeout
, const sigset_t
* active_sigmask
) {
442 ::timespec
* tsp
= [&] () -> ::timespec
* {
445 } else if (timeout
== -1) {
448 ts
= posix::to_timespec(timeout
* 1ms
);
452 constexpr size_t batch_size
= 128;
453 io_event batch
[batch_size
];
454 bool did_work
= false;
457 r
= io_pgetevents(_polling_io
.io_context
, 1, batch_size
, batch
, tsp
, active_sigmask
);
458 if (r
== -1 && errno
== EINTR
) {
462 for (unsigned i
= 0; i
!= unsigned(r
); ++i
) {
464 auto& event
= batch
[i
];
465 auto* desc
= reinterpret_cast<kernel_completion
*>(uintptr_t(event
.data
));
466 desc
->complete_with(event
.res
);
468 // For the next iteration, don't use a timeout, since we may have waited already
471 } while (r
== batch_size
);
475 void reactor_backend_aio::signal_received(int signo
, siginfo_t
* siginfo
, void* ignore
) {
476 engine()._signals
.action(signo
, siginfo
, ignore
);
479 reactor_backend_aio::reactor_backend_aio(reactor
& r
)
481 , _hrtimer_timerfd(make_timerfd())
482 , _storage_context(_r
)
483 , _preempting_io(_r
, _r
._task_quota_timer
, _hrtimer_timerfd
)
484 , _hrtimer_poll_completion(_r
, _hrtimer_timerfd
)
485 , _smp_wakeup_aio_completion(_r
._notify_eventfd
)
487 // Protect against spurious wakeups - if we get notified that the timer has
488 // expired when it really hasn't, we don't want to block in read(tfd, ...).
489 auto tfd
= _r
._task_quota_timer
.get();
490 ::fcntl(tfd
, F_SETFL
, ::fcntl(tfd
, F_GETFL
) | O_NONBLOCK
);
492 sigset_t mask
= make_sigset_mask(hrtimer_signal());
493 auto e
= ::pthread_sigmask(SIG_BLOCK
, &mask
, NULL
);
497 bool reactor_backend_aio::reap_kernel_completions() {
498 bool did_work
= await_events(0, nullptr);
499 did_work
|= _storage_context
.reap_completions();
503 bool reactor_backend_aio::kernel_submit_work() {
504 _hrtimer_poll_completion
.maybe_queue(_polling_io
);
505 bool did_work
= _polling_io
.flush();
506 did_work
|= _storage_context
.submit_work();
510 bool reactor_backend_aio::kernel_events_can_sleep() const {
511 return _storage_context
.can_sleep();
514 void reactor_backend_aio::wait_and_process_events(const sigset_t
* active_sigmask
) {
516 bool did_work
= _preempting_io
.service_preempting_io();
521 _hrtimer_poll_completion
.maybe_queue(_polling_io
);
522 _smp_wakeup_aio_completion
.maybe_queue(_polling_io
);
524 await_events(timeout
, active_sigmask
);
525 _preempting_io
.service_preempting_io(); // clear task quota timer
528 class aio_pollable_fd_state
: public pollable_fd_state
{
529 internal::linux_abi::iocb _iocb_pollin
;
530 pollable_fd_state_completion _completion_pollin
;
532 internal::linux_abi::iocb _iocb_pollout
;
533 pollable_fd_state_completion _completion_pollout
;
535 pollable_fd_state_completion
* get_desc(int events
) {
536 if (events
& POLLIN
) {
537 return &_completion_pollin
;
539 return &_completion_pollout
;
541 internal::linux_abi::iocb
* get_iocb(int events
) {
542 if (events
& POLLIN
) {
543 return &_iocb_pollin
;
545 return &_iocb_pollout
;
547 explicit aio_pollable_fd_state(file_desc fd
, speculation speculate
)
548 : pollable_fd_state(std::move(fd
), std::move(speculate
))
550 future
<> get_completion_future(int events
) {
551 return get_desc(events
)->get_future();
555 future
<> reactor_backend_aio::poll(pollable_fd_state
& fd
, int events
) {
557 if (events
& fd
.events_known
) {
558 fd
.events_known
&= ~events
;
559 return make_ready_future
<>();
562 fd
.events_rw
= events
== (POLLIN
|POLLOUT
);
564 auto* pfd
= static_cast<aio_pollable_fd_state
*>(&fd
);
565 auto* iocb
= pfd
->get_iocb(events
);
566 auto* desc
= pfd
->get_desc(events
);
567 *iocb
= make_poll_iocb(fd
.fd
.get(), events
);
568 *desc
= pollable_fd_state_completion
{};
569 set_user_data(*iocb
, desc
);
570 _polling_io
.queue(iocb
);
571 return pfd
->get_completion_future(events
);
573 return make_exception_future
<>(std::current_exception());
577 future
<> reactor_backend_aio::readable(pollable_fd_state
& fd
) {
578 return poll(fd
, POLLIN
);
581 future
<> reactor_backend_aio::writeable(pollable_fd_state
& fd
) {
582 return poll(fd
, POLLOUT
);
585 future
<> reactor_backend_aio::readable_or_writeable(pollable_fd_state
& fd
) {
586 return poll(fd
, POLLIN
|POLLOUT
);
589 void reactor_backend_aio::forget(pollable_fd_state
& fd
) noexcept
{
590 auto* pfd
= static_cast<aio_pollable_fd_state
*>(&fd
);
595 future
<std::tuple
<pollable_fd
, socket_address
>>
596 reactor_backend_aio::accept(pollable_fd_state
& listenfd
) {
597 return engine().do_accept(listenfd
);
600 future
<> reactor_backend_aio::connect(pollable_fd_state
& fd
, socket_address
& sa
) {
601 return engine().do_connect(fd
, sa
);
604 void reactor_backend_aio::shutdown(pollable_fd_state
& fd
, int how
) {
609 reactor_backend_aio::read_some(pollable_fd_state
& fd
, void* buffer
, size_t len
) {
610 return engine().do_read_some(fd
, buffer
, len
);
614 reactor_backend_aio::read_some(pollable_fd_state
& fd
, const std::vector
<iovec
>& iov
) {
615 return engine().do_read_some(fd
, iov
);
618 future
<temporary_buffer
<char>>
619 reactor_backend_aio::read_some(pollable_fd_state
& fd
, internal::buffer_allocator
* ba
) {
620 return engine().do_read_some(fd
, ba
);
624 reactor_backend_aio::write_some(pollable_fd_state
& fd
, const void* buffer
, size_t len
) {
625 return engine().do_write_some(fd
, buffer
, len
);
629 reactor_backend_aio::write_some(pollable_fd_state
& fd
, net::packet
& p
) {
630 return engine().do_write_some(fd
, p
);
633 void reactor_backend_aio::start_tick() {
634 _preempting_io
.start_tick();
637 void reactor_backend_aio::stop_tick() {
638 _preempting_io
.stop_tick();
641 void reactor_backend_aio::arm_highres_timer(const ::itimerspec
& its
) {
642 _hrtimer_timerfd
.timerfd_settime(TFD_TIMER_ABSTIME
, its
);
645 void reactor_backend_aio::reset_preemption_monitor() {
646 _preempting_io
.reset_preemption_monitor();
649 void reactor_backend_aio::request_preemption() {
650 _preempting_io
.request_preemption();
653 void reactor_backend_aio::start_handling_signal() {
654 // The aio backend only uses SIGHUP/SIGTERM/SIGINT. We don't need to handle them right away and our
655 // implementation of request_preemption is not signal safe, so do nothing.
658 pollable_fd_state_ptr
659 reactor_backend_aio::make_pollable_fd_state(file_desc fd
, pollable_fd::speculation speculate
) {
660 return pollable_fd_state_ptr(new aio_pollable_fd_state(std::move(fd
), std::move(speculate
)));
663 reactor_backend_epoll::reactor_backend_epoll(reactor
& r
)
665 , _steady_clock_timer_reactor_thread(file_desc::timerfd_create(CLOCK_MONOTONIC
, TFD_NONBLOCK
|TFD_CLOEXEC
))
666 , _steady_clock_timer_timer_thread(file_desc::timerfd_create(CLOCK_MONOTONIC
, TFD_NONBLOCK
|TFD_CLOEXEC
))
667 , _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC
))
668 , _storage_context(_r
) {
670 event
.events
= EPOLLIN
;
671 event
.data
.ptr
= nullptr;
672 auto ret
= ::epoll_ctl(_epollfd
.get(), EPOLL_CTL_ADD
, _r
._notify_eventfd
.get(), &event
);
673 throw_system_error_on(ret
== -1);
674 event
.events
= EPOLLIN
;
675 event
.data
.ptr
= &_steady_clock_timer_reactor_thread
;
676 ret
= ::epoll_ctl(_epollfd
.get(), EPOLL_CTL_ADD
, _steady_clock_timer_reactor_thread
.get(), &event
);
677 throw_system_error_on(ret
== -1);
681 reactor_backend_epoll::task_quota_timer_thread_fn() {
682 auto thread_name
= seastar::format("timer-{}", _r
._id
);
683 pthread_setname_np(pthread_self(), thread_name
.c_str());
687 for (auto sig
: { SIGSEGV
}) {
688 sigdelset(&mask
, sig
);
690 auto r
= ::pthread_sigmask(SIG_BLOCK
, &mask
, NULL
);
692 seastar_logger
.error("Thread {}: failed to block signals. Aborting.", thread_name
.c_str());
696 // We need to wait until task quota is set before we can calculate how many ticks are to
697 // a minute. Technically task_quota is used from many threads, but since it is read-only here
698 // and only used during initialization we will avoid complicating the code.
701 _r
._task_quota_timer
.read(&events
, 8);
702 _r
.request_preemption();
705 while (!_r
._dying
.load(std::memory_order_relaxed
)) {
706 // Wait for either the task quota timer, or the high resolution timer, or both,
708 struct pollfd pfds
[2] = {};
709 pfds
[0].fd
= _r
._task_quota_timer
.get();
710 pfds
[0].events
= POLL_IN
;
711 pfds
[1].fd
= _steady_clock_timer_timer_thread
.get();
712 pfds
[1].events
= POLL_IN
;
713 int r
= poll(pfds
, 2, -1);
717 if (pfds
[0].revents
& POLL_IN
) {
718 _r
._task_quota_timer
.read(&events
, 8);
720 if (pfds
[1].revents
& POLL_IN
) {
721 _steady_clock_timer_timer_thread
.read(&events
, 8);
722 _highres_timer_pending
.store(true, std::memory_order_relaxed
);
724 _r
.request_preemption();
726 // We're in a different thread, but guaranteed to be on the same core, so even
727 // a signal fence is overdoing it
728 std::atomic_signal_fence(std::memory_order_seq_cst
);
732 reactor_backend_epoll::~reactor_backend_epoll() = default;
734 void reactor_backend_epoll::start_tick() {
735 _task_quota_timer_thread
= std::thread(&reactor_backend_epoll::task_quota_timer_thread_fn
, this);
738 sp
.sched_priority
= 1;
739 auto sched_ok
= pthread_setschedparam(_task_quota_timer_thread
.native_handle(), SCHED_FIFO
, &sp
);
740 if (sched_ok
!= 0 && _r
._id
== 0) {
741 seastar_logger
.warn("Unable to set SCHED_FIFO scheduling policy for timer thread; latency impact possible. Try adding CAP_SYS_NICE");
745 void reactor_backend_epoll::stop_tick() {
746 _r
._dying
.store(true, std::memory_order_relaxed
);
747 _r
._task_quota_timer
.timerfd_settime(0, seastar::posix::to_relative_itimerspec(1ns
, 1ms
)); // Make the timer fire soon
748 _task_quota_timer_thread
.join();
751 void reactor_backend_epoll::arm_highres_timer(const ::itimerspec
& its
) {
752 _steady_clock_timer_deadline
= its
;
753 _steady_clock_timer_timer_thread
.timerfd_settime(TFD_TIMER_ABSTIME
, its
);
757 reactor_backend_epoll::switch_steady_clock_timers(file_desc
& from
, file_desc
& to
) {
758 auto& deadline
= _steady_clock_timer_deadline
;
759 if (deadline
.it_value
.tv_sec
== 0 && deadline
.it_value
.tv_nsec
== 0) {
762 // Enable-then-disable, so the hardware timer doesn't have to be reprogrammed. Probably pointless.
763 to
.timerfd_settime(TFD_TIMER_ABSTIME
, _steady_clock_timer_deadline
);
764 from
.timerfd_settime(TFD_TIMER_ABSTIME
, {});
767 void reactor_backend_epoll::maybe_switch_steady_clock_timers(int timeout
, file_desc
& from
, file_desc
& to
) {
769 switch_steady_clock_timers(from
, to
);
774 reactor_backend_epoll::wait_and_process(int timeout
, const sigset_t
* active_sigmask
) {
775 // If we plan to sleep, disable the timer thread steady clock timer (since it won't
776 // wake us up from sleep, and timer thread wakeup will just waste CPU time) and enable
777 // reactor thread steady clock timer.
778 maybe_switch_steady_clock_timers(timeout
, _steady_clock_timer_timer_thread
, _steady_clock_timer_reactor_thread
);
779 auto undo_timer_switch
= defer([&] () noexcept
{
781 maybe_switch_steady_clock_timers(timeout
, _steady_clock_timer_reactor_thread
, _steady_clock_timer_timer_thread
);
783 seastar_logger
.error("Switching steady_clock timers back failed: {}. Aborting...", std::current_exception());
787 std::array
<epoll_event
, 128> eevt
;
788 int nr
= ::epoll_pwait(_epollfd
.get(), eevt
.data(), eevt
.size(), timeout
, active_sigmask
);
789 if (nr
== -1 && errno
== EINTR
) {
790 return false; // gdb can cause this
793 for (int i
= 0; i
< nr
; ++i
) {
795 auto pfd
= reinterpret_cast<pollable_fd_state
*>(evt
.data
.ptr
);
798 _r
._notify_eventfd
.read(dummy
, 8);
801 if (evt
.data
.ptr
== &_steady_clock_timer_reactor_thread
) {
803 _steady_clock_timer_reactor_thread
.read(dummy
, 8);
804 _highres_timer_pending
.store(true, std::memory_order_relaxed
);
805 _steady_clock_timer_deadline
= {};
808 if (evt
.events
& (EPOLLHUP
| EPOLLERR
)) {
809 // treat the events as required events when error occurs, let
810 // send/recv/accept/connect handle the specific error.
811 evt
.events
= pfd
->events_requested
;
813 auto events
= evt
.events
& (EPOLLIN
| EPOLLOUT
);
814 auto events_to_remove
= events
& ~pfd
->events_requested
;
815 if (pfd
->events_rw
) {
816 // accept() signals normal completions via EPOLLIN, but errors (due to shutdown())
817 // via EPOLLOUT|EPOLLHUP, so we have to wait for both EPOLLIN and EPOLLOUT with the
819 complete_epoll_event(*pfd
, events
, EPOLLIN
|EPOLLOUT
);
821 // Normal processing where EPOLLIN and EPOLLOUT are waited for via different
823 complete_epoll_event(*pfd
, events
, EPOLLIN
);
824 complete_epoll_event(*pfd
, events
, EPOLLOUT
);
826 if (events_to_remove
) {
827 pfd
->events_epoll
&= ~events_to_remove
;
828 evt
.events
= pfd
->events_epoll
;
829 auto op
= evt
.events
? EPOLL_CTL_MOD
: EPOLL_CTL_DEL
;
830 ::epoll_ctl(_epollfd
.get(), op
, pfd
->fd
.get(), &evt
);
836 class epoll_pollable_fd_state
: public pollable_fd_state
{
837 pollable_fd_state_completion _pollin
;
838 pollable_fd_state_completion _pollout
;
840 pollable_fd_state_completion
* get_desc(int events
) {
841 if (events
& EPOLLIN
) {
847 explicit epoll_pollable_fd_state(file_desc fd
, speculation speculate
)
848 : pollable_fd_state(std::move(fd
), std::move(speculate
))
850 future
<> get_completion_future(int event
) {
851 auto desc
= get_desc(event
);
852 *desc
= pollable_fd_state_completion
{};
853 return desc
->get_future();
856 void complete_with(int event
) {
857 get_desc(event
)->complete_with(event
);
861 bool reactor_backend_epoll::reap_kernel_completions() {
862 // epoll does not have a separate submission stage, and just
863 // calls epoll_ctl everytime it needs, so this method and
864 // kernel_submit_work are essentially the same. Ordering also
865 // doesn't matter much. wait_and_process is actually completing,
866 // but we prefer to call it in kernel_submit_work because the
867 // reactor register two pollers for completions and one for submission,
868 // since completion is cheaper for other backends like aio. This avoids
869 // calling epoll_wait twice.
871 // We will only reap the io completions
872 return _storage_context
.reap_completions();
875 bool reactor_backend_epoll::kernel_submit_work() {
877 _storage_context
.submit_work();
878 if (_need_epoll_events
) {
879 result
|= wait_and_process(0, nullptr);
882 result
|= complete_hrtimer();
887 bool reactor_backend_epoll::complete_hrtimer() {
888 // This can be set from either the task quota timer thread, or
889 // wait_and_process(), above.
890 if (_highres_timer_pending
.load(std::memory_order_relaxed
)) {
891 _highres_timer_pending
.store(false, std::memory_order_relaxed
);
892 _r
.service_highres_timer();
898 bool reactor_backend_epoll::kernel_events_can_sleep() const {
899 return _storage_context
.can_sleep();
902 void reactor_backend_epoll::wait_and_process_events(const sigset_t
* active_sigmask
) {
903 wait_and_process(-1 , active_sigmask
);
907 void reactor_backend_epoll::complete_epoll_event(pollable_fd_state
& pfd
, int events
, int event
) {
908 if (pfd
.events_requested
& events
& event
) {
909 pfd
.events_requested
&= ~event
;
910 pfd
.events_known
&= ~event
;
911 auto* fd
= static_cast<epoll_pollable_fd_state
*>(&pfd
);
912 return fd
->complete_with(event
);
916 void reactor_backend_epoll::signal_received(int signo
, siginfo_t
* siginfo
, void* ignore
) {
917 if (engine_is_ready()) {
918 engine()._signals
.action(signo
, siginfo
, ignore
);
920 reactor::signals::failed_to_handle(signo
);
924 future
<> reactor_backend_epoll::get_epoll_future(pollable_fd_state
& pfd
, int event
) {
925 if (pfd
.events_known
& event
) {
926 pfd
.events_known
&= ~event
;
927 return make_ready_future();
929 pfd
.events_rw
= event
== (EPOLLIN
| EPOLLOUT
);
930 pfd
.events_requested
|= event
;
931 if ((pfd
.events_epoll
& event
) != event
) {
932 auto ctl
= pfd
.events_epoll
? EPOLL_CTL_MOD
: EPOLL_CTL_ADD
;
933 pfd
.events_epoll
|= event
;
935 eevt
.events
= pfd
.events_epoll
;
936 eevt
.data
.ptr
= &pfd
;
937 int r
= ::epoll_ctl(_epollfd
.get(), ctl
, pfd
.fd
.get(), &eevt
);
939 _need_epoll_events
= true;
942 auto* fd
= static_cast<epoll_pollable_fd_state
*>(&pfd
);
943 return fd
->get_completion_future(event
);
946 future
<> reactor_backend_epoll::readable(pollable_fd_state
& fd
) {
947 return get_epoll_future(fd
, EPOLLIN
);
950 future
<> reactor_backend_epoll::writeable(pollable_fd_state
& fd
) {
951 return get_epoll_future(fd
, EPOLLOUT
);
954 future
<> reactor_backend_epoll::readable_or_writeable(pollable_fd_state
& fd
) {
955 return get_epoll_future(fd
, EPOLLIN
| EPOLLOUT
);
958 void reactor_backend_epoll::forget(pollable_fd_state
& fd
) noexcept
{
959 if (fd
.events_epoll
) {
960 ::epoll_ctl(_epollfd
.get(), EPOLL_CTL_DEL
, fd
.fd
.get(), nullptr);
962 auto* efd
= static_cast<epoll_pollable_fd_state
*>(&fd
);
966 future
<std::tuple
<pollable_fd
, socket_address
>>
967 reactor_backend_epoll::accept(pollable_fd_state
& listenfd
) {
968 return engine().do_accept(listenfd
);
971 future
<> reactor_backend_epoll::connect(pollable_fd_state
& fd
, socket_address
& sa
) {
972 return engine().do_connect(fd
, sa
);
975 void reactor_backend_epoll::shutdown(pollable_fd_state
& fd
, int how
) {
980 reactor_backend_epoll::read_some(pollable_fd_state
& fd
, void* buffer
, size_t len
) {
981 return engine().do_read_some(fd
, buffer
, len
);
985 reactor_backend_epoll::read_some(pollable_fd_state
& fd
, const std::vector
<iovec
>& iov
) {
986 return engine().do_read_some(fd
, iov
);
989 future
<temporary_buffer
<char>>
990 reactor_backend_epoll::read_some(pollable_fd_state
& fd
, internal::buffer_allocator
* ba
) {
991 return engine().do_read_some(fd
, ba
);
995 reactor_backend_epoll::write_some(pollable_fd_state
& fd
, const void* buffer
, size_t len
) {
996 return engine().do_write_some(fd
, buffer
, len
);
1000 reactor_backend_epoll::write_some(pollable_fd_state
& fd
, net::packet
& p
) {
1001 return engine().do_write_some(fd
, p
);
1005 reactor_backend_epoll::request_preemption() {
1006 _r
._preemption_monitor
.head
.store(1, std::memory_order_relaxed
);
1009 void reactor_backend_epoll::start_handling_signal() {
1010 // The epoll backend uses signals for the high resolution timer. That is used for thread_scheduling_group, so we
1011 // request preemption so when we receive a signal.
1012 request_preemption();
1015 pollable_fd_state_ptr
1016 reactor_backend_epoll::make_pollable_fd_state(file_desc fd
, pollable_fd::speculation speculate
) {
1017 return pollable_fd_state_ptr(new epoll_pollable_fd_state(std::move(fd
), std::move(speculate
)));
1020 void reactor_backend_epoll::reset_preemption_monitor() {
1021 _r
._preemption_monitor
.head
.store(0, std::memory_order_relaxed
);
// NOTE(review): this OSv backend region looks truncated. The embedded source
// line numbers skip (e.g. 1025 -> 1029), so return types, closing braces and
// some statements are missing, and any build guard around it (presumably
// HAVE_OSV) is not visible. Tokens are kept verbatim; restore from upstream
// before relying on this code.
// Constructor: no initialization is visible here.
1025 reactor_backend_osv::reactor_backend_osv() {
// Reaps OSv poller events. When the poller reports its timer expired,
// _timer_promise is fulfilled and replaced with a fresh promise.
1029 reactor_backend_osv::reap_kernel_completions() {
1031 // osv::poller::process runs pollable's callbacks, but does not currently
1032 // have a timer expiration callback - instead if gives us an expired()
1033 // function we need to check:
1034 if (_poller
.expired()) {
1035 _timer_promise
.set_value();
1036 _timer_promise
= promise
<>();
// Body not visible in this chunk.
1041 reactor_backend_osv::kernel_submit_work() {
// Delegates to process_events_nowait(); `sigset` is not used in the visible code.
1045 reactor_backend_osv::wait_and_process_events(const sigset_t
* sigset
) {
1046 return process_events_nowait();
// File descriptors are unsupported on OSv: readable() only prints a diagnostic.
1050 reactor_backend_osv::readable(pollable_fd_state
& fd
) {
1051 std::cerr
<< "reactor_backend_osv does not support file descriptors - readable() shouldn't have been called!\n";
// Unsupported: prints a diagnostic.
1056 reactor_backend_osv::writeable(pollable_fd_state
& fd
) {
1057 std::cerr
<< "reactor_backend_osv does not support file descriptors - writeable() shouldn't have been called!\n";
// Unsupported: prints a diagnostic.
1062 reactor_backend_osv::forget(pollable_fd_state
& fd
) noexcept
{
1063 std::cerr
<< "reactor_backend_osv does not support file descriptors - forget() shouldn't have been called!\n";
// Forwards to the engine's generic accept implementation.
1067 future
<std::tuple
<pollable_fd
, socket_address
>>
1068 reactor_backend_osv::accept(pollable_fd_state
& listenfd
) {
1069 return engine().do_accept(listenfd
);
// Forwards to the engine's generic connect implementation.
1072 future
<> reactor_backend_osv::connect(pollable_fd_state
& fd
, socket_address
& sa
) {
1073 return engine().do_connect(fd
, sa
);
// Calls shutdown(how) on the underlying file_desc.
1076 void reactor_backend_osv::shutdown(pollable_fd_state
& fd
, int how
) {
1077 fd
.fd
.shutdown(how
);
// read_some / write_some overloads forward to the engine's do_* implementations.
1081 reactor_backend_osv::read_some(pollable_fd_state
& fd
, void* buffer
, size_t len
) {
1082 return engine().do_read_some(fd
, buffer
, len
);
1086 reactor_backend_osv::read_some(pollable_fd_state
& fd
, const std::vector
<iovec
>& iov
) {
1087 return engine().do_read_some(fd
, iov
);
1090 future
<temporary_buffer
<char>>
1091 reactor_backend_osv::read_some(pollable_fd_state
& fd
, internal::buffer_allocator
* ba
) {
1092 return engine().do_read_some(fd
, ba
);
1096 reactor_backend_osv::write_some(pollable_fd_state
& fd
, const void* buffer
, size_t len
) {
1097 return engine().do_write_some(fd
, buffer
, len
);
1101 reactor_backend_osv::write_some(pollable_fd_state
& fd
, net::packet
& p
) {
1102 return engine().do_write_some(fd
, p
);
// Arms the OSv poller's timer for `when`.
1106 reactor_backend_osv::enable_timer(steady_clock_type::time_point when
) {
1107 _poller
.set_timer(when
);
// Unsupported: prints a diagnostic.
1110 pollable_fd_state_ptr
1111 reactor_backend_osv::make_pollable_fd_state(file_desc fd
, pollable_fd::speculation speculate
) {
1112 std::cerr
<< "reactor_backend_osv does not support file descriptors - make_pollable_fd_state() shouldn't have been called!\n";
1117 static bool detect_aio_poll() {
1118 auto fd
= file_desc::eventfd(0, 0);
1119 aio_context_t ioc
{};
1120 setup_aio_context(1, &ioc
);
1121 auto cleanup
= defer([&] () noexcept
{ io_destroy(ioc
); });
1122 linux_abi::iocb iocb
= internal::make_poll_iocb(fd
.get(), POLLIN
|POLLOUT
);
1123 linux_abi::iocb
* a
[1] = { &iocb
};
1124 auto r
= io_submit(ioc
, 1, a
);
1131 // We set force_syscall = true (the last parameter) to ensure
1132 // the system call exists and is usable. If IOCB_CMD_POLL exists then
1133 // io_pgetevents() will also exist, but some versions of docker
1134 // have a syscall whitelist that does not include io_pgetevents(),
1135 // which causes it to fail with -EPERM. See
1136 // https://github.com/moby/moby/issues/38894.
1137 r
= io_pgetevents(ioc
, 1, 1, ev
, nullptr, nullptr, true);
1141 bool reactor_backend_selector::has_enough_aio_nr() {
1142 auto aio_max_nr
= read_first_line_as
<unsigned>("/proc/sys/fs/aio-max-nr");
1143 auto aio_nr
= read_first_line_as
<unsigned>("/proc/sys/fs/aio-nr");
1144 /* reactor_backend_selector::available() will be execute in early stage,
1145 * it's before io_setup() issued, and not per-cpu basis.
1146 * So this method calculates:
1147 * Available AIO on the system - (request AIO per-cpu * ncpus)
1149 if (aio_max_nr
- aio_nr
< reactor::max_aio
* smp::count
) {
1155 std::unique_ptr
<reactor_backend
> reactor_backend_selector::create(reactor
& r
) {
1156 if (_name
== "linux-aio") {
1157 return std::make_unique
<reactor_backend_aio
>(r
);
1158 } else if (_name
== "epoll") {
1159 return std::make_unique
<reactor_backend_epoll
>(r
);
1161 throw std::logic_error("bad reactor backend");
1164 reactor_backend_selector
reactor_backend_selector::default_backend() {
1165 return available()[0];
1168 std::vector
<reactor_backend_selector
> reactor_backend_selector::available() {
1169 std::vector
<reactor_backend_selector
> ret
;
1170 if (detect_aio_poll() && has_enough_aio_nr()) {
1171 ret
.push_back(reactor_backend_selector("linux-aio"));
1173 ret
.push_back(reactor_backend_selector("epoll"));