~aio_general_context();
internal::linux_abi::aio_context_t io_context{};
std::unique_ptr<internal::linux_abi::iocb*[]> iocbs;
- internal::linux_abi::iocb** last = iocbs.get();
+ internal::linux_abi::iocb** last;
+ internal::linux_abi::iocb** const end;
void queue(internal::linux_abi::iocb* iocb);
+ // submit all queued iocbs and return their count.
size_t flush();
};
bool has_capacity() const;
};
- reactor* _r;
+ reactor& _r;
internal::linux_abi::aio_context_t _io_context;
boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue;
iocb_pool _iocb_pool;
size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>;
- pending_aio_retry_t _pending_aio_retry;
+ pending_aio_retry_t _pending_aio_retry; // Pending retries iocbs
+ pending_aio_retry_t _aio_retries; // Currently retried iocbs
+ future<> _pending_aio_retry_fut = make_ready_future<>();
internal::linux_abi::io_event _ev_buffer[max_aio];
+
+ bool need_to_retry() const noexcept {
+ return !_pending_aio_retry.empty() || !_aio_retries.empty();
+ }
+
+ bool retry_in_progress() const noexcept {
+ return !_pending_aio_retry_fut.available();
+ }
+
public:
- explicit aio_storage_context(reactor* r);
+ explicit aio_storage_context(reactor& r);
~aio_storage_context();
- bool reap_completions();
+ bool reap_completions(bool allow_retry = true);
void schedule_retry();
bool submit_work();
bool can_sleep() const;
+ future<> stop() noexcept;
};
class completion_with_iocb {
class fd_kernel_completion : public kernel_completion {
protected:
- reactor* _r;
file_desc& _fd;
- fd_kernel_completion(reactor* r, file_desc& fd) : _r(r), _fd(fd) {}
+ fd_kernel_completion(file_desc& fd) : _fd(fd) {}
public:
file_desc& fd() {
return _fd;
struct hrtimer_aio_completion : public fd_kernel_completion,
public completion_with_iocb {
- hrtimer_aio_completion(reactor* r, file_desc& fd);
+private:
+ reactor& _r;
+public:
+ hrtimer_aio_completion(reactor& r, file_desc& fd);
virtual void complete_with(ssize_t value) override;
};
struct task_quota_aio_completion : public fd_kernel_completion,
public completion_with_iocb {
- task_quota_aio_completion(reactor* r, file_desc& fd);
+ task_quota_aio_completion(file_desc& fd);
virtual void complete_with(ssize_t value) override;
};
struct smp_wakeup_aio_completion : public fd_kernel_completion,
public completion_with_iocb {
- smp_wakeup_aio_completion(reactor* r, file_desc& fd);
+ smp_wakeup_aio_completion(file_desc& fd);
virtual void complete_with(ssize_t value) override;
};
// Common aio-based Implementation of the task quota and hrtimer.
class preempt_io_context {
- reactor* _r;
+ reactor& _r;
aio_general_context _context{2};
task_quota_aio_completion _task_quota_aio_completion;
hrtimer_aio_completion _hrtimer_aio_completion;
public:
- preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer);
+ preempt_io_context(reactor& r, file_desc& task_quota, file_desc& hrtimer);
bool service_preempting_io();
size_t flush() {
// (such as timers, signals, inter-thread notifications) into file descriptors
// using mechanisms like timerfd, signalfd and eventfd respectively.
class reactor_backend_epoll : public reactor_backend {
- reactor* _r;
+ reactor& _r;
+ std::atomic<bool> _highres_timer_pending;
std::thread _task_quota_timer_thread;
- timer_t _steady_clock_timer = {};
+ ::itimerspec _steady_clock_timer_deadline = {};
+ // These two timers are used for high resolution timer<>s, one for
+ // the reactor thread (when sleeping) and one for the timer thread
+ // (when awake). We can't use one timer because of races between the
+ // timer thread and reactor thread.
+ //
+ // Only one of the two is active at any time.
+ file_desc _steady_clock_timer_reactor_thread;
+ file_desc _steady_clock_timer_timer_thread;
private:
file_desc _epollfd;
+ void task_quota_timer_thread_fn();
future<> get_epoll_future(pollable_fd_state& fd, int event);
void complete_epoll_event(pollable_fd_state& fd, int events, int event);
aio_storage_context _storage_context;
+ void switch_steady_clock_timers(file_desc& from, file_desc& to);
+ void maybe_switch_steady_clock_timers(int timeout, file_desc& from, file_desc& to);
bool wait_and_process(int timeout, const sigset_t* active_sigmask);
+ bool complete_hrtimer();
bool _need_epoll_events = false;
public:
- explicit reactor_backend_epoll(reactor* r);
+ explicit reactor_backend_epoll(reactor& r);
virtual ~reactor_backend_epoll() override;
virtual bool reap_kernel_completions() override;
};
class reactor_backend_aio : public reactor_backend {
- static constexpr size_t max_polls = 10000;
- reactor* _r;
+ reactor& _r;
+ unsigned max_polls() const;
file_desc _hrtimer_timerfd;
aio_storage_context _storage_context;
// We use two aio contexts, one for preempting events (the timer tick and
// signals), the other for non-preempting events (fd poll).
preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer
- aio_general_context _polling_io{max_polls}; // FIXME: unify with disk aio_context
+ aio_general_context _polling_io{max_polls()}; // FIXME: unify with disk aio_context
hrtimer_aio_completion _hrtimer_poll_completion;
smp_wakeup_aio_completion _smp_wakeup_aio_completion;
static file_desc make_timerfd();
bool await_events(int timeout, const sigset_t* active_sigmask);
public:
- explicit reactor_backend_aio(reactor* r);
+ explicit reactor_backend_aio(reactor& r);
virtual bool reap_kernel_completions() override;
virtual bool kernel_submit_work() override;
static bool has_enough_aio_nr();
explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {}
public:
- std::unique_ptr<reactor_backend> create(reactor* r);
+ const std::string& name() const { return _name; }
+ std::unique_ptr<reactor_backend> create(reactor& r);
static reactor_backend_selector default_backend();
static std::vector<reactor_backend_selector> available();
friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) {
return os << rbs._name;
}
- friend void validate(boost::any& v, const std::vector<std::string> values, reactor_backend_selector* rbs, int) {
- namespace bpo = boost::program_options;
- bpo::validators::check_first_occurrence(v);
- auto s = bpo::validators::get_single_string(values);
- for (auto&& x : available()) {
- if (s == x._name) {
- v = std::move(x);
- return;
- }
- }
- throw bpo::validation_error(bpo::validation_error::invalid_option_value);
- }
};
}