]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/core/reactor.hh
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / include / seastar / core / reactor.hh
index 0f4ed45b20e8004462b8354eeb665bd006cf5695..ada767cddb1a8ef29037c3a4cbef611410548c32 100644 (file)
@@ -54,6 +54,7 @@
 #include <boost/thread/barrier.hpp>
 #include <boost/container/static_vector.hpp>
 #include <set>
+#include <seastar/core/reactor_config.hh>
 #include <seastar/core/linux-aio.hh>
 #include <seastar/util/eclipse.hh>
 #include <seastar/core/future.hh>
 #include <seastar/core/manual_clock.hh>
 #include <seastar/core/metrics_registration.hh>
 #include <seastar/core/scheduling.hh>
+#include <seastar/core/smp.hh>
+#include <seastar/core/internal/io_request.hh>
 #include "internal/pollable_fd.hh"
+#include "internal/poll.hh"
 
 #ifdef HAVE_OSV
 #include <osv/sched.hh>
@@ -126,147 +130,14 @@ bool operator==(const ::sockaddr_in a, const ::sockaddr_in b);
 namespace seastar {
 
 void register_network_stack(sstring name, boost::program_options::options_description opts,
-    std::function<future<std::unique_ptr<network_stack>>(boost::program_options::variables_map opts)> create,
+    noncopyable_function<future<std::unique_ptr<network_stack>>(boost::program_options::variables_map opts)> create,
     bool make_default = false);
 
 class thread_pool;
 class smp;
 
-class smp_message_queue {
-    static constexpr size_t queue_length = 128;
-    static constexpr size_t batch_size = 16;
-    static constexpr size_t prefetch_cnt = 2;
-    struct work_item;
-    struct lf_queue_remote {
-        reactor* remote;
-    };
-    using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
-                            boost::lockfree::capacity<queue_length>>;
-    // use inheritence to control placement order
-    struct lf_queue : lf_queue_remote, lf_queue_base {
-        lf_queue(reactor* remote) : lf_queue_remote{remote} {}
-        void maybe_wakeup();
-    };
-    lf_queue _pending;
-    lf_queue _completed;
-    struct alignas(seastar::cache_line_size) {
-        size_t _sent = 0;
-        size_t _compl = 0;
-        size_t _last_snt_batch = 0;
-        size_t _last_cmpl_batch = 0;
-        size_t _current_queue_length = 0;
-    };
-    // keep this between two structures with statistics
-    // this makes sure that they have at least one cache line
-    // between them, so hw prefetcher will not accidentally prefetch
-    // cache line used by another cpu.
-    metrics::metric_groups _metrics;
-    struct alignas(seastar::cache_line_size) {
-        size_t _received = 0;
-        size_t _last_rcv_batch = 0;
-    };
-    struct work_item {
-        scheduling_group sg = current_scheduling_group();
-        virtual ~work_item() {}
-        virtual void process() = 0;
-        virtual void complete() = 0;
-    };
-    template <typename Func>
-    struct async_work_item : work_item {
-        smp_message_queue& _queue;
-        Func _func;
-        using futurator = futurize<std::result_of_t<Func()>>;
-        using future_type = typename futurator::type;
-        using value_type = typename future_type::value_type;
-        compat::optional<value_type> _result;
-        std::exception_ptr _ex; // if !_result
-        typename futurator::promise_type _promise; // used on local side
-        async_work_item(smp_message_queue& queue, Func&& func) : _queue(queue), _func(std::move(func)) {}
-        virtual void process() override {
-            try {
-              with_scheduling_group(this->sg, [this] {
-                futurator::apply(this->_func).then_wrapped([this] (auto f) {
-                    if (f.failed()) {
-                        _ex = f.get_exception();
-                    } else {
-                        _result = f.get();
-                    }
-                    _queue.respond(this);
-                });
-              });
-            } catch (...) {
-                _ex = std::current_exception();
-                _queue.respond(this);
-            }
-        }
-        virtual void complete() override {
-            if (_result) {
-                _promise.set_value(std::move(*_result));
-            } else {
-                // FIXME: _ex was allocated on another cpu
-                _promise.set_exception(std::move(_ex));
-            }
-        }
-        future_type get_future() { return _promise.get_future(); }
-    };
-    union tx_side {
-        tx_side() {}
-        ~tx_side() {}
-        void init() { new (&a) aa; }
-        struct aa {
-            std::deque<work_item*> pending_fifo;
-        } a;
-    } _tx;
-    std::vector<work_item*> _completed_fifo;
-public:
-    smp_message_queue(reactor* from, reactor* to);
-    ~smp_message_queue();
-    template <typename Func>
-    futurize_t<std::result_of_t<Func()>> submit(Func&& func) {
-        auto wi = std::make_unique<async_work_item<Func>>(*this, std::forward<Func>(func));
-        auto fut = wi->get_future();
-        submit_item(std::move(wi));
-        return fut;
-    }
-    void start(unsigned cpuid);
-    template<size_t PrefetchCnt, typename Func>
-    size_t process_queue(lf_queue& q, Func process);
-    size_t process_incoming();
-    size_t process_completions();
-    void stop();
-private:
-    void work();
-    void submit_item(std::unique_ptr<work_item> wi);
-    void respond(work_item* wi);
-    void move_pending();
-    void flush_request_batch();
-    void flush_response_batch();
-    bool has_unflushed_responses() const;
-    bool pure_poll_rx() const;
-    bool pure_poll_tx() const;
-
-    friend class smp;
-};
-
 class reactor_backend_selector;
 
-enum class open_flags {
-    rw = O_RDWR,
-    ro = O_RDONLY,
-    wo = O_WRONLY,
-    create = O_CREAT,
-    truncate = O_TRUNC,
-    exclusive = O_EXCL,
-};
-
-inline open_flags operator|(open_flags a, open_flags b) {
-    return open_flags(std::underlying_type_t<open_flags>(a) | std::underlying_type_t<open_flags>(b));
-}
-
-inline open_flags operator&(open_flags a, open_flags b) {
-    return open_flags(std::underlying_type_t<open_flags>(a) & std::underlying_type_t<open_flags>(b));
-}
-
 class reactor_backend;
 
 namespace internal {
@@ -276,64 +147,56 @@ class cpu_stall_detector;
 
 }
 
-class io_desc;
+class kernel_completion;
+class io_queue;
 class disk_config_params;
 
 class reactor {
     using sched_clock = std::chrono::steady_clock;
 private:
-    struct pollfn {
-        virtual ~pollfn() {}
-        // Returns true if work was done (false = idle)
-        virtual bool poll() = 0;
-        // Checks if work needs to be done, but without actually doing any
-        // returns true if works needs to be done (false = idle)
-        virtual bool pure_poll() = 0;
-        // Tries to enter interrupt mode.
-        //
-        // If it returns true, then events from this poller will wake
-        // a sleeping idle loop, and exit_interrupt_mode() must be called
-        // to return to normal polling.
-        //
-        // If it returns false, the sleeping idle loop may not be entered.
-        virtual bool try_enter_interrupt_mode() { return false; }
-        virtual void exit_interrupt_mode() {}
-    };
     struct task_queue;
     using task_queue_list = circular_buffer_fixed_capacity<task_queue*, max_scheduling_groups()>;
+    using pollfn = seastar::pollfn;
 
-    class io_pollfn;
     class signal_pollfn;
-    class aio_batch_submit_pollfn;
     class batch_flush_pollfn;
     class smp_pollfn;
     class drain_cross_cpu_freelist_pollfn;
     class lowres_timer_pollfn;
     class manual_timer_pollfn;
     class epoll_pollfn;
+    class reap_kernel_completions_pollfn;
+    class kernel_submit_work_pollfn;
+    class io_queue_submission_pollfn;
     class syscall_pollfn;
     class execution_stage_pollfn;
-    friend io_pollfn;
     friend signal_pollfn;
-    friend aio_batch_submit_pollfn;
     friend batch_flush_pollfn;
     friend smp_pollfn;
     friend drain_cross_cpu_freelist_pollfn;
     friend lowres_timer_pollfn;
     friend class manual_clock;
     friend class epoll_pollfn;
+    friend class reap_kernel_completions_pollfn;
+    friend class kernel_submit_work_pollfn;
+    friend class io_queue_submission_pollfn;
     friend class syscall_pollfn;
     friend class execution_stage_pollfn;
     friend class file_data_source_impl; // for fstream statistics
     friend class internal::reactor_stall_sampler;
+    friend class preempt_io_context;
+    friend struct hrtimer_aio_completion;
+    friend struct task_quota_aio_completion;
     friend class reactor_backend_epoll;
     friend class reactor_backend_aio;
+    friend class reactor_backend_selector;
+    friend class aio_storage_context;
 public:
     class poller {
         std::unique_ptr<pollfn> _pollfn;
         class registration_task;
         class deregistration_task;
-        registration_task* _registration_task;
+        registration_task* _registration_task = nullptr;
     public:
         template <typename Func> // signature: bool ()
         static poller simple(Func&& poll) {
@@ -346,15 +209,15 @@ public:
         ~poller();
         poller(poller&& x);
         poller& operator=(poller&& x);
-        void do_register();
+        void do_register() noexcept;
         friend class reactor;
     };
     enum class idle_cpu_handler_result {
         no_more_work,
         interrupted_by_higher_priority_task
     };
-    using work_waiting_on_reactor = const std::function<bool()>&;
-    using idle_cpu_handler = std::function<idle_cpu_handler_result(work_waiting_on_reactor)>;
+    using work_waiting_on_reactor = const noncopyable_function<bool()>&;
+    using idle_cpu_handler = noncopyable_function<idle_cpu_handler_result(work_waiting_on_reactor)>;
 
     struct io_stats {
         uint64_t aio_reads = 0;
@@ -370,6 +233,7 @@ public:
         uint64_t fstream_read_ahead_discarded_bytes = 0;
     };
 private:
+    reactor_config _cfg;
     file_desc _notify_eventfd;
     file_desc _task_quota_timer;
 #ifdef HAVE_OSV
@@ -389,6 +253,7 @@ private:
     static constexpr unsigned max_queues = 8;
     static constexpr unsigned max_aio = max_aio_per_queue * max_queues;
     friend disk_config_params;
+
     // Not all reactors have IO queues. If the number of IO queues is less than the number of shards,
     // some reactors will talk to foreign io_queues. If this reactor holds a valid IO queue, it will
     // be stored here.
@@ -396,19 +261,20 @@ private:
     std::unordered_map<dev_t, io_queue*> _io_queues;
     friend io_queue;
 
-    std::vector<std::function<future<> ()>> _exit_funcs;
+    std::vector<noncopyable_function<future<> ()>> _exit_funcs;
     unsigned _id = 0;
     bool _stopping = false;
     bool _stopped = false;
+    bool _finished_running_tasks = false;
     condition_variable _stop_requested;
     bool _handle_sigint = true;
-    promise<std::unique_ptr<network_stack>> _network_stack_ready_promise;
+    compat::optional<future<std::unique_ptr<network_stack>>> _network_stack_ready;
     int _return = 0;
     promise<> _start_promise;
     semaphore _cpu_started;
     internal::preemption_monitor _preemption_monitor{};
-    std::atomic<uint64_t> _tasks_processed = { 0 };
-    std::atomic<uint64_t> _polls = { 0 };
+    uint64_t _global_tasks_processed = 0;
+    uint64_t _polls = 0;
     std::unique_ptr<internal::cpu_stall_detector> _cpu_stall_detector;
 
     unsigned _max_task_backlog = 1000;
@@ -418,14 +284,10 @@ private:
     timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
     timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers;
     timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers;
-    internal::linux_abi::aio_context_t _io_context;
-    alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
-    std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
-    boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio;
-    boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry;
     io_stats _io_stats;
     uint64_t _fsyncs = 0;
     uint64_t _cxx_exceptions = 0;
+    uint64_t _abandoned_failed_futures = 0;
     struct task_queue {
         explicit task_queue(unsigned id, sstring name, float shares);
         int64_t _vruntime = 0;
@@ -436,15 +298,27 @@ private:
         uint8_t _id;
         sched_clock::duration _runtime = {};
         uint64_t _tasks_processed = 0;
-        circular_buffer<std::unique_ptr<task>> _q;
+        circular_buffer<task*> _q;
         sstring _name;
+        /**
+         * This array holds pointers to the scheduling group specific
+         * data. The pointer is not use as is but is cast to a reference
+         * to the appropriate type that is actually pointed to.
+         */
+        std::vector<void*> _scheduling_group_specific_vals;
         int64_t to_vruntime(sched_clock::duration runtime) const;
         void set_shares(float shares);
         struct indirect_compare;
         sched_clock::duration _time_spent_on_task_quota_violations = {};
         seastar::metrics::metric_groups _metrics;
+        void rename(sstring new_name);
+    private:
+        void register_stats();
     };
+
+    circular_buffer<internal::io_request> _pending_io;
     boost::container::static_vector<std::unique_ptr<task_queue>, max_scheduling_groups()> _task_queues;
+    std::vector<scheduling_group_key_config> _scheduling_group_key_configs;
     int64_t _last_vruntime = 0;
     task_queue_list _active_task_queues;
     task_queue_list _activating_task_queues;
@@ -478,6 +352,7 @@ private:
     bool _strict_o_direct = true;
     bool _force_io_getevents_syscall = false;
     bool _bypass_fsync = false;
+    bool _have_aio_fsync = false;
     std::atomic<bool> _dying{false};
 private:
     static std::chrono::nanoseconds calculate_poll_time();
@@ -508,7 +383,7 @@ private:
 
 public:
     /// Register a user-defined signal handler
-    void handle_signal(int signo, std::function<void ()>&& handler);
+    void handle_signal(int signo, noncopyable_function<void ()>&& handler);
 
 private:
     class signals {
@@ -518,19 +393,19 @@ private:
 
         bool poll_signal();
         bool pure_poll_signal() const;
-        void handle_signal(int signo, std::function<void ()>&& handler);
-        void handle_signal_once(int signo, std::function<void ()>&& handler);
+        void handle_signal(int signo, noncopyable_function<void ()>&& handler);
+        void handle_signal_once(int signo, noncopyable_function<void ()>&& handler);
         static void action(int signo, siginfo_t* siginfo, void* ignore);
         static void failed_to_handle(int signo);
     private:
         struct signal_handler {
-            signal_handler(int signo, std::function<void ()>&& handler);
-            std::function<void ()> _handler;
+            signal_handler(int signo, noncopyable_function<void ()>&& handler);
+            noncopyable_function<void ()> _handler;
         };
         std::atomic<uint64_t> _pending_signals;
         std::unordered_map<int, signal_handler> _signal_handlers;
 
-        friend void reactor::handle_signal(int, std::function<void ()>&&);
+        friend void reactor::handle_signal(int, noncopyable_function<void ()>&&);
     };
 
     signals _signals;
@@ -549,20 +424,51 @@ private:
     void insert_activating_task_queues();
     void account_runtime(task_queue& tq, sched_clock::duration runtime);
     void account_idle(sched_clock::duration idletime);
-    void init_scheduling_group(scheduling_group sg, sstring name, float shares);
-    void destroy_scheduling_group(scheduling_group sg);
+    void allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key);
+    future<> init_scheduling_group(scheduling_group sg, sstring name, float shares);
+    future<> init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg);
+    future<> destroy_scheduling_group(scheduling_group sg);
+    [[noreturn]] void no_such_scheduling_group(scheduling_group sg);
+    void* get_scheduling_group_specific_value(scheduling_group sg, scheduling_group_key key) {
+        if (!_task_queues[sg._id]) {
+            no_such_scheduling_group(sg);
+        }
+        return _task_queues[sg._id]->_scheduling_group_specific_vals[key.id()];
+    }
+    void* get_scheduling_group_specific_value(scheduling_group_key key) {
+        return get_scheduling_group_specific_value(*internal::current_scheduling_group_ptr(), key);
+    }
     uint64_t tasks_processed() const;
     uint64_t min_vruntime() const;
     void request_preemption();
+    void start_handling_signal();
     void reset_preemption_monitor();
     void service_highres_timer();
+
+    future<std::tuple<pollable_fd, socket_address>>
+    do_accept(pollable_fd_state& listen_fd);
+    future<> do_connect(pollable_fd_state& pfd, socket_address& sa);
+
+    future<size_t>
+    do_read_some(pollable_fd_state& fd, void* buffer, size_t size);
+    future<size_t>
+    do_read_some(pollable_fd_state& fd, const std::vector<iovec>& iov);
+
+    future<size_t>
+    do_write_some(pollable_fd_state& fd, const void* buffer, size_t size);
+    future<size_t>
+    do_write_some(pollable_fd_state& fd, net::packet& p);
 public:
-    static boost::program_options::options_description get_options_description(std::chrono::duration<double> default_task_quota);
-    explicit reactor(unsigned id, reactor_backend_selector rbs);
+    static boost::program_options::options_description get_options_description(reactor_config cfg);
+    explicit reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg);
     reactor(const reactor&) = delete;
     ~reactor();
     void operator=(const reactor&) = delete;
 
