git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/src/core/reactor_backend.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / core / reactor_backend.hh
index 89268cc9721436603f5fcdcc2e9fff5ffd4b5c0a..1aef269c751b750898528147f5d491d84ad181ff 100644 (file)
@@ -50,8 +50,10 @@ struct aio_general_context {
     ~aio_general_context();
     internal::linux_abi::aio_context_t io_context{};
     std::unique_ptr<internal::linux_abi::iocb*[]> iocbs;
-    internal::linux_abi::iocb** last = iocbs.get();
+    internal::linux_abi::iocb** last;
+    internal::linux_abi::iocb** const end;
     void queue(internal::linux_abi::iocb* iocb);
+    // submit all queued iocbs and return their count.
     size_t flush();
 };
 
@@ -69,22 +71,34 @@ class aio_storage_context {
         bool has_capacity() const;
     };
 
-    reactor* _r;
+    reactor& _r;
     internal::linux_abi::aio_context_t _io_context;
     boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _submission_queue;
     iocb_pool _iocb_pool;
     size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
     using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>;
-    pending_aio_retry_t _pending_aio_retry;
+    pending_aio_retry_t _pending_aio_retry; // Pending retries iocbs
+    pending_aio_retry_t _aio_retries;       // Currently retried iocbs
+    future<> _pending_aio_retry_fut = make_ready_future<>();
     internal::linux_abi::io_event _ev_buffer[max_aio];
+
+    bool need_to_retry() const noexcept {
+        return !_pending_aio_retry.empty() || !_aio_retries.empty();
+    }
+
+    bool retry_in_progress() const noexcept {
+        return !_pending_aio_retry_fut.available();
+    }
+
 public:
-    explicit aio_storage_context(reactor* r);
+    explicit aio_storage_context(reactor& r);
     ~aio_storage_context();
 
-    bool reap_completions();
+    bool reap_completions(bool allow_retry = true);
     void schedule_retry();
     bool submit_work();
     bool can_sleep() const;
