}
};
-void prepare_iocb(io_request& req, iocb& iocb) {
+void prepare_iocb(io_request& req, io_completion* desc, iocb& iocb) {
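+ // 'desc' is stored in the iocb's user_data (see the bottom of this
+ // function); reap_completions() uses it to map a kernel event back to
+ // the completion object that should be notified.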
switch (req.opcode()) {
case io_request::operation::fdatasync:
iocb = make_fdsync_iocb(req.fd());
break;
case io_request::operation::write:
iocb = make_write_iocb(req.fd(), req.pos(), req.address(), req.size());
- set_nowait(iocb, true);
+ set_nowait(iocb, req.nowait_works());
break;
case io_request::operation::writev:
iocb = make_writev_iocb(req.fd(), req.pos(), req.iov(), req.size());
- set_nowait(iocb, true);
+ set_nowait(iocb, req.nowait_works());
break;
case io_request::operation::read:
iocb = make_read_iocb(req.fd(), req.pos(), req.address(), req.size());
- set_nowait(iocb, true);
+ set_nowait(iocb, req.nowait_works());
break;
case io_request::operation::readv:
iocb = make_readv_iocb(req.fd(), req.pos(), req.iov(), req.size());
- set_nowait(iocb, true);
+ set_nowait(iocb, req.nowait_works());
break;
default:
seastar_logger.error("Invalid operation for iocb: {}", req.opname());
std::abort();
}
- set_user_data(iocb, req.get_kernel_completion());
+ set_user_data(iocb, desc);
}
aio_storage_context::iocb_pool::iocb_pool() {
}
}
-aio_storage_context::aio_storage_context(reactor* r)
+aio_storage_context::aio_storage_context(reactor& r)
: _r(r)
, _io_context(0) {
static_assert(max_aio >= reactor::max_queues * reactor::max_queues,
"Mismatch between maximum allowed io and what the IO queues can produce");
internal::setup_aio_context(max_aio, &_io_context);
+ _r.at_exit([this] { return stop(); });
}
aio_storage_context::~aio_storage_context() {
internal::io_destroy(_io_context);
}
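+// Registered with reactor::at_exit in the constructor: wait for any in-flight
+// retry submission, then keep reaping (with retries disabled) until every
+// outstanding iocb has been returned to the pool.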
+future<> aio_storage_context::stop() noexcept {
+ return std::exchange(_pending_aio_retry_fut, make_ready_future<>()).finally([this] {
+ return do_until([this] { return !_iocb_pool.outstanding(); }, [this] {
+ reap_completions(false);
+ return make_ready_future<>();
+ });
+ });
+}
+
inline
internal::linux_abi::iocb&
aio_storage_context::iocb_pool::get_one() {
return 1;
}
default:
- ++_r->_io_stats.aio_errors;
+ ++_r._io_stats.aio_errors;
throw_system_error_on(true, "io_submit");
abort();
}
bool
aio_storage_context::submit_work() {
- size_t pending = _r->_pending_io.size();
- size_t to_submit = 0;
bool did_work = false;
_submission_queue.resize(0);
- while ((pending > to_submit) && _iocb_pool.has_capacity()) {
- auto& req = _r->_pending_io[to_submit++];
+ size_t to_submit = _r._io_sink.drain([this] (internal::io_request& req, io_completion* desc) -> bool {
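+ // Returning false tells drain() to stop handing out requests once the
+ // iocb pool is exhausted; requests left in the sink are picked up by the
+ // next submit_work() pass.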
+ if (!_iocb_pool.has_capacity()) {
+ return false;
+ }
+
auto& io = _iocb_pool.get_one();
- prepare_iocb(req, io);
+ prepare_iocb(req, desc, io);
- if (_r->_aio_eventfd) {
- set_eventfd_notification(io, _r->_aio_eventfd->get_fd());
+ if (_r._aio_eventfd) {
+ set_eventfd_notification(io, _r._aio_eventfd->get_fd());
}
_submission_queue.push_back(&io);
+ return true;
+ });
+
+ if (__builtin_expect(_r._kernel_page_cache, false)) {
+ // linux-aio is not asynchronous when the page cache is used,
+ // so we don't want to call io_submit() from the reactor thread.
+ //
+ // Pretend that all aio failed with EAGAIN and submit them
+ // via schedule_retry(), below.
+ did_work = !_submission_queue.empty();
+ for (auto& iocbp : _submission_queue) {
+ set_nowait(*iocbp, false);
+ _pending_aio_retry.push_back(iocbp);
+ }
+ to_submit = 0;
}
size_t submitted = 0;
did_work = true;
submitted += nr_consumed;
}
- _r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + submitted);
- if (!_pending_aio_retry.empty()) {
+ if (need_to_retry() && !retry_in_progress()) {
schedule_retry();
- did_work = true;
}
return did_work;
}
void aio_storage_context::schedule_retry() {
- // FIXME: future is discarded
- (void)do_with(std::exchange(_pending_aio_retry, {}), [this](pending_aio_retry_t& retries){
- return _r->_thread_pool->submit<syscall_result<int>>([this, &retries] () mutable {
- auto r = io_submit(_io_context, retries.size(), retries.data());
+ // loop until both _pending_aio_retry and _aio_retries are empty.
+ // While retrying _aio_retries, new retries may be queued onto _pending_aio_retry.
+ _pending_aio_retry_fut = do_until([this] {
+ if (_aio_retries.empty()) {
+ if (_pending_aio_retry.empty()) {
+ return true;
+ }
+ // _pending_aio_retry, holding a batch of new iocbs to retry,
+ // is swapped with the empty _aio_retries.
+ std::swap(_aio_retries, _pending_aio_retry);
+ }
+ return false;
+ }, [this] {
+ return _r._thread_pool->submit<syscall_result<int>>([this] () mutable {
+ auto r = io_submit(_io_context, _aio_retries.size(), _aio_retries.data());
return wrap_syscall<int>(r);
- }).then([this, &retries] (syscall_result<int> result) {
- auto iocbs = retries.data();
+ }).then_wrapped([this] (future<syscall_result<int>> f) {
+ // If submit failed, just log the error and exit the loop.
+ // The next call to submit_work will call schedule_retry again.
+ if (f.failed()) {
+ auto ex = f.get_exception();
+ seastar_logger.warn("aio_storage_context::schedule_retry failed: {}", std::move(ex));
+ return;
+ }
+ auto result = f.get0();
+ auto iocbs = _aio_retries.data();
size_t nr_consumed = 0;
if (result.result == -1) {
- nr_consumed = handle_aio_error(iocbs[0], result.error);
+ try {
+ nr_consumed = handle_aio_error(iocbs[0], result.error);
+ } catch (...) {
+ seastar_logger.error("aio retry failed: {}. Aborting.", std::current_exception());
+ abort();
+ }
} else {
nr_consumed = result.result;
}
- std::copy(retries.begin() + nr_consumed, retries.end(), std::back_inserter(_pending_aio_retry));
+ _aio_retries.erase(_aio_retries.begin(), _aio_retries.begin() + nr_consumed);
});
});
}
-bool aio_storage_context::reap_completions()
+bool aio_storage_context::reap_completions(bool allow_retry)
{
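+ // allow_retry distinguishes normal polling, where an iocb that failed
+ // with EAGAIN is requeued onto _pending_aio_retry, from the drain loop in
+ // stop(), where requeueing is disabled so outstanding() can reach zero.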
struct timespec timeout = {0, 0};
- auto n = io_getevents(_io_context, 1, max_aio, _ev_buffer, &timeout, _r->_force_io_getevents_syscall);
+ auto n = io_getevents(_io_context, 1, max_aio, _ev_buffer, &timeout, _r._force_io_getevents_syscall);
if (n == -1 && errno == EINTR) {
n = 0;
}
assert(n >= 0);
for (size_t i = 0; i < size_t(n); ++i) {
auto iocb = get_iocb(_ev_buffer[i]);
- if (_ev_buffer[i].res == -EAGAIN) {
+ if (_ev_buffer[i].res == -EAGAIN && allow_retry) {
set_nowait(*iocb, false);
_pending_aio_retry.push_back(iocb);
continue;
//
// Alternatively, if we enabled _aio_eventfd, we can always enter
unsigned executing = _iocb_pool.outstanding();
- return executing == 0 || _r->_aio_eventfd;
+ return executing == 0 || _r._aio_eventfd;
}
-aio_general_context::aio_general_context(size_t nr) : iocbs(new iocb*[nr]) {
+aio_general_context::aio_general_context(size_t nr)
+ : iocbs(new iocb*[nr])
+ , last(iocbs.get())
+ , end(iocbs.get() + nr)
+{
setup_aio_context(nr, &io_context);
}
}
void aio_general_context::queue(linux_abi::iocb* iocb) {
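+ // The iocb array is sized up front for the maximum number of in-flight
+ // operations, so overrunning 'end' indicates a bookkeeping bug in the caller.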
+ assert(last < end);
*last++ = iocb;
}
size_t aio_general_context::flush() {
- if (last != iocbs.get()) {
- auto nr = last - iocbs.get();
- last = iocbs.get();
- auto r = io_submit(io_context, nr, iocbs.get());
- assert(r >= 0);
- return nr;
+ auto begin = iocbs.get();
+ auto retried = last;
+ while (begin != last) {
+ auto r = io_submit(io_context, last - begin, begin);
+ if (__builtin_expect(r > 0, true)) {
+ begin += r;
+ continue;
+ }
+ // errno == EAGAIN is expected here. We don't assert on it explicitly,
+ // since the assert below requires that some progress be made, which
+ // prevents an endless loop for any errno.
+ if (need_preempt()) {
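+ // Require that at least one iocb was accepted since the previous
+ // preemption check; abort via the assert rather than spin forever.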
+ assert(retried != begin);
+ retried = begin;
+ }
}
- return 0;
+ auto nr = last - iocbs.get();
+ last = iocbs.get();
+ return nr;
}
completion_with_iocb::completion_with_iocb(int fd, int events, void* user_data)
}
}
-hrtimer_aio_completion::hrtimer_aio_completion(reactor* r, file_desc& fd)
- : fd_kernel_completion(r, fd)
- , completion_with_iocb(fd.get(), POLLIN, this) {}
+hrtimer_aio_completion::hrtimer_aio_completion(reactor& r, file_desc& fd)
+ : fd_kernel_completion(fd)
+ , completion_with_iocb(fd.get(), POLLIN, this)
+ , _r(r) {}
-task_quota_aio_completion::task_quota_aio_completion(reactor* r, file_desc& fd)
- : fd_kernel_completion(r, fd)
+task_quota_aio_completion::task_quota_aio_completion(file_desc& fd)
+ : fd_kernel_completion(fd)
, completion_with_iocb(fd.get(), POLLIN, this) {}
-smp_wakeup_aio_completion::smp_wakeup_aio_completion(reactor* r, file_desc& fd)
- : fd_kernel_completion(r, fd)
+smp_wakeup_aio_completion::smp_wakeup_aio_completion(file_desc& fd)
+ : fd_kernel_completion(fd)
, completion_with_iocb(fd.get(), POLLIN, this) {}
void
uint64_t expirations = 0;
(void)_fd.read(&expirations, 8);
if (expirations) {
- _r->service_highres_timer();
+ _r.service_highres_timer();
}
completion_with_iocb::completed();
}
completion_with_iocb::completed();
}
-preempt_io_context::preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer)
+preempt_io_context::preempt_io_context(reactor& r, file_desc& task_quota, file_desc& hrtimer)
: _r(r)
- , _task_quota_aio_completion(r, task_quota)
+ , _task_quota_aio_completion(task_quota)
, _hrtimer_aio_completion(r, hrtimer)
{}
void preempt_io_context::start_tick() {
// Preempt whenever an event (timer tick or signal) is available on the
// _preempting_io ring
- g_need_preempt = reinterpret_cast<const preemption_monitor*>(_context.io_context + 8);
+ set_need_preempt_var(reinterpret_cast<const preemption_monitor*>(_context.io_context + 8));
// preempt_io_context::request_preemption() will write to reactor::_preemption_monitor, which is now ignored
}
void preempt_io_context::stop_tick() {
- g_need_preempt = &_r->_preemption_monitor;
+ set_need_preempt_var(&_r._preemption_monitor);
}
void preempt_io_context::request_preemption() {
return file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK);
}
+unsigned
+reactor_backend_aio::max_polls() const {
+ return _r._cfg.max_networking_aio_io_control_blocks;
+}
+
bool reactor_backend_aio::await_events(int timeout, const sigset_t* active_sigmask) {
::timespec ts = {};
::timespec* tsp = [&] () -> ::timespec* {
engine()._signals.action(signo, siginfo, ignore);
}
-reactor_backend_aio::reactor_backend_aio(reactor* r)
+reactor_backend_aio::reactor_backend_aio(reactor& r)
: _r(r)
, _hrtimer_timerfd(make_timerfd())
, _storage_context(_r)
- , _preempting_io(_r, _r->_task_quota_timer, _hrtimer_timerfd)
+ , _preempting_io(_r, _r._task_quota_timer, _hrtimer_timerfd)
, _hrtimer_poll_completion(_r, _hrtimer_timerfd)
- , _smp_wakeup_aio_completion(_r, _r->_notify_eventfd)
+ , _smp_wakeup_aio_completion(_r._notify_eventfd)
{
// Protect against spurious wakeups - if we get notified that the timer has
// expired when it really hasn't, we don't want to block in read(tfd, ...).
- auto tfd = _r->_task_quota_timer.get();
+ auto tfd = _r._task_quota_timer.get();
::fcntl(tfd, F_SETFL, ::fcntl(tfd, F_GETFL) | O_NONBLOCK);
sigset_t mask = make_sigset_mask(hrtimer_signal());
return pollable_fd_state_ptr(new aio_pollable_fd_state(std::move(fd), std::move(speculate)));
}
-reactor_backend_epoll::reactor_backend_epoll(reactor* r)
+reactor_backend_epoll::reactor_backend_epoll(reactor& r)
: _r(r)
+ , _steady_clock_timer_reactor_thread(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC))
+ , _steady_clock_timer_timer_thread(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC))
, _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC))
, _storage_context(_r) {
::epoll_event event;
event.events = EPOLLIN;
event.data.ptr = nullptr;
- auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r->_notify_eventfd.get(), &event);
+ auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r._notify_eventfd.get(), &event);
+ throw_system_error_on(ret == -1);
+ event.events = EPOLLIN;
+ event.data.ptr = &_steady_clock_timer_reactor_thread;
+ ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _steady_clock_timer_reactor_thread.get(), &event);
throw_system_error_on(ret == -1);
+}
+
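+// Body of the per-reactor timer thread: block in poll() on the task quota
+// timerfd and the timer-thread copy of the steady clock timerfd, and request
+// preemption whenever either of them fires.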
+void
+reactor_backend_epoll::task_quota_timer_thread_fn() {
+ auto thread_name = seastar::format("timer-{}", _r._id);
+ pthread_setname_np(pthread_self(), thread_name.c_str());
+
+ sigset_t mask;
+ sigfillset(&mask);
+ for (auto sig : { SIGSEGV }) {
+ sigdelset(&mask, sig);
+ }
+ auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
+ if (r) {
+ seastar_logger.error("Thread {}: failed to block signals. Aborting.", thread_name.c_str());
+ abort();
+ }
- struct sigevent sev{};
- sev.sigev_notify = SIGEV_THREAD_ID;
- sev._sigev_un._tid = syscall(SYS_gettid);
- sev.sigev_signo = hrtimer_signal();
- ret = timer_create(CLOCK_MONOTONIC, &sev, &_steady_clock_timer);
- assert(ret >= 0);
+ // We need to wait until the task quota is set before we can calculate how
+ // many ticks there are in a minute. Technically task_quota is used from many
+ // threads, but since it is read-only here and only used during initialization
+ // we will avoid complicating the code.
+ {
+ uint64_t events;
+ _r._task_quota_timer.read(&events, 8);
+ _r.request_preemption();
+ }
- _r->_signals.handle_signal(hrtimer_signal(), [r = _r] {
- r->service_highres_timer();
- });
-}
+ while (!_r._dying.load(std::memory_order_relaxed)) {
+ // Wait for either the task quota timer, or the high resolution timer, or both,
+ // to expire.
+ struct pollfd pfds[2] = {};
+ pfds[0].fd = _r._task_quota_timer.get();
+ pfds[0].events = POLLIN;
+ pfds[1].fd = _steady_clock_timer_timer_thread.get();
+ pfds[1].events = POLLIN;
+ int r = poll(pfds, 2, -1);
+ assert(r != -1);
-reactor_backend_epoll::~reactor_backend_epoll() {
- timer_delete(_steady_clock_timer);
+ uint64_t events;
+ if (pfds[0].revents & POLL_IN) {
+ _r._task_quota_timer.read(&events, 8);
+ }
+ if (pfds[1].revents & POLL_IN) {
+ _steady_clock_timer_timer_thread.read(&events, 8);
+ _highres_timer_pending.store(true, std::memory_order_relaxed);
+ }
+ _r.request_preemption();
+
+ // We're in a different thread, but guaranteed to be on the same core, so even
+ // a signal fence is overdoing it
+ std::atomic_signal_fence(std::memory_order_seq_cst);
+ }
}
+reactor_backend_epoll::~reactor_backend_epoll() = default;
+
void reactor_backend_epoll::start_tick() {
- _task_quota_timer_thread = std::thread(&reactor::task_quota_timer_thread_fn, _r);
+ _task_quota_timer_thread = std::thread(&reactor_backend_epoll::task_quota_timer_thread_fn, this);
::sched_param sp;
sp.sched_priority = 1;
auto sched_ok = pthread_setschedparam(_task_quota_timer_thread.native_handle(), SCHED_FIFO, &sp);
- if (sched_ok != 0 && _r->_id == 0) {
+ if (sched_ok != 0 && _r._id == 0) {
seastar_logger.warn("Unable to set SCHED_FIFO scheduling policy for timer thread; latency impact possible. Try adding CAP_SYS_NICE");
}
}
void reactor_backend_epoll::stop_tick() {
- _r->_dying.store(true, std::memory_order_relaxed);
- _r->_task_quota_timer.timerfd_settime(0, seastar::posix::to_relative_itimerspec(1ns, 1ms)); // Make the timer fire soon
+ _r._dying.store(true, std::memory_order_relaxed);
+ _r._task_quota_timer.timerfd_settime(0, seastar::posix::to_relative_itimerspec(1ns, 1ms)); // Make the timer fire soon
_task_quota_timer_thread.join();
}
void reactor_backend_epoll::arm_highres_timer(const ::itimerspec& its) {
- auto ret = timer_settime(_steady_clock_timer, TIMER_ABSTIME, &its, NULL);
- throw_system_error_on(ret == -1);
+ _steady_clock_timer_deadline = its;
+ _steady_clock_timer_timer_thread.timerfd_settime(TFD_TIMER_ABSTIME, its);
+}
+
+void
+reactor_backend_epoll::switch_steady_clock_timers(file_desc& from, file_desc& to) {
+ auto& deadline = _steady_clock_timer_deadline;
+ if (deadline.it_value.tv_sec == 0 && deadline.it_value.tv_nsec == 0) {
+ return;
+ }
+ // Enable-then-disable, so the hardware timer doesn't have to be reprogrammed. Probably pointless.
+ to.timerfd_settime(TFD_TIMER_ABSTIME, _steady_clock_timer_deadline);
+ from.timerfd_settime(TFD_TIMER_ABSTIME, {});
+}
+
+void reactor_backend_epoll::maybe_switch_steady_clock_timers(int timeout, file_desc& from, file_desc& to) {
+ if (timeout != 0) {
+ switch_steady_clock_timers(from, to);
+ }
}
bool
reactor_backend_epoll::wait_and_process(int timeout, const sigset_t* active_sigmask) {
+ // If we plan to sleep, disable the timer thread steady clock timer (since it won't
+ // wake us up from sleep, and timer thread wakeup will just waste CPU time) and enable
+ // reactor thread steady clock timer.
+ maybe_switch_steady_clock_timers(timeout, _steady_clock_timer_timer_thread, _steady_clock_timer_reactor_thread);
+ auto undo_timer_switch = defer([&] () noexcept {
+ try {
+ maybe_switch_steady_clock_timers(timeout, _steady_clock_timer_reactor_thread, _steady_clock_timer_timer_thread);
+ } catch (...) {
+ seastar_logger.error("Switching steady_clock timers back failed: {}. Aborting...", std::current_exception());
+ abort();
+ }
+ });
std::array<epoll_event, 128> eevt;
int nr = ::epoll_pwait(_epollfd.get(), eevt.data(), eevt.size(), timeout, active_sigmask);
if (nr == -1 && errno == EINTR) {
auto pfd = reinterpret_cast<pollable_fd_state*>(evt.data.ptr);
if (!pfd) {
char dummy[8];
- _r->_notify_eventfd.read(dummy, 8);
+ _r._notify_eventfd.read(dummy, 8);
+ continue;
+ }
+ if (evt.data.ptr == &_steady_clock_timer_reactor_thread) {
+ char dummy[8];
+ _steady_clock_timer_reactor_thread.read(dummy, 8);
+ _highres_timer_pending.store(true, std::memory_order_relaxed);
+ _steady_clock_timer_deadline = {};
continue;
}
if (evt.events & (EPOLLHUP | EPOLLERR)) {
}
bool reactor_backend_epoll::kernel_submit_work() {
+ bool result = false;
_storage_context.submit_work();
if (_need_epoll_events) {
- return wait_and_process(0, nullptr);
+ result |= wait_and_process(0, nullptr);
+ }
+
+ result |= complete_hrtimer();
+
+ return result;
+}
+
+bool reactor_backend_epoll::complete_hrtimer() {
+ // This can be set from either the task quota timer thread, or
+ // wait_and_process(), above.
+ if (_highres_timer_pending.load(std::memory_order_relaxed)) {
+ _highres_timer_pending.store(false, std::memory_order_relaxed);
+ _r.service_highres_timer();
+ return true;
}
return false;
}
void reactor_backend_epoll::wait_and_process_events(const sigset_t* active_sigmask) {
wait_and_process(-1 , active_sigmask);
+ complete_hrtimer();
}
void reactor_backend_epoll::complete_epoll_event(pollable_fd_state& pfd, int events, int event) {
void
reactor_backend_epoll::request_preemption() {
- _r->_preemption_monitor.head.store(1, std::memory_order_relaxed);
+ _r._preemption_monitor.head.store(1, std::memory_order_relaxed);
}
void reactor_backend_epoll::start_handling_signal() {
}
void reactor_backend_epoll::reset_preemption_monitor() {
- _r->_preemption_monitor.head.store(0, std::memory_order_relaxed);
+ _r._preemption_monitor.head.store(0, std::memory_order_relaxed);
}
#ifdef HAVE_OSV
auto fd = file_desc::eventfd(0, 0);
aio_context_t ioc{};
setup_aio_context(1, &ioc);
- auto cleanup = defer([&] { io_destroy(ioc); });
+ auto cleanup = defer([&] () noexcept { io_destroy(ioc); });
linux_abi::iocb iocb = internal::make_poll_iocb(fd.get(), POLLIN|POLLOUT);
linux_abi::iocb* a[1] = { &iocb };
auto r = io_submit(ioc, 1, a);
return true;
}
-std::unique_ptr<reactor_backend> reactor_backend_selector::create(reactor* r) {
+std::unique_ptr<reactor_backend> reactor_backend_selector::create(reactor& r) {
if (_name == "linux-aio") {
return std::make_unique<reactor_backend_aio>(r);
} else if (_name == "epoll") {