+    sched_clock::duration uptime() {
+        return sched_clock::now() - _start_time;
+    }
+
     io_queue& get_io_queue(dev_t devid = 0) {
         auto queue = _io_queues.find(devid);
         if (queue == _io_queues.end()) {
@@ -583,6 +489,7 @@ public:
     /// \param shares the new shares value
     /// \return a future that is ready when the share update is applied
     future<> update_shares_for_class(io_priority_class pc, uint32_t shares);
+    static future<> rename_priority_class(io_priority_class pc, sstring new_name);
 
     void configure(boost::program_options::variables_map config);
 
@@ -595,44 +502,44 @@ public:
 
     bool posix_reuseport_available() const { return _reuseport; }
 
-    lw_shared_ptr<pollable_fd> make_pollable_fd(socket_address sa, transport proto = transport::TCP);
-    future<> posix_connect(lw_shared_ptr<pollable_fd> pfd, socket_address sa, socket_address local);
-
-    future<pollable_fd, socket_address> accept(pollable_fd_state& listen_fd);
+    lw_shared_ptr<pollable_fd> make_pollable_fd(socket_address sa, int proto);
 
-    future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t size);
-    future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov);
-
-    future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t size);
+    future<> posix_connect(lw_shared_ptr<pollable_fd> pfd, socket_address sa, socket_address local);
 
     future<> write_all(pollable_fd_state& fd, const void* buffer, size_t size);
 
     future<file> open_file_dma(sstring name, open_flags flags, file_open_options options = {});
     future<file> open_directory(sstring name);
