diff --git a/ceph/src/seastar/src/core/reactor_backend.cc b/ceph/src/seastar/src/core/reactor_backend.cc
index a159dfd2e26e693662d75cd20c2b7b5652a24438..4a1bca1afcb3ffd2ba739098f49ecd8f9c1e2bb2 100644
--- a/ceph/src/seastar/src/core/reactor_backend.cc
+++ b/ceph/src/seastar/src/core/reactor_backend.cc
@@ -52,32 +52,32 @@ public:
     }
 };
 
-void prepare_iocb(io_request& req, iocb& iocb) {
+void prepare_iocb(io_request& req, io_completion* desc, iocb& iocb) {
     switch (req.opcode()) {
     case io_request::operation::fdatasync:
         iocb = make_fdsync_iocb(req.fd());
         break;
     case io_request::operation::write:
         iocb = make_write_iocb(req.fd(), req.pos(), req.address(), req.size());
-        set_nowait(iocb, true);
+        set_nowait(iocb, req.nowait_works());
         break;
     case io_request::operation::writev:
         iocb = make_writev_iocb(req.fd(), req.pos(), req.iov(), req.size());
-        set_nowait(iocb, true);
+        set_nowait(iocb, req.nowait_works());
         break;
     case io_request::operation::read:
         iocb = make_read_iocb(req.fd(), req.pos(), req.address(), req.size());
-        set_nowait(iocb, true);
+        set_nowait(iocb, req.nowait_works());
         break;
     case io_request::operation::readv:
         iocb = make_readv_iocb(req.fd(), req.pos(), req.iov(), req.size());
-        set_nowait(iocb, true);
+        set_nowait(iocb, req.nowait_works());
         break;
     default:
         seastar_logger.error("Invalid operation for iocb: {}", req.opname());
         std::abort();
     }
-    set_user_data(iocb, req.get_kernel_completion());
+    set_user_data(iocb, desc);
 }
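+
+// Usage sketch, mirroring submit_work() below: the caller takes an iocb from
+// the pool, fills it via prepare_iocb(), and stages it for a batched
+// io_submit():
+//
+//     auto& io = _iocb_pool.get_one();
+//     prepare_iocb(req, desc, io);   // desc is delivered back via user_data
+//     _submission_queue.push_back(&io);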
 
 aio_storage_context::iocb_pool::iocb_pool() {
@@ -86,18 +86,28 @@ aio_storage_context::iocb_pool::iocb_pool() {
     }
 }
 
-aio_storage_context::aio_storage_context(reactor* r)
+aio_storage_context::aio_storage_context(reactor& r)
     : _r(r)
     , _io_context(0) {
     static_assert(max_aio >= reactor::max_queues * reactor::max_queues,
                   "Mismatch between maximum allowed io and what the IO queues can produce");
     internal::setup_aio_context(max_aio, &_io_context);
+    _r.at_exit([this] { return stop(); });
 }
 
 aio_storage_context::~aio_storage_context() {
     internal::io_destroy(_io_context);
 }
 
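+// Quiesce the storage context at reactor exit: first let the in-flight retry
+// chain resolve, then keep reaping completions (allow_retry=false, so -EAGAIN
+// results complete with an error instead of being re-queued) until no iocbs
+// remain outstanding.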
+future<> aio_storage_context::stop() noexcept {
+    return std::exchange(_pending_aio_retry_fut, make_ready_future<>()).finally([this] {
+        return do_until([this] { return !_iocb_pool.outstanding(); }, [this] {
+            reap_completions(false);
+            return make_ready_future<>();
+        });
+    });
+}
+
 inline
 internal::linux_abi::iocb&
 aio_storage_context::iocb_pool::get_one() {
@@ -139,7 +149,7 @@ aio_storage_context::handle_aio_error(linux_abi::iocb* iocb, int ec) {
             return 1;
         }
         default:
-            ++_r->_io_stats.aio_errors;
+            ++_r._io_stats.aio_errors;
             throw_system_error_on(true, "io_submit");
             abort();
     }
@@ -149,20 +159,36 @@ extern bool aio_nowait_supported;
 
 bool
 aio_storage_context::submit_work() {
-    size_t pending = _r->_pending_io.size();
-    size_t to_submit = 0;
     bool did_work = false;
 
     _submission_queue.resize(0);
-    while ((pending > to_submit) && _iocb_pool.has_capacity()) {
-        auto& req = _r->_pending_io[to_submit++];
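+    // io_sink::drain() feeds queued (request, completion) pairs to the
+    // callback until it returns false, and returns the number of requests
+    // consumed, i.e. how many iocbs we have staged.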
+    size_t to_submit = _r._io_sink.drain([this] (internal::io_request& req, io_completion* desc) -> bool {
+        if (!_iocb_pool.has_capacity()) {
+            return false;
+        }
+
         auto& io = _iocb_pool.get_one();
-        prepare_iocb(req, io);
+        prepare_iocb(req, desc, io);
 
-        if (_r->_aio_eventfd) {
-            set_eventfd_notification(io, _r->_aio_eventfd->get_fd());
+        if (_r._aio_eventfd) {
+            set_eventfd_notification(io, _r._aio_eventfd->get_fd());
         }
         _submission_queue.push_back(&io);
+        return true;
+    });
+
+    if (__builtin_expect(_r._kernel_page_cache, false)) {
+        // linux-aio is not asynchronous when the page cache is used,
+        // so we don't want to call io_submit() from the reactor thread.
+        //
+        // Pretend that all the requests failed with EAGAIN and submit
+        // them via schedule_retry(), below.
+        did_work = !_submission_queue.empty();
+        for (auto& iocbp : _submission_queue) {
+            set_nowait(*iocbp, false);
+            _pending_aio_retry.push_back(iocbp);
+        }
+        to_submit = 0;
     }
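+    // With to_submit forced to zero, the io_submit() loop below is skipped
+    // and the whole batch reaches the kernel only via the thread-pool retry
+    // path in schedule_retry().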
 
     size_t submitted = 0;
@@ -179,46 +205,68 @@ aio_storage_context::submit_work() {
         did_work = true;
         submitted += nr_consumed;
     }
-    _r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + submitted);
 
-    if (!_pending_aio_retry.empty()) {
+    if (need_to_retry() && !retry_in_progress()) {
         schedule_retry();
-        did_work = true;
     }
 
     return did_work;
 }
 
 void aio_storage_context::schedule_retry() {
-    // FIXME: future is discarded
-    (void)do_with(std::exchange(_pending_aio_retry, {}), [this](pending_aio_retry_t& retries){
-        return _r->_thread_pool->submit<syscall_result<int>>([this, &retries] () mutable {
-            auto r = io_submit(_io_context, retries.size(), retries.data());
+    // Loop until both _pending_aio_retry and _aio_retries are empty.
+    // While _aio_retries is being retried, new retries may be queued
+    // onto _pending_aio_retry.
+    _pending_aio_retry_fut = do_until([this] {
+        if (_aio_retries.empty()) {
+            if (_pending_aio_retry.empty()) {
+                return true;
+            }
+            // _pending_aio_retry, holding a batch of new iocbs to retry,
+            // is swapped with the empty _aio_retries.
+            std::swap(_aio_retries, _pending_aio_retry);
+        }
+        return false;
+    }, [this] {
+        return _r._thread_pool->submit<syscall_result<int>>([this] () mutable {
+            auto r = io_submit(_io_context, _aio_retries.size(), _aio_retries.data());
             return wrap_syscall<int>(r);
-        }).then([this, &retries] (syscall_result<int> result) {
-            auto iocbs = retries.data();
+        }).then_wrapped([this] (future<syscall_result<int>> f) {
+            // If submit failed, just log the error and exit the loop.
+            // The next call to submit_work will call schedule_retry again.
+            if (f.failed()) {
+                auto ex = f.get_exception();
+                seastar_logger.warn("aio_storage_context::schedule_retry failed: {}", std::move(ex));
+                return;
+            }
+            auto result = f.get0();
+            auto iocbs = _aio_retries.data();
             size_t nr_consumed = 0;
             if (result.result == -1) {
-                nr_consumed = handle_aio_error(iocbs[0], result.error);
+                try {
+                    nr_consumed = handle_aio_error(iocbs[0], result.error);
+                } catch (...) {
+                    seastar_logger.error("aio retry failed: {}. Aborting.", std::current_exception());
+                    abort();
+                }
             } else {
                 nr_consumed = result.result;
             }
-            std::copy(retries.begin() + nr_consumed, retries.end(), std::back_inserter(_pending_aio_retry));
+            _aio_retries.erase(_aio_retries.begin(), _aio_retries.begin() + nr_consumed);
         });
     });
 }
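+
+// The two queues form a simple hand-off: submit_work() and reap_completions()
+// append new iocbs to _pending_aio_retry, while the retry loop above drains
+// _aio_retries; swapping only when _aio_retries is empty lets the thread-pool
+// io_submit() work on a stable snapshot.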
 
-bool aio_storage_context::reap_completions()
+bool aio_storage_context::reap_completions(bool allow_retry)
 {
     struct timespec timeout = {0, 0};
-    auto n = io_getevents(_io_context, 1, max_aio, _ev_buffer, &timeout, _r->_force_io_getevents_syscall);
+    auto n = io_getevents(_io_context, 1, max_aio, _ev_buffer, &timeout, _r._force_io_getevents_syscall);
     if (n == -1 && errno == EINTR) {
         n = 0;
     }
     assert(n >= 0);
     for (size_t i = 0; i < size_t(n); ++i) {
         auto iocb = get_iocb(_ev_buffer[i]);
-        if (_ev_buffer[i].res == -EAGAIN) {
+        if (_ev_buffer[i].res == -EAGAIN && allow_retry) {
             set_nowait(*iocb, false);
             _pending_aio_retry.push_back(iocb);
             continue;
@@ -236,10 +284,14 @@ bool aio_storage_context::can_sleep() const {
     //
     // Alternatively, if we enabled _aio_eventfd, we can always enter
     unsigned executing = _iocb_pool.outstanding();
-    return executing == 0 || _r->_aio_eventfd;
+    return executing == 0 || _r._aio_eventfd;
 }
 
-aio_general_context::aio_general_context(size_t nr) : iocbs(new iocb*[nr]) {
+aio_general_context::aio_general_context(size_t nr)
+        : iocbs(new iocb*[nr])
+        , last(iocbs.get())
+        , end(iocbs.get() + nr)
+{
     setup_aio_context(nr, &io_context);
 }
 
@@ -248,18 +300,30 @@ aio_general_context::~aio_general_context() {
 }
 
 void aio_general_context::queue(linux_abi::iocb* iocb) {
+    assert(last < end);
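+    // The buffer is sized for the worst case in the constructor, so running
+    // out of slots here indicates a bookkeeping bug rather than transient
+    // overload.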
     *last++ = iocb;
 }
 
 size_t aio_general_context::flush() {
-    if (last != iocbs.get()) {
-        auto nr = last - iocbs.get();
-        last = iocbs.get();
-        auto r = io_submit(io_context, nr, iocbs.get());
-        assert(r >= 0);
-        return nr;
+    auto begin = iocbs.get();
+    auto retried = last;
+    while (begin != last) {
+        auto r = io_submit(io_context, last - begin, begin);
+        if (__builtin_expect(r > 0, true)) {
+            begin += r;
+            continue;
+        }
+        // errno == EAGAIN is expected here. We don't assert on it
+        // explicitly, because the assert below already requires that some
+        // progress is made between preemption checks, which prevents an
+        // endless loop whatever the cause of the failure.
+        if (need_preempt()) {
+            assert(retried != begin);
+            retried = begin;
+        }
     }
-    return 0;
+    auto nr = last - iocbs.get();
+    last = iocbs.get();
+    return nr;
 }
 
 completion_with_iocb::completion_with_iocb(int fd, int events, void* user_data)
@@ -274,16 +338,17 @@ void completion_with_iocb::maybe_queue(aio_general_context& context) {
     }
 }
 
-hrtimer_aio_completion::hrtimer_aio_completion(reactor* r, file_desc& fd)
-    : fd_kernel_completion(r, fd)
-    , completion_with_iocb(fd.get(), POLLIN, this) {}
+hrtimer_aio_completion::hrtimer_aio_completion(reactor& r, file_desc& fd)
+    : fd_kernel_completion(fd)
+    , completion_with_iocb(fd.get(), POLLIN, this)
+    , _r(r) {}
 
-task_quota_aio_completion::task_quota_aio_completion(reactor* r, file_desc& fd)
-    : fd_kernel_completion(r, fd)
+task_quota_aio_completion::task_quota_aio_completion(file_desc& fd)
+    : fd_kernel_completion(fd)
     , completion_with_iocb(fd.get(), POLLIN, this) {}
 
-smp_wakeup_aio_completion::smp_wakeup_aio_completion(reactor* r, file_desc& fd)
-        : fd_kernel_completion(r, fd)
+smp_wakeup_aio_completion::smp_wakeup_aio_completion(file_desc& fd)
+        : fd_kernel_completion(fd)
         , completion_with_iocb(fd.get(), POLLIN, this) {}
 
 void
@@ -291,7 +356,7 @@ hrtimer_aio_completion::complete_with(ssize_t ret) {
     uint64_t expirations = 0;
     (void)_fd.read(&expirations, 8);
     if (expirations) {
-        _r->service_highres_timer();
+        _r.service_highres_timer();
     }
     completion_with_iocb::completed();
 }
@@ -310,21 +375,21 @@ smp_wakeup_aio_completion::complete_with(ssize_t ret) {
     completion_with_iocb::completed();
 }
 
-preempt_io_context::preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer)
+preempt_io_context::preempt_io_context(reactor& r, file_desc& task_quota, file_desc& hrtimer)
     : _r(r)
-    , _task_quota_aio_completion(r, task_quota)
+    , _task_quota_aio_completion(task_quota)
     , _hrtimer_aio_completion(r, hrtimer)
 {}
 
 void preempt_io_context::start_tick() {
     // Preempt whenever an event (timer tick or signal) is available on the
     // _preempting_io ring
-    g_need_preempt = reinterpret_cast<const preemption_monitor*>(_context.io_context + 8);
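+    // The io_context value points at the kernel-mapped aio ring, whose header
+    // starts with { id, nr, head, tail, ... }; aliasing head/tail (offset 8)
+    // as a preemption_monitor makes need_preempt() return true exactly when a
+    // completion is pending on the ring.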
+    set_need_preempt_var(reinterpret_cast<const preemption_monitor*>(_context.io_context + 8));
     // preempt_io_context::request_preemption() will write to reactor::_preemption_monitor, which is now ignored
 }
 
 void preempt_io_context::stop_tick() {
-    g_need_preempt = &_r->_preemption_monitor;
+    set_need_preempt_var(&_r._preemption_monitor);
 }
 
 void preempt_io_context::request_preemption() {
@@ -367,6 +432,11 @@ file_desc reactor_backend_aio::make_timerfd() {
     return file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK);
 }
 
+unsigned
+reactor_backend_aio::max_polls() const {
+    return _r._cfg.max_networking_aio_io_control_blocks;
+}
+
 bool reactor_backend_aio::await_events(int timeout, const sigset_t* active_sigmask) {
     ::timespec ts = {};
     ::timespec* tsp = [&] () -> ::timespec* {
@@ -406,17 +476,17 @@ void reactor_backend_aio::signal_received(int signo, siginfo_t* siginfo, void* i
     engine()._signals.action(signo, siginfo, ignore);
 }
 
-reactor_backend_aio::reactor_backend_aio(reactor* r)
+reactor_backend_aio::reactor_backend_aio(reactor& r)
     : _r(r)
     , _hrtimer_timerfd(make_timerfd())
     , _storage_context(_r)
-    , _preempting_io(_r, _r->_task_quota_timer, _hrtimer_timerfd)
+    , _preempting_io(_r, _r._task_quota_timer, _hrtimer_timerfd)
     , _hrtimer_poll_completion(_r, _hrtimer_timerfd)
-    , _smp_wakeup_aio_completion(_r, _r->_notify_eventfd)
+    , _smp_wakeup_aio_completion(_r._notify_eventfd)
 {
     // Protect against spurious wakeups - if we get notified that the timer has
     // expired when it really hasn't, we don't want to block in read(tfd, ...).
-    auto tfd = _r->_task_quota_timer.get();
+    auto tfd = _r._task_quota_timer.get();
     ::fcntl(tfd, F_SETFL, ::fcntl(tfd, F_GETFL) | O_NONBLOCK);
 
     sigset_t mask = make_sigset_mask(hrtimer_signal());
@@ -590,56 +660,130 @@ reactor_backend_aio::make_pollable_fd_state(file_desc fd, pollable_fd::speculati
     return pollable_fd_state_ptr(new aio_pollable_fd_state(std::move(fd), std::move(speculate)));
 }
 
-reactor_backend_epoll::reactor_backend_epoll(reactor* r)
+reactor_backend_epoll::reactor_backend_epoll(reactor& r)
         : _r(r)
+        , _steady_clock_timer_reactor_thread(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC))
+        , _steady_clock_timer_timer_thread(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC))
         , _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC))
         , _storage_context(_r) {
     ::epoll_event event;
     event.events = EPOLLIN;
     event.data.ptr = nullptr;
-    auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r->_notify_eventfd.get(), &event);
+    auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r._notify_eventfd.get(), &event);
+    throw_system_error_on(ret == -1);
+    event.events = EPOLLIN;
+    event.data.ptr = &_steady_clock_timer_reactor_thread;
+    ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _steady_clock_timer_reactor_thread.get(), &event);
     throw_system_error_on(ret == -1);
+}
+
+void
+reactor_backend_epoll::task_quota_timer_thread_fn() {
+    auto thread_name = seastar::format("timer-{}", _r._id);
+    pthread_setname_np(pthread_self(), thread_name.c_str());
+
+    sigset_t mask;
+    sigfillset(&mask);
+    for (auto sig : { SIGSEGV }) {
+        sigdelset(&mask, sig);
+    }
+    auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
+    if (r) {
+        seastar_logger.error("Thread {}: failed to block signals. Aborting.", thread_name.c_str());
+        abort();
+    }
 
-    struct sigevent sev{};
-    sev.sigev_notify = SIGEV_THREAD_ID;
-    sev._sigev_un._tid = syscall(SYS_gettid);
-    sev.sigev_signo = hrtimer_signal();
-    ret = timer_create(CLOCK_MONOTONIC, &sev, &_steady_clock_timer);
-    assert(ret >= 0);
+    // We need to wait until the task quota is set before we can calculate how
+    // many ticks there are in a minute. Technically task_quota is used from
+    // many threads, but since it is read-only here and only used during
+    // initialization we will avoid complicating the code.
+    {
+        uint64_t events;
+        _r._task_quota_timer.read(&events, 8);
+        _r.request_preemption();
+    }
 
-    _r->_signals.handle_signal(hrtimer_signal(), [r = _r] {
-        r->service_highres_timer();
-    });
-}
+    while (!_r._dying.load(std::memory_order_relaxed)) {
+        // Wait for either the task quota timer, or the high resolution timer, or both,
+        // to expire.
+        struct pollfd pfds[2] = {};
+        pfds[0].fd = _r._task_quota_timer.get();
+        pfds[0].events = POLL_IN;
+        pfds[1].fd = _steady_clock_timer_timer_thread.get();
+        pfds[1].events = POLL_IN;
+        int r = poll(pfds, 2, -1);
+        assert(r != -1);
 
-reactor_backend_epoll::~reactor_backend_epoll() {
-    timer_delete(_steady_clock_timer);
+        uint64_t events;
+        if (pfds[0].revents & POLL_IN) {
+            _r._task_quota_timer.read(&events, 8);
+        }
+        if (pfds[1].revents & POLL_IN) {
+            _steady_clock_timer_timer_thread.read(&events, 8);
+            _highres_timer_pending.store(true, std::memory_order_relaxed);
+        }
+        _r.request_preemption();
+
+        // We're in a different thread, but guaranteed to be on the same core,
+        // so even a signal fence is overdoing it.
+        std::atomic_signal_fence(std::memory_order_seq_cst);
+    }
 }
 
+reactor_backend_epoll::~reactor_backend_epoll() = default;
+
 void reactor_backend_epoll::start_tick() {
-    _task_quota_timer_thread = std::thread(&reactor::task_quota_timer_thread_fn, _r);
+    _task_quota_timer_thread = std::thread(&reactor_backend_epoll::task_quota_timer_thread_fn, this);
 
     ::sched_param sp;
     sp.sched_priority = 1;
     auto sched_ok = pthread_setschedparam(_task_quota_timer_thread.native_handle(), SCHED_FIFO, &sp);
-    if (sched_ok != 0 && _r->_id == 0) {
+    if (sched_ok != 0 && _r._id == 0) {
         seastar_logger.warn("Unable to set SCHED_FIFO scheduling policy for timer thread; latency impact possible. Try adding CAP_SYS_NICE");
     }
 }
 
 void reactor_backend_epoll::stop_tick() {
-    _r->_dying.store(true, std::memory_order_relaxed);
-    _r->_task_quota_timer.timerfd_settime(0, seastar::posix::to_relative_itimerspec(1ns, 1ms)); // Make the timer fire soon
+    _r._dying.store(true, std::memory_order_relaxed);
+    _r._task_quota_timer.timerfd_settime(0, seastar::posix::to_relative_itimerspec(1ns, 1ms)); // Make the timer fire soon
     _task_quota_timer_thread.join();
 }
 
 void reactor_backend_epoll::arm_highres_timer(const ::itimerspec& its) {
-    auto ret = timer_settime(_steady_clock_timer, TIMER_ABSTIME, &its, NULL);
-    throw_system_error_on(ret == -1);
+    _steady_clock_timer_deadline = its;
+    _steady_clock_timer_timer_thread.timerfd_settime(TFD_TIMER_ABSTIME, its);
+}
+
+void
+reactor_backend_epoll::switch_steady_clock_timers(file_desc& from, file_desc& to) {
+    auto& deadline = _steady_clock_timer_deadline;
+    if (deadline.it_value.tv_sec == 0 && deadline.it_value.tv_nsec == 0) {
+        return;
+    }
+    // Enable-then-disable, so the hardware timer doesn't have to be reprogrammed. Probably pointless.
+    to.timerfd_settime(TFD_TIMER_ABSTIME, _steady_clock_timer_deadline);
+    from.timerfd_settime(TFD_TIMER_ABSTIME, {});
+}
+
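+// Only switch when the caller actually intends to sleep: a timeout of 0 means
+// a non-blocking poll, for which the timer-thread timerfd must stay armed.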
+void reactor_backend_epoll::maybe_switch_steady_clock_timers(int timeout, file_desc& from, file_desc& to) {
+    if (timeout != 0) {
+        switch_steady_clock_timers(from, to);
+    }
 }
 
 bool
 reactor_backend_epoll::wait_and_process(int timeout, const sigset_t* active_sigmask) {
+    // If we plan to sleep, disable the timer-thread steady clock timer (since
+    // it won't wake us up from sleep, and the timer-thread wakeup would just
+    // waste CPU time) and enable the reactor-thread steady clock timer instead.
+    maybe_switch_steady_clock_timers(timeout, _steady_clock_timer_timer_thread, _steady_clock_timer_reactor_thread);
+    auto undo_timer_switch = defer([&] () noexcept {
+      try {
+        maybe_switch_steady_clock_timers(timeout, _steady_clock_timer_reactor_thread, _steady_clock_timer_timer_thread);
+      } catch (...) {
+        seastar_logger.error("Switching steady_clock timers back failed: {}. Aborting...", std::current_exception());
+        abort();
+      }
+    });
     std::array<epoll_event, 128> eevt;
     int nr = ::epoll_pwait(_epollfd.get(), eevt.data(), eevt.size(), timeout, active_sigmask);
     if (nr == -1 && errno == EINTR) {
@@ -651,7 +795,14 @@ reactor_backend_epoll::wait_and_process(int timeout, const sigset_t* active_sigm
         auto pfd = reinterpret_cast<pollable_fd_state*>(evt.data.ptr);
         if (!pfd) {
             char dummy[8];
-            _r->_notify_eventfd.read(dummy, 8);
+            _r._notify_eventfd.read(dummy, 8);
+            continue;
+        }
+        if (evt.data.ptr == &_steady_clock_timer_reactor_thread) {
+            char dummy[8];
+            _steady_clock_timer_reactor_thread.read(dummy, 8);
+            _highres_timer_pending.store(true, std::memory_order_relaxed);
+            _steady_clock_timer_deadline = {};
             continue;
         }
         if (evt.events & (EPOLLHUP | EPOLLERR)) {
@@ -722,9 +873,24 @@ bool reactor_backend_epoll::reap_kernel_completions() {
 }
 
 bool reactor_backend_epoll::kernel_submit_work() {
+    bool result = false;
     _storage_context.submit_work();
     if (_need_epoll_events) {
-        return wait_and_process(0, nullptr);
+        result |= wait_and_process(0, nullptr);
+    }
+
+    result |= complete_hrtimer();
+
+    return result;
+}
+
+bool reactor_backend_epoll::complete_hrtimer() {
+    // This can be set either by the task quota timer thread or by
+    // wait_and_process(), above.
+    if (_highres_timer_pending.load(std::memory_order_relaxed)) {
+        _highres_timer_pending.store(false, std::memory_order_relaxed);
+        _r.service_highres_timer();
+        return true;
     }
     return false;
 }
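+
+// Relaxed atomics are sufficient for _highres_timer_pending: the timer thread
+// is guaranteed to run on the same core as the reactor (see the signal-fence
+// comment in task_quota_timer_thread_fn), so no cross-core ordering is needed.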
@@ -735,6 +901,7 @@ bool reactor_backend_epoll::kernel_events_can_sleep() const {
 
 void reactor_backend_epoll::wait_and_process_events(const sigset_t* active_sigmask) {
     wait_and_process(-1 , active_sigmask);
+    complete_hrtimer();
 }
 
 void reactor_backend_epoll::complete_epoll_event(pollable_fd_state& pfd, int events, int event) {
@@ -836,7 +1003,7 @@ reactor_backend_epoll::write_some(pollable_fd_state& fd, net::packet& p) {
 
 void
 reactor_backend_epoll::request_preemption() {
-    _r->_preemption_monitor.head.store(1, std::memory_order_relaxed);
+    _r._preemption_monitor.head.store(1, std::memory_order_relaxed);
 }
 
 void reactor_backend_epoll::start_handling_signal() {
@@ -851,7 +1018,7 @@ reactor_backend_epoll::make_pollable_fd_state(file_desc fd, pollable_fd::specula
 }
 
 void reactor_backend_epoll::reset_preemption_monitor() {
-    _r->_preemption_monitor.head.store(0, std::memory_order_relaxed);
+    _r._preemption_monitor.head.store(0, std::memory_order_relaxed);
 }
 
 #ifdef HAVE_OSV
@@ -951,7 +1118,7 @@ static bool detect_aio_poll() {
     auto fd = file_desc::eventfd(0, 0);
     aio_context_t ioc{};
     setup_aio_context(1, &ioc);
-    auto cleanup = defer([&] { io_destroy(ioc); });
+    auto cleanup = defer([&] () noexcept { io_destroy(ioc); });
     linux_abi::iocb iocb = internal::make_poll_iocb(fd.get(), POLLIN|POLLOUT);
     linux_abi::iocb* a[1] = { &iocb };
     auto r = io_submit(ioc, 1, a);
@@ -985,7 +1152,7 @@ bool reactor_backend_selector::has_enough_aio_nr() {
     return true;
 }
 
-std::unique_ptr<reactor_backend> reactor_backend_selector::create(reactor* r) {
+std::unique_ptr<reactor_backend> reactor_backend_selector::create(reactor& r) {
     if (_name == "linux-aio") {
         return std::make_unique<reactor_backend_aio>(r);
     } else if (_name == "epoll") {