+    future<> stop() noexcept;
 };
 
 class completion_with_iocb {
@@ -101,9 +115,8 @@ public:
 
 class fd_kernel_completion : public kernel_completion {
 protected:
-    reactor* _r;
     file_desc& _fd;
-    fd_kernel_completion(reactor* r, file_desc& fd) : _r(r), _fd(fd) {}
+    fd_kernel_completion(file_desc& fd) : _fd(fd) {}
 public:
     file_desc& fd() {
         return _fd;
@@ -112,31 +125,34 @@ public:
 
 struct hrtimer_aio_completion : public fd_kernel_completion,
                                 public completion_with_iocb {
-    hrtimer_aio_completion(reactor* r, file_desc& fd);
+private:
+    reactor& _r;
+public:
+    hrtimer_aio_completion(reactor& r, file_desc& fd);
     virtual void complete_with(ssize_t value) override;
 };
 
 struct task_quota_aio_completion : public fd_kernel_completion,
                                    public completion_with_iocb {
-    task_quota_aio_completion(reactor* r, file_desc& fd);
+    task_quota_aio_completion(file_desc& fd);
     virtual void complete_with(ssize_t value) override;
 };
 
 struct smp_wakeup_aio_completion : public fd_kernel_completion,
                                    public completion_with_iocb {
-    smp_wakeup_aio_completion(reactor* r, file_desc& fd);
+    smp_wakeup_aio_completion(file_desc& fd);
     virtual void complete_with(ssize_t value) override;
 };
 
 // Common aio-based Implementation of the task quota and hrtimer.
 class preempt_io_context {
-    reactor* _r;
+    reactor& _r;
     aio_general_context _context{2};
 
     task_quota_aio_completion _task_quota_aio_completion;
     hrtimer_aio_completion _hrtimer_aio_completion;
 public:
-    preempt_io_context(reactor* r, file_desc& task_quota, file_desc& hrtimer);
+    preempt_io_context(reactor& r, file_desc& task_quota, file_desc& hrtimer);
     bool service_preempting_io();
 
     size_t flush() {
@@ -203,18 +219,31 @@ public:
 // (such as timers, signals, inter-thread notifications) into file descriptors
 // using mechanisms like timerfd, signalfd and eventfd respectively.
 class reactor_backend_epoll : public reactor_backend {
-    reactor* _r;
+    reactor& _r;
+    std::atomic<bool> _highres_timer_pending;
     std::thread _task_quota_timer_thread;
-    timer_t _steady_clock_timer = {};
+    ::itimerspec _steady_clock_timer_deadline = {};
+    // These two timers are used for high resolution timer<>s, one for
+    // the reactor thread (when sleeping) and one for the timer thread
+    // (when awake). We can't use one timer because of races between the
+    // timer thread and reactor thread.
+    //
+    // Only one of the two is active at any time.
+    file_desc _steady_clock_timer_reactor_thread;
+    file_desc _steady_clock_timer_timer_thread;
 private:
     file_desc _epollfd;
+    void task_quota_timer_thread_fn();
     future<> get_epoll_future(pollable_fd_state& fd, int event);
     void complete_epoll_event(pollable_fd_state& fd, int events, int event);
     aio_storage_context _storage_context;
+    void switch_steady_clock_timers(file_desc& from, file_desc& to);
+    void maybe_switch_steady_clock_timers(int timeout, file_desc& from, file_desc& to);
     bool wait_and_process(int timeout, const sigset_t* active_sigmask);
+    bool complete_hrtimer();
     bool _need_epoll_events = false;
 public:
-    explicit reactor_backend_epoll(reactor* r);
+    explicit reactor_backend_epoll(reactor& r);
     virtual ~reactor_backend_epoll() override;
 
     virtual bool reap_kernel_completions() override;
@@ -249,20 +278,20 @@ public:
 };
 
 class reactor_backend_aio : public reactor_backend {
-    static constexpr size_t max_polls = 10000;
-    reactor* _r;
+    reactor& _r;
+    unsigned max_polls() const;
     file_desc _hrtimer_timerfd;
     aio_storage_context _storage_context;
     // We use two aio contexts, one for preempting events (the timer tick and
     // signals), the other for non-preempting events (fd poll).
     preempt_io_context _preempting_io; // Used for the timer tick and the high resolution timer
-    aio_general_context _polling_io{max_polls}; // FIXME: unify with disk aio_context
+    aio_general_context _polling_io{max_polls()}; // FIXME: unify with disk aio_context
     hrtimer_aio_completion _hrtimer_poll_completion;
     smp_wakeup_aio_completion _smp_wakeup_aio_completion;
     static file_desc make_timerfd();
     bool await_events(int timeout, const sigset_t* active_sigmask);
 public:
-    explicit reactor_backend_aio(reactor* r);
+    explicit reactor_backend_aio(reactor& r);
 
     virtual bool reap_kernel_completions() override;
     virtual bool kernel_submit_work() override;
@@ -340,24 +369,13 @@ private:
     static bool has_enough_aio_nr();
     explicit reactor_backend_selector(std::string name) : _name(std::move(name)) {}
 public:
-    std::unique_ptr<reactor_backend> create(reactor* r);
+    const std::string& name() const { return _name; }
+    std::unique_ptr<reactor_backend> create(reactor& r);
     static reactor_backend_selector default_backend();
     static std::vector<reactor_backend_selector> available();
     friend std::ostream& operator<<(std::ostream& os, const reactor_backend_selector& rbs) {
         return os << rbs._name;
     }
-    friend void validate(boost::any& v, const std::vector<std::string> values, reactor_backend_selector* rbs, int) {
-        namespace bpo = boost::program_options;
-        bpo::validators::check_first_occurrence(v);
-        auto s = bpo::validators::get_single_string(values);
-        for (auto&& x : available()) {
-            if (s == x._name) {
-                v = std::move(x);
-                return;
-            }
-        }
-        throw bpo::validation_error(bpo::validation_error::invalid_option_value);
-    }
 };
 
 }