-    future<> make_directory(sstring name);
-    future<> touch_directory(sstring name);
-    future<compat::optional<directory_entry_type>>  file_type(sstring name);
+    future<> make_directory(sstring name, file_permissions permissions = file_permissions::default_dir_permissions);
+    future<> touch_directory(sstring name, file_permissions permissions = file_permissions::default_dir_permissions);
+    future<compat::optional<directory_entry_type>>  file_type(sstring name, follow_symlink = follow_symlink::yes);
+    future<stat_data> file_stat(sstring pathname, follow_symlink);
     future<uint64_t> file_size(sstring pathname);
-    future<bool> file_exists(sstring pathname);
+    future<bool> file_accessible(sstring pathname, access_flags flags);
+    future<bool> file_exists(sstring pathname) {
+        return file_accessible(pathname, access_flags::exists);
+    }
     future<fs_type> file_system_at(sstring pathname);
     future<struct statvfs> statvfs(sstring pathname);
     future<> remove_file(sstring pathname);
     future<> rename_file(sstring old_pathname, sstring new_pathname);
     future<> link_file(sstring oldpath, sstring newpath);
+    future<> chmod(sstring name, file_permissions permissions);
 
     // In the following three methods, prepare_io is not guaranteed to execute in the same processor
     // in which it was generated. Therefore, care must be taken to avoid the use of objects that could
     // be destroyed within or at exit of prepare_io.
-    template <typename Func>
-    void submit_io(io_desc* desc, Func prepare_io);
-
-    template <typename Func>
-    future<internal::linux_abi::io_event> submit_io_read(io_queue* ioq, const io_priority_class& priority_class, size_t len, Func prepare_io);
-    template <typename Func>
-    future<internal::linux_abi::io_event> submit_io_write(io_queue* ioq, const io_priority_class& priority_class, size_t len, Func prepare_io);
-
-    inline void handle_io_result(const internal::linux_abi::io_event& ev) {
-        auto res = long(ev.res);
+    void submit_io(kernel_completion* desc, internal::io_request req);
+    future<size_t> submit_io_read(io_queue* ioq,
+            const io_priority_class& priority_class,
+            size_t len,
+            internal::io_request req);
+    future<size_t> submit_io_write(io_queue* ioq,
+            const io_priority_class& priority_class,
+            size_t len,
+            internal::io_request req);
+
+    inline void handle_io_result(ssize_t res) {
         if (res < 0) {
             ++_io_stats.aio_errors;
             throw_kernel_error(res);
@@ -649,7 +556,7 @@ public:
         return _stop_requested.wait(timeout, [this] { return _stopping; });
     }
 
-    void at_exit(std::function<future<> ()> func);
+    void at_exit(noncopyable_function<future<> ()> func);
 
     template <typename Func>
     void at_destroy(Func&& func) {
@@ -657,10 +564,10 @@ public:
     }
 
 #ifdef SEASTAR_SHUFFLE_TASK_QUEUE
-    void shuffle(std::unique_ptr<task>&, task_queue&);
+    void shuffle(task*&, task_queue&);
 #endif
 
-    void add_task(std::unique_ptr<task>&& t) {
+    void add_task(task* t) noexcept {
         auto sg = t->group();
         auto* q = _task_queues[sg._id].get();
         bool was_empty = q->_q.empty();
@@ -672,7 +579,7 @@ public:
             activate(*q);
         }
     }
-    void add_urgent_task(std::unique_ptr<task>&& t) {
+    void add_urgent_task(task* t) noexcept {
         auto sg = t->group();
         auto* q = _task_queues[sg._id].get();
         bool was_empty = q->_q.empty();
@@ -698,12 +605,11 @@ public:
     }
     void force_poll();
 
-    void add_high_priority_task(std::unique_ptr<task>&&);
+    void add_high_priority_task(task*) noexcept;
 
     network_stack& net() { return *_network_stack; }
     shard_id cpu_id() const { return _id; }
 
-    void start_epoll();
     void sleep();
 
     steady_clock_type::duration total_idle_time();
@@ -711,6 +617,7 @@ public:
     std::chrono::nanoseconds total_steal_time();
 
     const io_stats& get_io_stats() const { return _io_stats; }
+    uint64_t abandoned_failed_futures() const { return _abandoned_failed_futures; }
 #ifdef HAVE_OSV
     void timer_thread_func();
     void set_timer(sched::timer &tmr, s64 t);
@@ -730,7 +637,7 @@ private:
     void register_metrics();
     future<> write_all_part(pollable_fd_state& fd, const void* buffer, size_t size, size_t completed);
 
-    bool process_io();
+    future<> fdatasync(int fd);
 
     void add_timer(timer<steady_clock_type>*);
     bool queue_timer(timer<steady_clock_type>*);
@@ -747,6 +654,7 @@ private:
     friend class alien::message_queue;
     friend class pollable_fd;
     friend class pollable_fd_state;
+    friend struct pollable_fd_state_deleter;
     friend class posix_file_impl;
     friend class blockdev_file_impl;
     friend class readable_eventfd;
@@ -759,15 +667,33 @@ private:
     friend class scheduling_group;
     friend void add_to_flush_poller(output_stream<char>* os);
     friend int ::_Unwind_RaiseException(struct _Unwind_Exception *h);
+    friend void report_failed_future(const std::exception_ptr& eptr) noexcept;
     metrics::metric_groups _metric_groups;
     friend future<scheduling_group> create_scheduling_group(sstring name, float shares);
     friend future<> seastar::destroy_scheduling_group(scheduling_group);
+    friend future<> seastar::rename_scheduling_group(scheduling_group sg, sstring new_name);
+    friend future<scheduling_group_key> scheduling_group_key_create(scheduling_group_key_config cfg);
+
+    template<typename T>
+    friend T& scheduling_group_get_specific(scheduling_group sg, scheduling_group_key key);
+    template<typename T>
+    friend T& scheduling_group_get_specific(scheduling_group_key key);
+    template<typename SpecificValType, typename Mapper, typename Reducer, typename Initial>
+        GCC6_CONCEPT( requires requires(SpecificValType specific_val, Mapper mapper, Reducer reducer, Initial initial) {
+            {reducer(initial, mapper(specific_val))} -> Initial;
+        })
+    friend future<typename function_traits<Reducer>::return_type>
+    map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer, Initial initial_val, scheduling_group_key key);
+    template<typename SpecificValType, typename Reducer, typename Initial>
+    GCC6_CONCEPT( requires requires(SpecificValType specific_val, Reducer reducer, Initial initial) {
+        {reducer(initial, specific_val)} -> Initial;
+    })
+    friend future<typename function_traits<Reducer>::return_type>
+        reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, scheduling_group_key key);
 public:
-    bool wait_and_process(int timeout = 0, const sigset_t* active_sigmask = nullptr);
     future<> readable(pollable_fd_state& fd);
     future<> writeable(pollable_fd_state& fd);
     future<> readable_or_writeable(pollable_fd_state& fd);
-    void forget(pollable_fd_state& fd);
     void abort_reader(pollable_fd_state& fd);
     void abort_writer(pollable_fd_state& fd);
     void enable_timer(steady_clock_type::time_point when);
@@ -817,95 +743,6 @@ inline bool engine_is_ready() {
     return local_engine != nullptr;
 }
 
-class smp {
-    static std::vector<posix_thread> _threads;
-    static std::vector<std::function<void ()>> _thread_loops; // for dpdk
-    static compat::optional<boost::barrier> _all_event_loops_done;
-    static std::vector<reactor*> _reactors;
-    struct qs_deleter {
-      void operator()(smp_message_queue** qs) const;
-    };
-    static std::unique_ptr<smp_message_queue*[], qs_deleter> _qs;
-    static std::thread::id _tmain;
-    static bool _using_dpdk;
-
-    template <typename Func>
-    using returns_future = is_future<std::result_of_t<Func()>>;
-    template <typename Func>
-    using returns_void = std::is_same<std::result_of_t<Func()>, void>;
-public:
-    static boost::program_options::options_description get_options_description();
-    static void configure(boost::program_options::variables_map vm);
-    static void cleanup();
-    static void cleanup_cpu();
-    static void arrive_at_event_loop_end();
-    static void join_all();
-    static bool main_thread() { return std::this_thread::get_id() == _tmain; }
-
-    /// Runs a function on a remote core.
-    ///
-    /// \param t designates the core to run the function on (may be a remote
-    ///          core or the local core).
-    /// \param func a callable to run on core \c t.  If \c func is a temporary object,
-    ///          its lifetime will be extended by moving it.  If @func is a reference,
-    ///          the caller must guarantee that it will survive the call.
-    /// \return whatever \c func returns, as a future<> (if \c func does not return a future,
-    ///         submit_to() will wrap it in a future<>).
-    template <typename Func>
-    static futurize_t<std::result_of_t<Func()>> submit_to(unsigned t, Func&& func) {
-        using ret_type = std::result_of_t<Func()>;
-        if (t == engine().cpu_id()) {
-            try {
-                if (!is_future<ret_type>::value) {
-                    // Non-deferring function, so don't worry about func lifetime
-                    return futurize<ret_type>::apply(std::forward<Func>(func));
-                } else if (std::is_lvalue_reference<Func>::value) {
-                    // func is an lvalue, so caller worries about its lifetime
-                    return futurize<ret_type>::apply(func);
-                } else {
-                    // Deferring call on rvalue function, make sure to preserve it across call
-                    auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
-                    auto ret = futurize<ret_type>::apply(*w);
-                    return ret.finally([w = std::move(w)] {});
-                }
-            } catch (...) {
-                // Consistently return a failed future rather than throwing, to simplify callers
-                return futurize<std::result_of_t<Func()>>::make_exception_future(std::current_exception());
-            }
-        } else {
-            return _qs[t][engine().cpu_id()].submit(std::forward<Func>(func));
-        }
-    }
-    static bool poll_queues();
-    static bool pure_poll_queues();
-    static boost::integer_range<unsigned> all_cpus() {
-        return boost::irange(0u, count);
-    }
-    // Invokes func on all shards.
-    // The returned future resolves when all async invocations finish.
-    // The func may return void or future<>.
-    // Each async invocation will work with a separate copy of func.
-    template<typename Func>
-    static future<> invoke_on_all(Func&& func) {
-        static_assert(std::is_same<future<>, typename futurize<std::result_of_t<Func()>>::type>::value, "bad Func signature");
-        return parallel_for_each(all_cpus(), [&func] (unsigned id) {
-            return smp::submit_to(id, Func(func));
-        });
-    }
-private:
-    static void start_all_queues();
-    static void pin(unsigned cpu_id);
-    static void allocate_reactor(unsigned id, reactor_backend_selector rbs);
-    static void create_thread(std::function<void ()> thread_loop);
-public:
-    static unsigned count;
-};
-
-inline
-pollable_fd_state::~pollable_fd_state() {
-    engine().forget(*this);
-}
-
 inline
 size_t iovec_len(const iovec* begin, size_t len)
 {
@@ -917,90 +754,12 @@ size_t iovec_len(const iovec* begin, size_t len)
     return ret;
 }
 
-template <typename Clock>
-inline
-timer<Clock>::timer(callback_t&& callback) : _callback(std::move(callback)) {
-}
-
-template <typename Clock>
-inline
-timer<Clock>::~timer() {
-    if (_queued) {
-        engine().del_timer(this);
-    }
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::set_callback(callback_t&& callback) {
-    _callback = std::move(callback);
+inline int hrtimer_signal() {
+    // We don't want to use SIGALRM, because the boost unit test library
+    // also plays with it.
+    return SIGRTMIN;
 }
 
-template <typename Clock>
-inline
-void timer<Clock>::arm_state(time_point until, compat::optional<duration> period) {
-    assert(!_armed);
-    _period = period;
-    _armed = true;
-    _expired = false;
-    _expiry = until;
-    _queued = true;
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::arm(time_point until, compat::optional<duration> period) {
-    arm_state(until, period);
-    engine().add_timer(this);
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::rearm(time_point until, compat::optional<duration> period) {
-    if (_armed) {
-        cancel();
-    }
-    arm(until, period);
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::arm(duration delta) {
-    return arm(Clock::now() + delta);
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::arm_periodic(duration delta) {
-    arm(Clock::now() + delta, {delta});
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::readd_periodic() {
-    arm_state(Clock::now() + _period.value(), {_period.value()});
-    engine().queue_timer(this);
-}
-
-template <typename Clock>
-inline
-bool timer<Clock>::cancel() {
-    if (!_armed) {
-        return false;
-    }
-    _armed = false;
-    if (_queued) {
-        engine().del_timer(this);
-        _queued = false;
-    }
-    return true;
-}
-
-template <typename Clock>
-inline
-typename timer<Clock>::time_point timer<Clock>::get_timeout() {
-    return _expiry;
-}
 
 extern logger seastar_logger;