]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/src/core/reactor.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / core / reactor.cc
index 11a9125e66854e85f712aa80b08908d4d2182c2a..79bdbf29916e1e8e4203fe2b5e6a9cd20435d732 100644 (file)
@@ -54,6 +54,7 @@
 #include <seastar/core/internal/buffer_allocator.hh>
 #include <seastar/core/scheduling_specific.hh>
 #include <seastar/util/log.hh>
+#include <seastar/util/read_first_line.hh>
 #include "core/file-impl.hh"
 #include "core/reactor_backend.hh"
 #include "core/syscall_result.hh"
@@ -83,6 +84,7 @@
 #include <dirent.h>
 #include <linux/types.h> // for xfs, below
 #include <sys/ioctl.h>
+#include <linux/perf_event.h>
 #include <xfs/linux.h>
 #define min min    /* prevent xfs.h from defining min() as a macro */
 #include <xfs/xfs.h>
@@ -145,7 +147,9 @@ struct mountpoint_params {
     uint64_t write_bytes_rate = std::numeric_limits<uint64_t>::max();
     uint64_t read_req_rate = std::numeric_limits<uint64_t>::max();
     uint64_t write_req_rate = std::numeric_limits<uint64_t>::max();
-    uint64_t num_io_queues = 0; // calculated
+    uint64_t read_saturation_length = std::numeric_limits<uint64_t>::max();
+    uint64_t write_saturation_length = std::numeric_limits<uint64_t>::max();
+    bool duplex = false;
 };
 
 }
@@ -160,6 +164,15 @@ struct convert<seastar::mountpoint_params> {
         mp.read_req_rate = parse_memory_size(node["read_iops"].as<std::string>());
         mp.write_bytes_rate = parse_memory_size(node["write_bandwidth"].as<std::string>());
         mp.write_req_rate = parse_memory_size(node["write_iops"].as<std::string>());
+        if (node["read_saturation_length"]) {
+            mp.read_saturation_length = parse_memory_size(node["read_saturation_length"].as<std::string>());
+        }
+        if (node["write_saturation_length"]) {
+            mp.write_saturation_length = parse_memory_size(node["write_saturation_length"].as<std::string>());
+        }
+        if (node["duplex"]) {
+            mp.duplex = node["duplex"].as<bool>();
+        }
         return true;
     }
 };
@@ -177,42 +190,30 @@ shard_id reactor::cpu_id() const {
 
 io_priority_class
 reactor::register_one_priority_class(sstring name, uint32_t shares) {
-    return io_queue::register_one_priority_class(std::move(name), shares);
+    return io_priority_class::register_one(std::move(name), shares);
 }
 
 future<>
 reactor::update_shares_for_class(io_priority_class pc, uint32_t shares) {
-    return parallel_for_each(_io_queues, [pc, shares] (auto& queue) {
-        return queue.second->update_shares_for_class(pc, shares);
-    });
+    return pc.update_shares(shares);
 }
 
 future<>
 reactor::rename_priority_class(io_priority_class pc, sstring new_name) noexcept {
+    return pc.rename(std::move(new_name));
+}
 
-    return futurize_invoke([pc, new_name = std::move(new_name)] () mutable {
-        // Taking the lock here will prevent from newly registered classes
-        // to register under the old name (and will prevent undefined
-        // behavior since this array is shared cross shards. However, it
-        // doesn't prevent the case where a newly registered class (that
-        // got registered right after the lock release) will be unnecessarily
-        // renamed. This is not a real problem and it is a lot better than
-        // holding the lock until all cross shard activity is over.
+future<> reactor::update_shares_for_queues(io_priority_class pc, uint32_t shares) {
+    return parallel_for_each(_io_queues, [pc, shares] (auto& queue) {
+        return queue.second->update_shares_for_class(pc, shares);
+    });
+}
 
-        try {
-            if (!io_queue::rename_one_priority_class(pc, new_name)) {
-                return make_ready_future<>();
-            }
-        } catch (...) {
-            sched_logger.error("exception while trying to rename priority group with id {} to \"{}\" ({})",
-                    pc.id(), new_name, std::current_exception());
-            std::rethrow_exception(std::current_exception());
+future<> reactor::rename_queues(io_priority_class pc, sstring new_name) noexcept {
+    return futurize_invoke([this, pc, new_name = std::move(new_name)] {
+        for (auto&& queue : _io_queues) {
+            queue.second->rename_priority_class(pc, new_name);
         }
-        return smp::invoke_on_all([pc, new_name = std::move(new_name)] {
-            for (auto&& queue : engine()._io_queues) {
-                queue.second->rename_priority_class(pc, new_name);
-            }
-        });
     });
 }
 
@@ -477,6 +478,10 @@ future<size_t> pollable_fd_state::sendto(socket_address addr, const void* buf, s
 
 namespace internal {
 
+void set_need_preempt_var(const preemption_monitor* np) {
+    get_need_preempt_var() = np;
+}
+
 #ifdef SEASTAR_TASK_HISTOGRAM
 
 class task_histogram {
@@ -530,8 +535,9 @@ constexpr std::chrono::milliseconds lowres_clock_impl::_granularity;
 constexpr unsigned reactor::max_queues;
 constexpr unsigned reactor::max_aio_per_queue;
 
-// Broken (returns spurious EIO). Cause/fix unknown.
-bool aio_nowait_supported = false;
+// Base version where this works; some filesystems were only fixed later, so
+// this value is mixed in with filesystem-provided values later.
+bool aio_nowait_supported = kernel_uname().whitelisted({"4.13"});
 
 static bool sched_debug() {
     return false;
@@ -692,6 +698,23 @@ void reactor::handle_signal(int signo, noncopyable_function<void ()>&& handler)
     _signals.handle_signal(signo, std::move(handler));
 }
 
+// Fills a buffer with a hexadecimal representation of an integer
+// and returns a pointer to the first character.
+// For example, convert_hex_safe(buf, 4, uint16_t(12)) fills the buffer with "   c".
+template<typename Integral>
+SEASTAR_CONCEPT( requires std::is_integral_v<Integral> )
+char* convert_hex_safe(char *buf, size_t bufsz, Integral n) noexcept {
+    const char *digits = "0123456789abcdef";
+    memset(buf, ' ', bufsz);
+    auto* p = buf + bufsz;
+    do {
+        assert(p > buf);
+        *--p = digits[n & 0xf];
+        n >>= 4;
+    } while (n);
+    return p;
+}
+
 // Accumulates an in-memory backtrace and flush to stderr eventually.
 // Async-signal safe.
 class backtrace_buffer {
@@ -704,10 +727,15 @@ public:
         _pos = 0;
     }
 
-    void append(const char* str, size_t len) noexcept {
+    void reserve(size_t len) noexcept {
+        assert(len < _max_size);
         if (_pos + len >= _max_size) {
             flush();
         }
+    }
+
+    void append(const char* str, size_t len) noexcept {
+        reserve(len);
         memcpy(_buf + _pos, str, len);
         _pos += len;
     }
@@ -724,8 +752,8 @@ public:
     template <typename Integral>
     void append_hex(Integral ptr) noexcept {
         char buf[sizeof(ptr) * 2];
-        convert_zero_padded_hex_safe(buf, sizeof(buf), ptr);
-        append(buf, sizeof(buf));
+        auto p = convert_hex_safe(buf, sizeof(buf), ptr);
+        append(p, (buf + sizeof(buf)) - p);
     }
 
     void append_backtrace() noexcept {
@@ -741,23 +769,37 @@ public:
             append("\n");
         });
     }
+
+    void append_backtrace_oneline() noexcept {
+        backtrace([this] (frame f) noexcept {
+            reserve(3 + sizeof(f.addr) * 2);
+            append(" 0x");
+            append_hex(f.addr);
+        });
+    }
 };
 
-static void print_with_backtrace(backtrace_buffer& buf) noexcept {
+static void print_with_backtrace(backtrace_buffer& buf, bool oneline) noexcept {
     if (local_engine) {
         buf.append(" on shard ");
         buf.append_decimal(this_shard_id());
     }
 
+  if (!oneline) {
     buf.append(".\nBacktrace:\n");
     buf.append_backtrace();
+  } else {
+    buf.append(". Backtrace:");
+    buf.append_backtrace_oneline();
+    buf.append("\n");
+  }
     buf.flush();
 }
 
-static void print_with_backtrace(const char* cause) noexcept {
+static void print_with_backtrace(const char* cause, bool oneline = false) noexcept {
     backtrace_buffer buf;
     buf.append(cause);
-    print_with_backtrace(buf);
+    print_with_backtrace(buf, oneline);
 }
 
 // Installs signal handler stack for current thread.
@@ -773,7 +815,7 @@ static decltype(auto) install_signal_handler_stack() {
     stack.ss_size = size;
     auto r = sigaltstack(&stack, &prev_stack);
     throw_system_error_on(r == -1);
-    return defer([mem = std::move(mem), prev_stack] () mutable {
+    return defer([mem = std::move(mem), prev_stack] () mutable noexcept {
         try {
             auto r = sigaltstack(&prev_stack, NULL);
             throw_system_error_on(r == -1);
@@ -788,7 +830,7 @@ reactor::task_queue::task_queue(unsigned id, sstring name, float shares)
         : _shares(std::max(shares, 1.0f))
         , _reciprocal_shares_times_2_power_32((uint64_t(1) << 32) / _shares)
         , _id(id)
-        , _ts(std::chrono::steady_clock::now())
+        , _ts(now())
         , _name(name) {
     register_stats();
 }
@@ -876,8 +918,10 @@ struct reactor::task_queue::indirect_compare {
     }
 };
 
-reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
-    : _cfg(cfg)
+reactor::reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
+    : _smp(std::move(smp))
+    , _alien(alien)
+    , _cfg(cfg)
     , _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
     , _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
     , _id(id)
@@ -887,7 +931,7 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
     , _engine_thread(sched::thread::current())
 #endif
     , _cpu_started(0)
-    , _cpu_stall_detector(std::make_unique<cpu_stall_detector>())
+    , _cpu_stall_detector(make_cpu_stall_detector())
     , _reuseport(posix_reuseport_detect())
     , _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) {
     /*
@@ -895,12 +939,12 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
      * the chosen backend constructor may want to handle signals and thus
      * needs the _signals._signal_handlers map to be initialized.
      */
-    _backend = rbs.create(this);
+    _backend = rbs.create(*this);
     *internal::get_scheduling_group_specific_thread_local_data_ptr() = &_scheduling_group_specific_data;
     _task_queues.push_back(std::make_unique<task_queue>(0, "main", 1000));
     _task_queues.push_back(std::make_unique<task_queue>(1, "atexit", 1000));
     _at_destroy_tasks = _task_queues.back().get();
-    g_need_preempt = &_preemption_monitor;
+    set_need_preempt_var(&_preemption_monitor);
     seastar::thread_impl::init();
     _backend->start_tick();
 
@@ -958,6 +1002,13 @@ reactor::~reactor() {
     }
 }
 
+reactor::sched_stats
+reactor::get_sched_stats() const {
+    sched_stats ret;
+    ret.tasks_processed = tasks_processed();
+    return ret;
+}
+
 future<> reactor::readable(pollable_fd_state& fd) {
     return _backend->readable(fd);
 }
@@ -1016,6 +1067,16 @@ cpu_stall_detector::cpu_stall_detector(cpu_stall_detector_config cfg)
     // a safe place.
     backtrace([] (frame) {});
     update_config(cfg);
+
+    namespace sm = seastar::metrics;
+
+    _metrics.add_group("stall_detector", {
+            sm::make_derive("reported", _total_reported, sm::description("Total number of reported stalls, look in the traces for the exact reason"))});
+
+    // note: if something is added here that can, it should take care to destroy _timer.
+}
+
+cpu_stall_detector_posix_timer::cpu_stall_detector_posix_timer(cpu_stall_detector_config cfg) : cpu_stall_detector(cfg) {
     struct sigevent sev = {};
     sev.sigev_notify = SIGEV_THREAD_ID;
     sev.sigev_signo = signal_number();
@@ -1024,17 +1085,9 @@ cpu_stall_detector::cpu_stall_detector(cpu_stall_detector_config cfg)
     if (err) {
         throw std::system_error(std::error_code(err, std::system_category()));
     }
-
-    namespace sm = seastar::metrics;
-
-    _metrics.add_group("stall_detector", {
-            sm::make_derive("reported", _total_reported, sm::description("Total number of reported stalls, look in the traces for the exact reason"))});
-
-
-    // note: if something is added here that can, it should take care to destroy _timer.
 }
 
-cpu_stall_detector::~cpu_stall_detector() {
+cpu_stall_detector_posix_timer::~cpu_stall_detector_posix_timer() {
     timer_delete(_timer);
 }
 
@@ -1045,11 +1098,11 @@ cpu_stall_detector::get_config() const {
 
 void cpu_stall_detector::update_config(cpu_stall_detector_config cfg) {
     _config = cfg;
-    _threshold = std::chrono::duration_cast<std::chrono::steady_clock::duration>(cfg.threshold);
-    _slack = std::chrono::duration_cast<std::chrono::steady_clock::duration>(cfg.threshold * cfg.slack);
+    _threshold = std::chrono::duration_cast<sched_clock::duration>(cfg.threshold);
+    _slack = std::chrono::duration_cast<sched_clock::duration>(cfg.threshold * cfg.slack);
     _stall_detector_reports_per_minute = cfg.stall_detector_reports_per_minute;
     _max_reports_per_minute = cfg.stall_detector_reports_per_minute;
-    _rearm_timer_at = std::chrono::steady_clock::now();
+    _rearm_timer_at = reactor::now();
 }
 
 void cpu_stall_detector::maybe_report() {
@@ -1063,6 +1116,9 @@ void cpu_stall_detector::maybe_report() {
 //
 // We can do it a cheaper if we don't report suppressed backtraces.
 void cpu_stall_detector::on_signal() {
+    if (reap_event_and_check_spuriousness()) {
+        return;
+    }
     auto tasks_processed = engine().tasks_processed();
     auto last_seen = _last_tasks_processed_seen.load(std::memory_order_relaxed);
     if (!last_seen) {
@@ -1076,7 +1132,7 @@ void cpu_stall_detector::on_signal() {
     arm_timer();
 }
 
-void cpu_stall_detector::report_suppressions(std::chrono::steady_clock::time_point now) {
+void cpu_stall_detector::report_suppressions(sched_clock::time_point now) {
     if (now > _minute_mark + 60s) {
         if (_reported > _max_reports_per_minute) {
             auto suppressed = _reported - _max_reports_per_minute;
@@ -1095,12 +1151,12 @@ void cpu_stall_detector::report_suppressions(std::chrono::steady_clock::time_poi
     }
 }
 
-void cpu_stall_detector::arm_timer() {
+void cpu_stall_detector_posix_timer::arm_timer() {
     auto its = posix::to_relative_itimerspec(_threshold * _report_at + _slack, 0s);
     timer_settime(_timer, 0, &its, nullptr);
 }
 
-void cpu_stall_detector::start_task_run(std::chrono::steady_clock::time_point now) {
+void cpu_stall_detector::start_task_run(sched_clock::time_point now) {
     if (now > _rearm_timer_at) {
         report_suppressions(now);
         _report_at = 1;
@@ -1112,55 +1168,156 @@ void cpu_stall_detector::start_task_run(std::chrono::steady_clock::time_point no
     std::atomic_signal_fence(std::memory_order_release); // Don't delay this write, so the signal handler can see it
 }
 
-void cpu_stall_detector::end_task_run(std::chrono::steady_clock::time_point now) {
+void cpu_stall_detector::end_task_run(sched_clock::time_point now) {
     std::atomic_signal_fence(std::memory_order_acquire); // Don't hoist this write, so the signal handler can see it
     _last_tasks_processed_seen.store(0, std::memory_order_relaxed);
 }
 
-void cpu_stall_detector::start_sleep() {
+void cpu_stall_detector_posix_timer::start_sleep() {
     auto its = posix::to_relative_itimerspec(0s,  0s);
     timer_settime(_timer, 0, &its, nullptr);
-    _rearm_timer_at = std::chrono::steady_clock::now();
+    _rearm_timer_at = reactor::now();
 }
 
 void cpu_stall_detector::end_sleep() {
 }
 
+static long
+perf_event_open(struct perf_event_attr* hw_event, pid_t pid, int cpu, int group_fd, unsigned long flags) {
+    return syscall(__NR_perf_event_open, hw_event, pid, cpu, group_fd, flags);
+}
+
+cpu_stall_detector_linux_perf_event::cpu_stall_detector_linux_perf_event(file_desc fd, cpu_stall_detector_config cfg)
+        : cpu_stall_detector(cfg), _fd(std::move(fd)) {
+    void* ret = ::mmap(nullptr, 2*getpagesize(), PROT_READ|PROT_WRITE, MAP_SHARED, _fd.get(), 0);
+    if (ret == MAP_FAILED) {
+        abort();
+    }
+    _mmap = static_cast<struct ::perf_event_mmap_page*>(ret);
+    _data_area = reinterpret_cast<char*>(_mmap) + getpagesize();
+    _data_area_mask = getpagesize() - 1;
+}
+
+cpu_stall_detector_linux_perf_event::~cpu_stall_detector_linux_perf_event() {
+    ::munmap(_mmap, 2*getpagesize());
+}
+
+void
+cpu_stall_detector_linux_perf_event::arm_timer() {
+    uint64_t ns = (_threshold * _report_at + _slack) / 1ns;
+    if (_enabled && _current_period == ns) [[likely]] {
+        // Common case - we're re-arming with the same period, the counter
+        // is already enabled.
+        _fd.ioctl(PERF_EVENT_IOC_RESET, 0);
+    } else {
+        // Uncommon case - we're moving from disabled to enabled, or changing
+        // the period. Issue more calls and be careful.
+        _fd.ioctl(PERF_EVENT_IOC_DISABLE, 0); // avoid false alarms while we modify stuff
+        _fd.ioctl(PERF_EVENT_IOC_PERIOD, ns);
+        _fd.ioctl(PERF_EVENT_IOC_RESET, 0);
+        _fd.ioctl(PERF_EVENT_IOC_ENABLE, 0);
+        _enabled = true;
+        _current_period = ns;
+    }
+}
+
 void
-reactor::task_quota_timer_thread_fn() {
-    auto thread_name = seastar::format("timer-{}", _id);
-    pthread_setname_np(pthread_self(), thread_name.c_str());
+cpu_stall_detector_linux_perf_event::start_sleep() {
+    _fd.ioctl(PERF_EVENT_IOC_DISABLE, 0);
+    _enabled = false;
+}
 
-    sigset_t mask;
-    sigfillset(&mask);
-    for (auto sig : { SIGSEGV }) {
-        sigdelset(&mask, sig);
+bool
+cpu_stall_detector_linux_perf_event::reap_event_and_check_spuriousness() {
+    struct read_format {
+        uint64_t value;
+    } buf;
+    _fd.read(&buf, sizeof(read_format));
+    return buf.value < _current_period;
+}
+
+void
+cpu_stall_detector_linux_perf_event::maybe_report_kernel_trace() {
+    data_area_reader reader(*this);
+    auto current_record = [&] () -> ::perf_event_header {
+        return reader.read_struct<perf_event_header>();
+    };
+
+    while (reader.have_data()) {
+        auto record = current_record();
+
+        if (record.type != PERF_RECORD_SAMPLE) {
+            reader.skip(record.size - sizeof(record));
+            continue;
+        }
+
+        auto nr = reader.read_u64();
+        backtrace_buffer buf;
+        buf.append("kernel callstack:");
+        for (uint64_t i = 0; i < nr; ++i) {
+            buf.append(" 0x");
+            buf.append_hex(uintptr_t(reader.read_u64()));
+        }
+        buf.append("\n");
+        buf.flush();
+    };
+}
+
+std::unique_ptr<cpu_stall_detector_linux_perf_event>
+cpu_stall_detector_linux_perf_event::try_make(cpu_stall_detector_config cfg) {
+    ::perf_event_attr pea = {
+        .type = PERF_TYPE_SOFTWARE,
+        .size = sizeof(pea),
+        .config = PERF_COUNT_SW_TASK_CLOCK, // more likely to work on virtual machines than hardware events
+        .sample_period = 1'000'000'000, // Needs non-zero value or PERF_IOC_PERIOD gets confused
+        .sample_type = PERF_SAMPLE_CALLCHAIN,
+        .disabled = 1,
+        .exclude_callchain_user = 1,  // we're using backtrace() to capture the user callchain
+        .wakeup_events = 1,
+    };
+    unsigned long flags = 0;
+    if (kernel_uname().whitelisted({"3.14"})) {
+        flags |= PERF_FLAG_FD_CLOEXEC;
+    }
+    int fd = perf_event_open(&pea, 0, -1, -1, flags);
+    if (fd == -1) {
+        throw std::system_error(errno, std::system_category(), "perf_event_open() failed");
+    }
+    auto desc = file_desc::from_fd(fd);
+    struct f_owner_ex sig_owner = {
+        .type = F_OWNER_TID,
+        .pid = static_cast<pid_t>(syscall(SYS_gettid)),
+    };
+    auto ret1 = ::fcntl(fd, F_SETOWN_EX, &sig_owner);
+    if (ret1 == -1) {
+        abort();
     }
-    auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
-    if (r) {
-        seastar_logger.error("Thread {}: failed to block signals. Aborting.", thread_name.c_str());
+    auto ret2 = ::fcntl(fd, F_SETSIG, signal_number());
+    if (ret2 == -1) {
         abort();
     }
-
-    // We need to wait until task quota is set before we can calculate how many ticks are to
-    // a minute. Technically task_quota is used from many threads, but since it is read-only here
-    // and only used during initialization we will avoid complicating the code.
-    {
-        uint64_t events;
-        _task_quota_timer.read(&events, 8);
-        request_preemption();
+    auto fd_flags = ::fcntl(fd, F_GETFL);
+    if (fd_flags == -1) {
+        abort();
+    }
+    auto ret3 = ::fcntl(fd, F_SETFL, fd_flags | O_ASYNC);
+    if (ret3 == -1) {
+        abort();
     }
+    return std::make_unique<cpu_stall_detector_linux_perf_event>(std::move(desc), std::move(cfg));
+}
 
-    while (!_dying.load(std::memory_order_relaxed)) {
-        uint64_t events;
-        _task_quota_timer.read(&events, 8);
-        request_preemption();
 
-        // We're in a different thread, but guaranteed to be on the same core, so even
-        // a signal fence is overdoing it
-        std::atomic_signal_fence(std::memory_order_seq_cst);
+std::unique_ptr<cpu_stall_detector>
+internal::make_cpu_stall_detector(cpu_stall_detector_config cfg) {
+    try {
+        return cpu_stall_detector_linux_perf_event::try_make(cfg);
+    } catch (...) {
+        seastar_logger.warn("Creation of perf_event based stall detector failed, falling back to posix timer: {}", std::current_exception());
+        return std::make_unique<cpu_stall_detector_posix_timer>(cfg);
     }
 }
+
 void
 reactor::update_blocked_reactor_notify_ms(std::chrono::milliseconds ms) {
     auto cfg = _cpu_stall_detector->get_config();
@@ -1196,7 +1353,7 @@ reactor::block_notifier(int) {
 
 void
 cpu_stall_detector::generate_trace() {
-    auto delta = std::chrono::steady_clock::now() - _run_started_at;
+    auto delta = reactor::now() - _run_started_at;
 
     _total_reported++;
     if (_config.report) {
@@ -1208,7 +1365,8 @@ cpu_stall_detector::generate_trace() {
     buf.append("Reactor stalled for ");
     buf.append_decimal(uint64_t(delta / 1ms));
     buf.append(" ms");
-    print_with_backtrace(buf);
+    print_with_backtrace(buf, _config.oneline);
+    maybe_report_kernel_trace();
 }
 
 template <typename T, typename E, typename EnableFunc>
@@ -1276,68 +1434,46 @@ void reactor::set_timer(sched::timer &tmr, s64 t) {
 }
 #endif
 
-class network_stack_registry {
-public:
-    using options = boost::program_options::variables_map;
-private:
-    static std::unordered_map<sstring,
-            noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)>>& _map() {
-        static std::unordered_map<sstring,
-                noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)>> map;
-        return map;
-    }
-    static sstring& _default() {
-        static sstring def;
-        return def;
-    }
+class network_stack_factory {
+    network_stack_entry::factory_func _func;
+
 public:
-    static boost::program_options::options_description& options_description() {
-        static boost::program_options::options_description opts;
-        return opts;
-    }
-    static void register_stack(sstring name, boost::program_options::options_description opts,
-        noncopyable_function<future<std::unique_ptr<network_stack>>(options opts)> create,
-        bool make_default);
-    static sstring default_stack();
-    static std::vector<sstring> list();
-    static future<std::unique_ptr<network_stack>> create(options opts);
-    static future<std::unique_ptr<network_stack>> create(sstring name, options opts);
+    network_stack_factory(noncopyable_function<future<std::unique_ptr<network_stack>> (const program_options::option_group&)> func)
+        : _func(std::move(func)) { }
+    future<std::unique_ptr<network_stack>> operator()(const program_options::option_group& opts) { return _func(opts); }
 };
 
-void reactor::configure(boost::program_options::variables_map vm) {
-    _network_stack_ready = vm.count("network-stack")
-        ? network_stack_registry::create(sstring(vm["network-stack"].as<std::string>()), vm)
-        : network_stack_registry::create(vm);
+void reactor::configure(const reactor_options& opts) {
+    _network_stack_ready = opts.network_stack.get_selected_candidate()(*opts.network_stack.get_selected_candidate_opts());
 
-    _handle_sigint = !vm.count("no-handle-interrupt");
-    auto task_quota = vm["task-quota-ms"].as<double>() * 1ms;
+    _handle_sigint = !opts.no_handle_interrupt;
+    auto task_quota = opts.task_quota_ms.get_value() * 1ms;
     _task_quota = std::chrono::duration_cast<sched_clock::duration>(task_quota);
 
-    auto blocked_time = vm["blocked-reactor-notify-ms"].as<unsigned>() * 1ms;
+    auto blocked_time = opts.blocked_reactor_notify_ms.get_value() * 1ms;
     cpu_stall_detector_config csdc;
     csdc.threshold = blocked_time;
-    csdc.stall_detector_reports_per_minute = vm["blocked-reactor-reports-per-minute"].as<unsigned>();
+    csdc.stall_detector_reports_per_minute = opts.blocked_reactor_reports_per_minute.get_value();
+    csdc.oneline = opts.blocked_reactor_report_format_oneline.get_value();
     _cpu_stall_detector->update_config(csdc);
 
-    _max_task_backlog = vm["max-task-backlog"].as<unsigned>();
-    _max_poll_time = vm["idle-poll-time-us"].as<unsigned>() * 1us;
-    if (vm.count("poll-mode")) {
+    _max_task_backlog = opts.max_task_backlog.get_value();
+    _max_poll_time = opts.idle_poll_time_us.get_value() * 1us;
+    if (opts.poll_mode) {
         _max_poll_time = std::chrono::nanoseconds::max();
     }
-    if (vm.count("overprovisioned")
-           && vm["idle-poll-time-us"].defaulted()
-           && !vm.count("poll-mode")) {
+    if (opts.overprovisioned && opts.idle_poll_time_us.defaulted() && !opts.poll_mode) {
         _max_poll_time = 0us;
     }
-    set_strict_dma(!vm.count("relaxed-dma"));
-    if (!vm["poll-aio"].as<bool>()
-            || (vm["poll-aio"].defaulted() && vm.count("overprovisioned"))) {
+    set_strict_dma(!opts.relaxed_dma);
+    if (!opts.poll_aio.get_value() || (opts.poll_aio.defaulted() && opts.overprovisioned)) {
         _aio_eventfd = pollable_fd(file_desc::eventfd(0, 0));
     }
-    set_bypass_fsync(vm["unsafe-bypass-fsync"].as<bool>());
-    _force_io_getevents_syscall = vm["force-aio-syscalls"].as<bool>();
-    aio_nowait_supported = vm["linux-aio-nowait"].as<bool>();
-    _have_aio_fsync = vm["aio-fsync"].as<bool>();
+    set_bypass_fsync(opts.unsafe_bypass_fsync.get_value());
+    _kernel_page_cache = opts.kernel_page_cache.get_value();
+    _force_io_getevents_syscall = opts.force_aio_syscalls.get_value();
+    aio_nowait_supported = opts.linux_aio_nowait.get_value();
+    _have_aio_fsync = opts.aio_fsync.get_value();
 }
 
 pollable_fd
@@ -1517,24 +1653,27 @@ void io_completion::complete_with(ssize_t res) {
     }
 }
 
-void
-reactor::submit_io(io_completion* desc, io_request req) noexcept {
-    req.attach_kernel_completion(desc);
-    try {
-        _pending_io.push_back(std::move(req));
-    } catch (...) {
-        desc->set_exception(std::current_exception());
-    }
-}
-
 bool
 reactor::flush_pending_aio() {
-    for (auto& ioq : my_io_queues) {
-        ioq->poll_io_queue();
+    for (auto& ioq : _io_queues) {
+        ioq.second->poll_io_queue();
     }
     return false;
 }
 
+steady_clock_type::time_point reactor::next_pending_aio() const noexcept {
+    steady_clock_type::time_point next = steady_clock_type::time_point::max();
+
+    for (auto& ioq : _io_queues) {
+        steady_clock_type::time_point n = ioq.second->next_pending_aio();
+        if (n < next) {
+            next = std::move(n);
+        }
+    }
+
+    return next;
+}
+
 bool
 reactor::reap_kernel_completions() {
     return _backend->reap_kernel_completions();
@@ -1542,23 +1681,23 @@ reactor::reap_kernel_completions() {
 
 const io_priority_class& default_priority_class() {
     static thread_local auto shard_default_class = [] {
-        return engine().register_one_priority_class("default", 1);
+        return io_priority_class::register_one("default", 1);
     }();
     return shard_default_class;
 }
 
 future<size_t>
-reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {
+reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req, io_intent* intent) noexcept {
     ++_io_stats.aio_reads;
     _io_stats.aio_read_bytes += len;
-    return ioq->queue_request(pc, len, std::move(req));
+    return ioq->queue_request(pc, len, std::move(req), intent);
 }
 
 future<size_t>
-reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {
+reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req, io_intent* intent) noexcept {
     ++_io_stats.aio_writes;
     _io_stats.aio_write_bytes += len;
-    return ioq->queue_request(pc, len, std::move(req));
+    return ioq->queue_request(pc, len, std::move(req), intent);
 }
 
 namespace internal {
@@ -1586,10 +1725,11 @@ future<file>
 reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_options options) noexcept {
     return do_with(static_cast<int>(flags), std::move(options), [this, nameref] (auto& open_flags, file_open_options& options) {
         sstring name(nameref);
-        return _thread_pool->submit<syscall_result<int>>([name, &open_flags, &options, strict_o_direct = _strict_o_direct, bypass_fsync = _bypass_fsync] () mutable {
-            // We want O_DIRECT, except in two cases:
+        return _thread_pool->submit<syscall_result<int>>([this, name, &open_flags, &options, strict_o_direct = _strict_o_direct, bypass_fsync = _bypass_fsync] () mutable {
+            // We want O_DIRECT, except in three cases:
             //   - tmpfs (which doesn't support it, but works fine anyway)
             //   - strict_o_direct == false (where we forgive it being not supported)
+            //   - _kernel_page_cache == true (where we disable it for short-lived test processes)
             // Because open() with O_DIRECT will fail, we open it without O_DIRECT, try
             // to update it to O_DIRECT with fcntl(), and if that fails, see if we
             // can forgive it.
@@ -1610,7 +1750,8 @@ reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_opt
             if (fd == -1) {
                 return wrap_syscall<int>(fd);
             }
-            int r = ::fcntl(fd, F_SETFL, open_flags | O_DIRECT);
+            int o_direct_flag = _kernel_page_cache ? 0 : O_DIRECT;
+            int r = ::fcntl(fd, F_SETFL, open_flags | o_direct_flag);
             auto maybe_ret = wrap_syscall<int>(r);  // capture errno (should be EINVAL)
             if (r == -1  && strict_o_direct && !is_tmpfs(fd)) {
                 ::close(fd);
@@ -1618,9 +1759,14 @@ reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_opt
             }
             if (fd != -1) {
                 fsxattr attr = {};
-                if (options.extent_allocation_size_hint) {
+                // xfs delayed allocation is disabled when extent size hints are present.
+                // This causes tons of xfs log fsyncs. Given that extent size hints are
+                // unneeded when delayed allocation is available (which is the case
+                // when not using O_DIRECT), disable them.
+                if (options.extent_allocation_size_hint && !_kernel_page_cache) {
                     attr.fsx_xflags |= XFS_XFLAG_EXTSIZE;
-                    attr.fsx_extsize = options.extent_allocation_size_hint;
+                    attr.fsx_extsize = std::min(options.extent_allocation_size_hint,
+                                        file_open_options::max_extent_allocation_size_hint);
                 }
                 // Ignore error; may be !xfs, and just a hint anyway
                 ::ioctl(fd, XFS_IOC_FSSETXATTR, &attr);
@@ -1971,7 +2117,7 @@ reactor::fdatasync(int fd) noexcept {
             auto desc = new fsync_io_desc;
             auto fut = desc->get_future();
             auto req = io_request::make_fdatasync(fd);
-            submit_io(desc, std::move(req));
+            _io_sink.submit(desc, std::move(req));
             return fut;
         });
     }
@@ -2072,7 +2218,7 @@ future<> reactor::run_exit_tasks() {
 
 void reactor::stop() {
     assert(_id == 0);
-    smp::cleanup_cpu();
+    _smp->cleanup_cpu();
     if (!_stopping) {
         // Run exit tasks locally and then stop all other engines
         // in the background and wait on semaphore for all to complete.
@@ -2081,7 +2227,7 @@ void reactor::stop() {
             return do_with(semaphore(0), [this] (semaphore& sem) {
                 // Stop other cpus asynchronously, signal when done.
                 (void)smp::invoke_on_others(0, [] {
-                    smp::cleanup_cpu();
+                    engine()._smp->cleanup_cpu();
                     return engine().run_exit_tasks().then([] {
                         engine()._stopped = true;
                     });
@@ -2153,9 +2299,9 @@ void reactor::register_metrics() {
             sm::make_derive("free_operations", [] { return memory::stats().frees(); }, sm::description("Total number of free operations")),
             sm::make_derive("cross_cpu_free_operations", [] { return memory::stats().cross_cpu_frees(); }, sm::description("Total number of cross cpu free")),
             sm::make_gauge("malloc_live_objects", [] { return memory::stats().live_objects(); }, sm::description("Number of live objects")),
-            sm::make_current_bytes("free_memory", [] { return memory::stats().free_memory(); }, sm::description("Free memeory size in bytes")),
-            sm::make_current_bytes("total_memory", [] { return memory::stats().total_memory(); }, sm::description("Total memeory size in bytes")),
-            sm::make_current_bytes("allocated_memory", [] { return memory::stats().allocated_memory(); }, sm::description("Allocated memeory size in bytes")),
+            sm::make_current_bytes("free_memory", [] { return memory::stats().free_memory(); }, sm::description("Free memory size in bytes")),
+            sm::make_current_bytes("total_memory", [] { return memory::stats().total_memory(); }, sm::description("Total memory size in bytes")),
+            sm::make_current_bytes("allocated_memory", [] { return memory::stats().allocated_memory(); }, sm::description("Allocated memory size in bytes")),
             sm::make_derive("reclaims_operations", [] { return memory::stats().reclaims(); }, sm::description("Total reclaims operations"))
     });
 
@@ -2363,13 +2509,37 @@ public:
     }
 };
 
-class reactor::io_queue_submission_pollfn final : public simple_pollfn<true> {
+class reactor::io_queue_submission_pollfn final : public reactor::pollfn {
     reactor& _r;
+    // Wake-up the reactor with highres timer when the io-queue
+    // decides to delay dispatching until some time point in
+    // the future
+    timer<> _nearest_wakeup { [this] { _armed = false; } };
+    bool _armed = false;
 public:
     io_queue_submission_pollfn(reactor& r) : _r(r) {}
     virtual bool poll() final override {
         return _r.flush_pending_aio();
     }
+    virtual bool pure_poll() override final {
+        return poll();
+    }
+    virtual bool try_enter_interrupt_mode() override {
+        auto next = _r.next_pending_aio();
+        auto now = steady_clock_type::now();
+        if (next <= now) {
+            return false;
+        }
+        _nearest_wakeup.arm(next);
+        _armed = true;
+        return true;
+    }
+    virtual void exit_interrupt_mode() override final {
+        if (_armed) {
+            _nearest_wakeup.cancel();
+            _armed = false;
+        }
+    }
 };
 
 // Other cpus can queue items for us to free; and they won't notify
@@ -2429,11 +2599,11 @@ public:
     smp_pollfn(reactor& r) : _r(r) {}
     virtual bool poll() final override {
         return (smp::poll_queues() |
-                alien::smp::poll_queues());
+                _r._alien.poll_queues());
     }
     virtual bool pure_poll() final override {
         return (smp::pure_poll_queues() ||
-                alien::smp::pure_poll_queues());
+                _r._alien.pure_poll_queues());
     }
     virtual bool try_enter_interrupt_mode() override {
         // systemwide_memory_barrier() is very slow if run concurrently,
@@ -2580,7 +2750,7 @@ reactor::run_some_tasks() {
     sched_print("run_some_tasks: start");
     reset_preemption_monitor();
 
-    sched_clock::time_point t_run_completed = std::chrono::steady_clock::now();
+    sched_clock::time_point t_run_completed = now();
     STAP_PROBE(seastar, reactor_run_tasks_start);
     _cpu_stall_detector->start_task_run(t_run_completed);
     do {
@@ -2592,7 +2762,7 @@ reactor::run_some_tasks() {
         _last_vruntime = std::max(tq->_vruntime, _last_vruntime);
         run_tasks(*tq);
         tq->_current = false;
-        t_run_completed = std::chrono::steady_clock::now();
+        t_run_completed = now();
         auto delta = t_run_completed - t_run_started;
         account_runtime(*tq, delta);
         sched_print("run complete ({} {}); time consumed {} usec; final vruntime {} empty {}",
@@ -2626,7 +2796,7 @@ reactor::activate(task_queue& tq) {
         sched_print("tq {} {} losing vruntime {} due to sleep", (void*)&tq, tq._name, _last_vruntime - tq._vruntime);
     }
     tq._vruntime = std::max(_last_vruntime, tq._vruntime);
-    auto now = std::chrono::steady_clock::now();
+    auto now = reactor::now();
     tq._waittime += now - tq._ts;
     tq._ts = now;
     _activating_task_queues.push_back(&tq);
@@ -2640,7 +2810,17 @@ void reactor::service_highres_timer() noexcept {
     });
 }
 
-int reactor::run() {
+int reactor::run() noexcept {
+    try {
+        return do_run();
+    } catch (const std::exception& e) {
+        seastar_logger.error(e.what());
+        print_with_backtrace("exception running reactor main loop");
+        _exit(1);
+    }
+}
+
+int reactor::do_run() {
 #ifndef SEASTAR_ASAN_ENABLED
     // SIGSTKSZ is too small when using asan. We also don't need to
     // handle SIGSEGV ourselves when using asan, so just don't install
@@ -2713,7 +2893,7 @@ int reactor::run() {
     using namespace std::chrono_literals;
     timer<lowres_clock> load_timer;
     auto last_idle = _total_idle;
-    auto idle_start = sched_clock::now(), idle_end = idle_start;
+    auto idle_start = now(), idle_end = idle_start;
     load_timer.set_callback([this, &last_idle, &idle_start, &idle_end] () mutable {
         _total_idle += idle_end - idle_start;
         auto load = double((_total_idle - last_idle).count()) / double(std::chrono::duration_cast<sched_clock::duration>(1s).count());
@@ -2760,9 +2940,9 @@ int reactor::run() {
                 run_tasks(*_at_destroy_tasks);
             }
             _finished_running_tasks = true;
-            smp::arrive_at_event_loop_end();
+            _smp->arrive_at_event_loop_end();
             if (_id == 0) {
-                smp::join_all();
+                _smp->join_all();
             }
             break;
         }
@@ -2777,7 +2957,7 @@ int reactor::run() {
                 idle = false;
             }
         } else {
-            idle_end = sched_clock::now();
+            idle_end = now();
             if (!idle) {
                 idle_start = idle_end;
                 idle = true;
@@ -2798,12 +2978,12 @@ int reactor::run() {
                     // Turn off the task quota timer to avoid spurious wakeups
                     struct itimerspec zero_itimerspec = {};
                     _task_quota_timer.timerfd_settime(0, zero_itimerspec);
-                    auto start_sleep = sched_clock::now();
+                    auto start_sleep = now();
                     _cpu_stall_detector->start_sleep();
                     sleep();
                     _cpu_stall_detector->end_sleep();
                     // We may have slept for a while, so freshen idle_end
-                    idle_end = sched_clock::now();
+                    idle_end = now();
                     _total_sleep += idle_end - start_sleep;
                     _task_quota_timer.timerfd_settime(0, task_quote_itimerspec);
                 }
@@ -2818,7 +2998,7 @@ int reactor::run() {
     // This is needed because the reactor is destroyed from the thread_local destructors. If
     // the I/O queue happens to use any other infrastructure that is also kept this way (for
     // instance, collectd), we will not have any way to guarantee who is destroyed first.
-    my_io_queues.clear();
+    _io_queues.clear();
     return _return;
 }
 
@@ -3238,135 +3418,122 @@ bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) {
 
 namespace seastar {
 
-void network_stack_registry::register_stack(sstring name,
-        boost::program_options::options_description opts,
-        noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)> create, bool make_default) {
-    if (_map().count(name)) {
-        return;
-    }
-    _map()[name] = std::move(create);
-    options_description().add(opts);
-    if (make_default) {
-        _default() = name;
-    }
+static bool kernel_supports_aio_fsync() {
+    return kernel_uname().whitelisted({"4.18"});
 }
 
-void register_network_stack(sstring name, boost::program_options::options_description opts,
-    noncopyable_function<future<std::unique_ptr<network_stack>>(boost::program_options::variables_map)>
-        create,
-    bool make_default) {
-    return network_stack_registry::register_stack(
-        std::move(name), std::move(opts), std::move(create), make_default);
-}
+static program_options::selection_value<network_stack_factory> create_network_stacks_option(reactor_options& zis) {
+    using value_type = program_options::selection_value<network_stack_factory>;
+    value_type::candidates candidates;
+    std::vector<std::string> net_stack_names;
 
-sstring network_stack_registry::default_stack() {
-    return _default();
-}
+    auto deleter = [] (network_stack_factory* p) { delete p; };
 
-std::vector<sstring> network_stack_registry::list() {
-    std::vector<sstring> ret;
-    for (auto&& ns : _map()) {
-        ret.push_back(ns.first);
+    std::string default_stack;
+    for (auto reg_func : {register_native_stack, register_posix_stack}) {
+        auto s = reg_func();
+        if (s.is_default) {
+            default_stack = s.name;
+        }
+        candidates.push_back({s.name, {new network_stack_factory(std::move(s.factory)), deleter}, std::move(s.opts)});
+        net_stack_names.emplace_back(s.name);
     }
-    return ret;
-}
 
-future<std::unique_ptr<network_stack>>
-network_stack_registry::create(options opts) {
-    return create(_default(), opts);
+    return program_options::selection_value<network_stack_factory>(zis, "network-stack", std::move(candidates), default_stack,
+            format("select network stack (valid values: {})", format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")));
 }
 
-future<std::unique_ptr<network_stack>>
-network_stack_registry::create(sstring name, options opts) {
-    if (!_map().count(name)) {
-        throw std::runtime_error(format("network stack {} not registered", name));
-    }
-    return _map()[name](opts);
-}
+static program_options::selection_value<reactor_backend_selector>::candidates backend_selector_candidates() {
+    using value_type = program_options::selection_value<reactor_backend_selector>;
+    value_type::candidates candidates;
 
-static bool kernel_supports_aio_fsync() {
-    return kernel_uname().whitelisted({"4.18"});
+    auto deleter = [] (reactor_backend_selector* p) { delete p; };
+
+    for (auto&& be : reactor_backend_selector::available()) {
+        auto name = be.name();
+        candidates.push_back({std::move(name), {new reactor_backend_selector(std::move(be)), deleter}, {}});
+    }
+    return candidates;
 }
 
-boost::program_options::options_description
-reactor::get_options_description(reactor_config cfg) {
-    namespace bpo = boost::program_options;
-    bpo::options_description opts("Core options");
-    auto net_stack_names = network_stack_registry::list();
-    opts.add_options()
-        ("network-stack", bpo::value<std::string>(),
-                format("select network stack (valid values: {})",
-                        format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")).c_str())
-        ("poll-mode", "poll continuously (100% cpu use)")
-        ("idle-poll-time-us", bpo::value<unsigned>()->default_value(calculate_poll_time() / 1us),
+reactor_options::reactor_options(program_options::option_group* parent_group)
+    : program_options::option_group(parent_group, "Core options")
+    , network_stack(create_network_stacks_option(*this))
+    , poll_mode(*this, "poll-mode", "poll continuously (100% cpu use)")
+    , idle_poll_time_us(*this, "idle-poll-time-us", reactor::calculate_poll_time() / 1us,
                 "idle polling time in microseconds (reduce for overprovisioned environments or laptops)")
-        ("poll-aio", bpo::value<bool>()->default_value(true),
+    , poll_aio(*this, "poll-aio", true,
                 "busy-poll for disk I/O (reduces latency and increases throughput)")
-        ("task-quota-ms", bpo::value<double>()->default_value(cfg.task_quota / 1ms), "Max time (ms) between polls")
-        ("max-task-backlog", bpo::value<unsigned>()->default_value(1000), "Maximum number of task backlog to allow; above this we ignore I/O")
-        ("blocked-reactor-notify-ms", bpo::value<unsigned>()->default_value(200), "threshold in miliseconds over which the reactor is considered blocked if no progress is made")
-        ("blocked-reactor-reports-per-minute", bpo::value<unsigned>()->default_value(5), "Maximum number of backtraces reported by stall detector per minute")
-        ("relaxed-dma", "allow using buffered I/O if DMA is not available (reduces performance)")
-        ("linux-aio-nowait",
-                bpo::value<bool>()->default_value(aio_nowait_supported),
+    , task_quota_ms(*this, "task-quota-ms", 500, "Max time (ms) between polls")
+    , max_task_backlog(*this, "max-task-backlog", 1000, "Maximum number of task backlog to allow; above this we ignore I/O")
+    , blocked_reactor_notify_ms(*this, "blocked-reactor-notify-ms", 200, "threshold in miliseconds over which the reactor is considered blocked if no progress is made")
+    , blocked_reactor_reports_per_minute(*this, "blocked-reactor-reports-per-minute", 5, "Maximum number of backtraces reported by stall detector per minute")
+    , blocked_reactor_report_format_oneline(*this, "blocked-reactor-report-format-oneline", true, "Print a simplified backtrace on a single line")
+    , relaxed_dma(*this, "relaxed-dma", "allow using buffered I/O if DMA is not available (reduces performance)")
+    , linux_aio_nowait(*this, "linux-aio-nowait", aio_nowait_supported,
                 "use the Linux NOWAIT AIO feature, which reduces reactor stalls due to aio (autodetected)")
-        ("unsafe-bypass-fsync", bpo::value<bool>()->default_value(false), "Bypass fsync(), may result in data loss. Use for testing on consumer drives")
-        ("overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0")
-        ("abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory")
-        ("force-aio-syscalls", bpo::value<bool>()->default_value(false),
+    , unsafe_bypass_fsync(*this, "unsafe-bypass-fsync", false, "Bypass fsync(), may result in data loss. Use for testing on consumer drives")
+    , kernel_page_cache(*this, "kernel-page-cache", false,
+                "Use the kernel page cache. This disables DMA (O_DIRECT)."
+                " Useful for short-lived functional tests with a small data set.")
+    , overprovisioned(*this, "overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0")
+    , abort_on_seastar_bad_alloc(*this, "abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory")
+    , force_aio_syscalls(*this, "force-aio-syscalls", false,
                 "Force io_getevents(2) to issue a system call, instead of bypassing the kernel when possible."
                 " This makes strace output more useful, but slows down the application")
-        ("dump-memory-diagnostics-on-alloc-failure-kind", bpo::value<std::string>()->default_value("critical"), "Dump diagnostics of the seastar allocator state on allocation failure."
+    , dump_memory_diagnostics_on_alloc_failure_kind(*this, "dump-memory-diagnostics-on-alloc-failure-kind", memory::alloc_failure_kind::critical,
+                "Dump diagnostics of the seastar allocator state on allocation failure."
                  " Accepted values: never, critical (default), always. When set to critical, only allocations marked as critical will trigger diagnostics dump."
                  " The diagnostics will be written to the seastar_memory logger, with error level."
                  " Note that if the seastar_memory logger is set to debug or trace level, the diagnostics will be logged irrespective of this setting.")
-        ("reactor-backend", bpo::value<reactor_backend_selector>()->default_value(reactor_backend_selector::default_backend()),
-                format("Internal reactor implementation ({})", reactor_backend_selector::available()).c_str())
-        ("aio-fsync", bpo::value<bool>()->default_value(kernel_supports_aio_fsync()),
+    , reactor_backend(*this, "reactor-backend", backend_selector_candidates(), reactor_backend_selector::default_backend().name(),
+                format("Internal reactor implementation ({})", reactor_backend_selector::available()))
+    , aio_fsync(*this, "aio-fsync", kernel_supports_aio_fsync(),
                 "Use Linux aio for fsync() calls. This reduces latency; requires Linux 4.18 or later.")
+    , max_networking_io_control_blocks(*this, "max-networking-io-control-blocks", 10000,
+                "Maximum number of I/O control blocks (IOCBs) to allocate per shard. This translates to the number of sockets supported per shard."
+                " Requires tuning /proc/sys/fs/aio-max-nr. Only valid for the linux-aio reactor backend (see --reactor-backend).")
 #ifdef SEASTAR_HEAPPROF
-        ("heapprof", "enable seastar heap profiling")
+    , heapprof(*this, "heapprof", "enable seastar heap profiling")
+#else
+    , heapprof(*this, "heapprof", program_options::unused{})
 #endif
-        ;
-    if (cfg.auto_handle_sigint_sigterm) {
-        opts.add_options()
-                ("no-handle-interrupt", "ignore SIGINT (for gdb)")
-                ;
-    }
-    opts.add(network_stack_registry::options_description());
-    return opts;
+    , no_handle_interrupt(*this, "no-handle-interrupt", "ignore SIGINT (for gdb)")
+{
 }
 
-boost::program_options::options_description
-smp::get_options_description()
-{
-    namespace bpo = boost::program_options;
-    bpo::options_description opts("SMP options");
-    opts.add_options()
-        ("smp,c", bpo::value<unsigned>(), "number of threads (default: one per CPU)")
-        ("cpuset", bpo::value<cpuset_bpo_wrapper>(), "CPUs to use (in cpuset(7) format; default: all))")
-        ("memory,m", bpo::value<std::string>(), "memory to use, in bytes (ex: 4G) (default: all)")
-        ("reserve-memory", bpo::value<std::string>(), "memory reserved to OS (if --memory not specified)")
-        ("hugepages", bpo::value<std::string>(), "path to accessible hugetlbfs mount (typically /dev/hugepages/something)")
-        ("lock-memory", bpo::value<bool>(), "lock all memory (prevents swapping)")
-        ("thread-affinity", bpo::value<bool>()->default_value(true), "pin threads to their cpus (disable for overprovisioning)")
+smp_options::smp_options(program_options::option_group* parent_group)
+    : program_options::option_group(parent_group, "SMP options")
+    , smp(*this, "smp", {}, "number of threads (default: one per CPU)")
+    , cpuset(*this, "cpuset", {}, "CPUs to use (in cpuset(7) format; default: all))")
+    , memory(*this, "memory", std::nullopt, "memory to use, in bytes (ex: 4G) (default: all)")
+    , reserve_memory(*this, "reserve-memory", {}, "memory reserved to OS (if --memory not specified)")
+    , hugepages(*this, "hugepages", {}, "path to accessible hugetlbfs mount (typically /dev/hugepages/something)")
+    , lock_memory(*this, "lock-memory", {}, "lock all memory (prevents swapping)")
+    , thread_affinity(*this, "thread-affinity", true, "pin threads to their cpus (disable for overprovisioning)")
 #ifdef SEASTAR_HAVE_HWLOC
-        ("num-io-queues", bpo::value<unsigned>(), "Number of IO queues. Each IO unit will be responsible for a fraction of the IO requests. Defaults to the number of threads")
-        ("max-io-requests", bpo::value<unsigned>(), "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of IO queues")
+    , num_io_queues(*this, "num-io-queues", {}, "Number of IO queues. Each IO unit will be responsible for a fraction of the IO requests. Defaults to the number of threads")
+    , num_io_groups(*this, "num-io-groups", {}, "Number of IO groups. Each IO group will be responsible for a fraction of the IO requests. Defaults to the number of NUMA nodes")
+    , max_io_requests(*this, "max-io-requests", {}, "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of IO queues")
 #else
-        ("max-io-requests", bpo::value<unsigned>(), "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of processors")
+    , num_io_queues(*this, "num-io-queues", program_options::unused{})
+    , num_io_groups(*this, "num-io-groups", program_options::unused{})
+    , max_io_requests(*this, "max-io-requests", {}, "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of processors")
 #endif
-        ("io-properties-file", bpo::value<std::string>(), "path to a YAML file describing the characteristics of the I/O Subsystem")
-        ("io-properties", bpo::value<std::string>(), "a YAML string describing the characteristics of the I/O Subsystem")
-        ("mbind", bpo::value<bool>()->default_value(true), "enable mbind")
+    , io_properties_file(*this, "io-properties-file", {}, "path to a YAML file describing the characteristics of the I/O Subsystem")
+    , io_properties(*this, "io-properties", {}, "a YAML string describing the characteristics of the I/O Subsystem")
+    , mbind(*this, "mbind", true, "enable mbind")
 #ifndef SEASTAR_NO_EXCEPTION_HACK
-        ("enable-glibc-exception-scaling-workaround", bpo::value<bool>()->default_value(true), "enable workaround for glibc/gcc c++ exception scalablity problem")
+    , enable_glibc_exception_scaling_workaround(*this, "enable-glibc-exception-scaling-workaround", true, "enable workaround for glibc/gcc c++ exception scalablity problem")
+#else
+    , enable_glibc_exception_scaling_workaround(*this, program_options::unused{})
 #endif
 #ifdef SEASTAR_HAVE_HWLOC
-        ("allow-cpus-in-remote-numa-nodes", bpo::value<bool>()->default_value(true), "if some CPUs are found not to have any local NUMA nodes, allow assigning them to remote ones")
+    , allow_cpus_in_remote_numa_nodes(*this, "allow-cpus-in-remote-numa-nodes", true, "if some CPUs are found not to have any local NUMA nodes, allow assigning them to remote ones")
+#else
+    , allow_cpus_in_remote_numa_nodes(*this, "allow-cpus-in-remote-numa-nodes", program_options::unused{})
 #endif
-        ;
-    return opts;
+{
 }
 
 thread_local scollectd::impl scollectd_impl;
@@ -3384,14 +3551,9 @@ struct reactor_deleter {
 
 thread_local std::unique_ptr<reactor, reactor_deleter> reactor_holder;
 
-std::vector<posix_thread> smp::_threads;
-std::vector<std::function<void ()>> smp::_thread_loops;
-std::optional<boost::barrier> smp::_all_event_loops_done;
-std::vector<reactor*> smp::_reactors;
-std::unique_ptr<smp_message_queue*[], smp::qs_deleter> smp::_qs;
-std::thread::id smp::_tmain;
-unsigned smp::count = 1;
-bool smp::_using_dpdk;
+thread_local smp_message_queue** smp::_qs;
+thread_local std::thread::id smp::_tmain;
+unsigned smp::count = 0;
 
 void smp::start_all_queues()
 {
@@ -3400,7 +3562,7 @@ void smp::start_all_queues()
             _qs[c][this_shard_id()].start(c);
         }
     }
-    alien::smp::_qs[this_shard_id()].start();
+    _alien._qs[this_shard_id()].start();
 }
 
 #ifdef SEASTAR_HAVE_DPDK
@@ -3450,11 +3612,11 @@ void smp::allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_co
     assert(r == 0);
     local_engine = reinterpret_cast<reactor*>(buf);
     *internal::this_shard_id_ptr() = id;
-    new (buf) reactor(id, std::move(rbs), cfg);
+    new (buf) reactor(this->shared_from_this(), _alien, id, std::move(rbs), cfg);
     reactor_holder.reset(local_engine);
 }
 
-void smp::cleanup() {
+void smp::cleanup() noexcept {
     smp::_threads = std::vector<posix_thread>();
     _thread_loops.clear();
 }
@@ -3467,8 +3629,8 @@ void smp::cleanup_cpu() {
             _qs[i][cpuid].stop();
         }
     }
-    if (alien::smp::_qs) {
-        alien::smp::_qs[cpuid].stop();
+    if (_alien._qs) {
+        _alien._qs[cpuid].stop();
     }
 }
 
@@ -3525,56 +3687,52 @@ void smp::qs_deleter::operator()(smp_message_queue** qs) const {
 
 class disk_config_params {
 private:
-    unsigned _num_io_queues = 0;
+    unsigned _num_io_groups = 0;
     std::optional<unsigned> _capacity;
     std::unordered_map<dev_t, mountpoint_params> _mountpoints;
     std::chrono::duration<double> _latency_goal;
 
 public:
-    uint64_t per_io_queue(uint64_t qty, dev_t devid) const {
-        const mountpoint_params& p = _mountpoints.at(devid);
-        return std::max(qty / p.num_io_queues, 1ul);
+    uint64_t per_io_group(uint64_t qty, unsigned nr_groups) const noexcept {
+        return std::max(qty / nr_groups, 1ul);
     }
 
-    unsigned num_io_queues(dev_t devid) const {
-        const mountpoint_params& p = _mountpoints.at(devid);
-        return p.num_io_queues;
-    }
+    unsigned num_io_groups() const noexcept { return _num_io_groups; }
 
     std::chrono::duration<double> latency_goal() const {
         return _latency_goal;
     }
 
-    void parse_config(boost::program_options::variables_map& configuration) {
+    void parse_config(const smp_options& smp_opts, const reactor_options& reactor_opts) {
         seastar_logger.debug("smp::count: {}", smp::count);
-        _latency_goal = std::chrono::duration_cast<std::chrono::duration<double>>(configuration["task-quota-ms"].as<double>() * 1.5 * 1ms);
+        _latency_goal = std::chrono::duration_cast<std::chrono::duration<double>>(reactor_opts.task_quota_ms.get_value() * 1.5 * 1ms);
         seastar_logger.debug("latency_goal: {}", latency_goal().count());
 
-        if (configuration.count("max-io-requests")) {
-            _capacity = configuration["max-io-requests"].as<unsigned>();
+        if (smp_opts.max_io_requests) {
+            seastar_logger.warn("the --max-io-requests option is deprecated, switch to io properties file instead");
+            _capacity = smp_opts.max_io_requests.get_value();
         }
 
-        if (configuration.count("num-io-queues")) {
-            _num_io_queues = configuration["num-io-queues"].as<unsigned>();
-            if (!_num_io_queues) {
-                throw std::runtime_error("num-io-queues must be greater than zero");
+        if (smp_opts.num_io_groups) {
+            _num_io_groups = smp_opts.num_io_groups.get_value();
+            if (!_num_io_groups) {
+                throw std::runtime_error("num-io-groups must be greater than zero");
             }
+        } else if (smp_opts.num_io_queues) {
+            seastar_logger.warn("the --num-io-queues option is deprecated, switch to --num-io-groups instead");
         }
-        if (configuration.count("io-properties-file") && configuration.count("io-properties")) {
+        if (smp_opts.io_properties_file && smp_opts.io_properties) {
             throw std::runtime_error("Both io-properties and io-properties-file specified. Don't know which to trust!");
         }
 
         std::optional<YAML::Node> doc;
-        if (configuration.count("io-properties-file")) {
-            doc = YAML::LoadFile(configuration["io-properties-file"].as<std::string>());
-        } else if (configuration.count("io-properties")) {
-            doc = YAML::Load(configuration["io-properties"].as<std::string>());
+        if (smp_opts.io_properties_file) {
+            doc = YAML::LoadFile(smp_opts.io_properties_file.get_value());
+        } else if (smp_opts.io_properties) {
+            doc = YAML::Load(smp_opts.io_properties.get_value());
         }
 
         if (doc) {
-            static constexpr unsigned task_quotas_in_default_latency_goal = 3;
-            unsigned auto_num_io_queues = smp::count;
-
             for (auto&& section : *doc) {
                 auto sec_name = section.first.as<std::string>();
                 if (sec_name != "disks") {
@@ -3599,71 +3757,52 @@ public:
                         throw std::runtime_error(fmt::format("R/W bytes and req rates must not be zero"));
                     }
 
-                    // Ideally we wouldn't have I/O Queues and would dispatch from every shard (https://github.com/scylladb/seastar/issues/485)
-                    // While we don't do that, we'll just be conservative and try to recommend values of I/O Queues that are close to what we
-                    // suggested before the I/O Scheduler rework. The I/O Scheduler has traditionally tried to make sure that each queue would have
-                    // at least 4 requests in depth, and all its requests were 4kB in size. Therefore, try to arrange the I/O Queues so that we would
-                    // end up in the same situation here (that's where the 4 comes from).
-                    //
-                    // For the bandwidth limit, we want that to be 4 * 4096, so each I/O Queue has the same bandwidth as before.
-                    if (!_num_io_queues) {
-                        unsigned dev_io_queues = smp::count;
-                        dev_io_queues = std::min(dev_io_queues, unsigned((task_quotas_in_default_latency_goal * d.write_req_rate * latency_goal().count()) / 4));
-                        dev_io_queues = std::min(dev_io_queues, unsigned((task_quotas_in_default_latency_goal * d.write_bytes_rate * latency_goal().count()) / (4 * 4096)));
-                        dev_io_queues = std::max(dev_io_queues, 1u);
-                        seastar_logger.debug("dev_io_queues: {}", dev_io_queues);
-                        d.num_io_queues = dev_io_queues;
-                        auto_num_io_queues = std::min(auto_num_io_queues, dev_io_queues);
-                    } else {
-                        d.num_io_queues = _num_io_queues;
-                    }
-
                     seastar_logger.debug("dev_id: {} mountpoint: {}", buf.st_dev, d.mountpoint);
                     _mountpoints.emplace(buf.st_dev, d);
                 }
             }
-            if (!_num_io_queues) {
-                _num_io_queues = auto_num_io_queues;
-            }
-        } else if (!_num_io_queues) {
-            _num_io_queues = smp::count;
         }
 
         // Placeholder for unconfigured disks.
         mountpoint_params d = {};
-        d.num_io_queues = _num_io_queues;
-        seastar_logger.debug("num_io_queues: {}", d.num_io_queues);
         _mountpoints.emplace(0, d);
     }
 
-    struct io_queue::config generate_config(dev_t devid) const {
+    struct io_queue::config generate_config(dev_t devid, unsigned nr_groups) const {
         seastar_logger.debug("generate_config dev_id: {}", devid);
         const mountpoint_params& p = _mountpoints.at(devid);
         struct io_queue::config cfg;
-        uint64_t max_bandwidth = std::max(p.read_bytes_rate, p.write_bytes_rate);
-        uint64_t max_iops = std::max(p.read_req_rate, p.write_req_rate);
 
         cfg.devid = devid;
-        cfg.disk_bytes_write_to_read_multiplier = io_queue::read_request_base_count;
-        cfg.disk_req_write_to_read_multiplier = io_queue::read_request_base_count;
 
         if (!_capacity) {
-            if (max_bandwidth != std::numeric_limits<uint64_t>::max()) {
+            if (p.read_bytes_rate != std::numeric_limits<uint64_t>::max()) {
+                cfg.max_bytes_count = io_queue::read_request_base_count * per_io_group(p.read_bytes_rate * latency_goal().count(), nr_groups);
                 cfg.disk_bytes_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_bytes_rate) / p.write_bytes_rate;
-                cfg.max_bytes_count = io_queue::read_request_base_count * per_io_queue(max_bandwidth * latency_goal().count(), devid);
+                cfg.disk_us_per_byte = 1000000. / p.read_bytes_rate;
             }
-            if (max_iops != std::numeric_limits<uint64_t>::max()) {
-                cfg.max_req_count = io_queue::read_request_base_count * per_io_queue(max_iops * latency_goal().count(), devid);
+            if (p.read_req_rate != std::numeric_limits<uint64_t>::max()) {
+                cfg.max_req_count = io_queue::read_request_base_count * per_io_group(p.read_req_rate * latency_goal().count(), nr_groups);
                 cfg.disk_req_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_req_rate) / p.write_req_rate;
+                cfg.disk_us_per_request = 1000000. / p.read_req_rate;
+            }
+            if (p.read_saturation_length != std::numeric_limits<uint64_t>::max()) {
+                cfg.disk_read_saturation_length = p.read_saturation_length;
+            }
+            if (p.write_saturation_length != std::numeric_limits<uint64_t>::max()) {
+                cfg.disk_write_saturation_length = p.write_saturation_length;
             }
             cfg.mountpoint = p.mountpoint;
+            cfg.duplex = p.duplex;
         } else {
             // For backwards compatibility
             cfg.capacity = *_capacity;
             // Legacy configuration when only concurrency is specified.
-            cfg.max_req_count = io_queue::read_request_base_count * std::min(*_capacity, reactor::max_aio_per_queue);
+            unsigned max_req_count = std::min(*_capacity, reactor::max_aio_per_queue);
+            cfg.max_req_count = io_queue::read_request_base_count * max_req_count;
             // specify size in terms of 16kB IOPS.
-            cfg.max_bytes_count = io_queue::read_request_base_count * (cfg.max_req_count << 14);
+            static_assert(reactor::max_aio_per_queue << 14 <= std::numeric_limits<decltype(cfg.max_bytes_count)>::max() / io_queue::read_request_base_count);
+            cfg.max_bytes_count = io_queue::read_request_base_count * (max_req_count << 14);
         }
         return cfg;
     }
@@ -3673,15 +3812,36 @@ public:
     }
 };
 
-void smp::register_network_stacks() {
-    register_posix_stack();
-    register_native_stack();
+unsigned smp::adjust_max_networking_aio_io_control_blocks(unsigned network_iocbs)
+{
+    static unsigned constexpr storage_iocbs = reactor::max_aio;
+    static unsigned constexpr preempt_iocbs = 2;
+
+    auto aio_max_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-max-nr");
+    auto aio_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-nr");
+    auto available_aio = aio_max_nr - aio_nr;
+    auto requested_aio_network = network_iocbs * smp::count;
+    auto requested_aio_other = (storage_iocbs + preempt_iocbs) * smp::count;
+    auto requested_aio = requested_aio_network + requested_aio_other;
+    auto network_iocbs_old = network_iocbs;
+
+    if (available_aio < requested_aio) {
+        seastar_logger.warn("Requested AIO slots too large, please increase request capacity in /proc/sys/fs/aio-max-nr. available:{} requested:{}", available_aio, requested_aio);
+        if (available_aio >= requested_aio_other + smp::count) { // at least one queue for each shard
+            network_iocbs = (available_aio - requested_aio_other) / smp::count;
+            seastar_logger.warn("max-networking-io-control-blocks adjusted from {} to {}, since AIO slots are unavailable", network_iocbs_old, network_iocbs);
+        } else {
+            throw std::runtime_error("Could not setup Async I/O: Not enough request capacity in /proc/sys/fs/aio-max-nr. Try increasing that number or reducing the amount of logical CPUs available for your application");
+        }
+    }
+
+    return network_iocbs;
 }
 
-void smp::configure(boost::program_options::variables_map configuration, reactor_config reactor_cfg)
+void smp::configure(const smp_options& smp_opts, const reactor_options& reactor_opts)
 {
 #ifndef SEASTAR_NO_EXCEPTION_HACK
-    if (configuration["enable-glibc-exception-scaling-workaround"].as<bool>()) {
+    if (smp_opts.enable_glibc_exception_scaling_workaround.get_value()) {
         init_phdr_cache();
     }
 #endif
@@ -3697,7 +3857,7 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
             SIGALRM, SIGCONT, SIGSTOP, SIGTSTP, SIGTTIN, SIGTTOU}) {
         sigdelset(&sigs, sig);
     }
-    if (!reactor_cfg.auto_handle_sigint_sigterm) {
+    if (!reactor_opts._auto_handle_sigint_sigterm) {
         sigdelset(&sigs, SIGINT);
         sigdelset(&sigs, SIGTERM);
     }
@@ -3712,32 +3872,35 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
     install_oneshot_signal_handler<SIGABRT, sigabrt_action>();
 
 #ifdef SEASTAR_HAVE_DPDK
-    _using_dpdk = configuration.count("dpdk-pmd");
+    const auto* native_stack = dynamic_cast<const net::native_stack_options*>(reactor_opts.network_stack.get_selected_candidate_opts());
+    _using_dpdk = native_stack && native_stack->dpdk_pmd;
 #endif
-    auto thread_affinity = configuration["thread-affinity"].as<bool>();
-    if (configuration.count("overprovisioned")
-           && configuration["thread-affinity"].defaulted()) {
+    auto thread_affinity = smp_opts.thread_affinity.get_value();
+    if (reactor_opts.overprovisioned
+           && smp_opts.thread_affinity.defaulted()) {
         thread_affinity = false;
     }
     if (!thread_affinity && _using_dpdk) {
         fmt::print("warning: --thread-affinity 0 ignored in dpdk mode\n");
     }
-    auto mbind = configuration["mbind"].as<bool>();
+    auto mbind = smp_opts.mbind.get_value();
     if (!thread_affinity) {
         mbind = false;
     }
 
+    resource::configuration rc;
+
     smp::count = 1;
     smp::_tmain = std::this_thread::get_id();
-    auto nr_cpus = resource::nr_processing_units();
+    auto nr_cpus = resource::nr_processing_units(rc);
     resource::cpuset cpu_set;
     auto cgroup_cpu_set = cgroup::cpu_set();
 
     std::copy(boost::counting_iterator<unsigned>(0), boost::counting_iterator<unsigned>(nr_cpus),
             std::inserter(cpu_set, cpu_set.end()));
 
-    if (configuration.count("cpuset")) {
-        cpu_set = configuration["cpuset"].as<cpuset_bpo_wrapper>().value;
+    if (smp_opts.cpuset) {
+        cpu_set = smp_opts.cpuset.get_value();
         if (cgroup_cpu_set && *cgroup_cpu_set != cpu_set) {
             // CPUs that are not available are those pinned by
             // --cpuset but not by cgroups, if mounted.
@@ -3759,25 +3922,24 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
         cpu_set = *cgroup_cpu_set;
     }
 
-    if (configuration.count("smp")) {
-        nr_cpus = configuration["smp"].as<unsigned>();
+    if (smp_opts.smp) {
+        nr_cpus = smp_opts.smp.get_value();
     } else {
         nr_cpus = cpu_set.size();
     }
     smp::count = nr_cpus;
-    _reactors.resize(nr_cpus);
-    resource::configuration rc;
-    if (configuration.count("memory")) {
-        rc.total_memory = parse_memory_size(configuration["memory"].as<std::string>());
+    std::vector<reactor*> reactors(nr_cpus);
+    if (smp_opts.memory) {
+        rc.total_memory = parse_memory_size(smp_opts.memory.get_value());
 #ifdef SEASTAR_HAVE_DPDK
-        if (configuration.count("hugepages") &&
-            !configuration["network-stack"].as<std::string>().compare("native") &&
+        if (smp_opts.hugepages &&
+            !reactor_opts.network_stack.get_selected_candidate_name().compare("native") &&
             _using_dpdk) {
             size_t dpdk_memory = dpdk::eal::mem_size(smp::count);
 
             if (dpdk_memory >= rc.total_memory) {
                 std::cerr<<"Can't run with the given amount of memory: ";
-                std::cerr<<configuration["memory"].as<std::string>();
+                std::cerr<<smp_opts.memory.get_value();
                 std::cerr<<". Consider giving more."<<std::endl;
                 exit(1);
             }
@@ -3790,16 +3952,16 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
         }
 #endif
     }
-    if (configuration.count("reserve-memory")) {
-        rc.reserve_memory = parse_memory_size(configuration["reserve-memory"].as<std::string>());
+    if (smp_opts.reserve_memory) {
+        rc.reserve_memory = parse_memory_size(smp_opts.reserve_memory.get_value());
     }
     std::optional<std::string> hugepages_path;
-    if (configuration.count("hugepages")) {
-        hugepages_path = configuration["hugepages"].as<std::string>();
+    if (smp_opts.hugepages) {
+        hugepages_path = smp_opts.hugepages.get_value();
     }
     auto mlock = false;
-    if (configuration.count("lock-memory")) {
-        mlock = configuration["lock-memory"].as<bool>();
+    if (smp_opts.lock_memory) {
+        mlock = smp_opts.lock_memory.get_value();
     }
     if (mlock) {
         auto extra_flags = 0;
@@ -3823,13 +3985,14 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
     rc.cpu_set = std::move(cpu_set);
 
     disk_config_params disk_config;
-    disk_config.parse_config(configuration);
+    disk_config.parse_config(smp_opts, reactor_opts);
     for (auto& id : disk_config.device_ids()) {
-        rc.num_io_queues.emplace(id, disk_config.num_io_queues(id));
+        rc.devices.push_back(id);
     }
+    rc.num_io_groups = disk_config.num_io_groups();
 
 #ifdef SEASTAR_HAVE_HWLOC
-    if (configuration["allow-cpus-in-remote-numa-nodes"].as<bool>()) {
+    if (smp_opts.allow_cpus_in_remote_numa_nodes.get_value()) {
         rc.assign_orphan_cpus = true;
     }
 #endif
@@ -3841,77 +4004,99 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
     }
     memory::configure(allocations[0].mem, mbind, hugepages_path);
 
-    if (configuration.count("abort-on-seastar-bad-alloc")) {
+    if (reactor_opts.abort_on_seastar_bad_alloc) {
         memory::enable_abort_on_allocation_failure();
     }
 
-    if (configuration.count("dump-memory-diagnostics-on-alloc-failure-kind")) {
-        memory::set_dump_memory_diagnostics_on_alloc_failure_kind(configuration["dump-memory-diagnostics-on-alloc-failure-kind"].as<std::string>());
+    if (reactor_opts.dump_memory_diagnostics_on_alloc_failure_kind) {
+        memory::set_dump_memory_diagnostics_on_alloc_failure_kind(reactor_opts.dump_memory_diagnostics_on_alloc_failure_kind.get_value());
     }
 
-    bool heapprof_enabled = configuration.count("heapprof");
+    reactor_config reactor_cfg;
+    reactor_cfg.auto_handle_sigint_sigterm = reactor_opts._auto_handle_sigint_sigterm;
+    reactor_cfg.max_networking_aio_io_control_blocks = adjust_max_networking_aio_io_control_blocks(reactor_opts.max_networking_io_control_blocks.get_value());
+
+#ifdef SEASTAR_HEAPPROF
+    bool heapprof_enabled = reactor_opts.heapprof;
     if (heapprof_enabled) {
         memory::set_heap_profiling_enabled(heapprof_enabled);
     }
+#else
+    bool heapprof_enabled = false;
+#endif
 
 #ifdef SEASTAR_HAVE_DPDK
-    if (smp::_using_dpdk) {
+    if (_using_dpdk) {
         dpdk::eal::cpuset cpus;
         for (auto&& a : allocations) {
             cpus[a.cpu_id] = true;
         }
-        dpdk::eal::init(cpus, configuration);
+        dpdk::eal::init(cpus, reactor_opts._argv0, hugepages_path, native_stack ? bool(native_stack->dpdk_pmd) : false);
     }
 #endif
 
     // Better to put it into the smp class, but at smp construction time
     // correct smp::count is not known.
-    static boost::barrier reactors_registered(smp::count);
-    static boost::barrier smp_queues_constructed(smp::count);
-    static boost::barrier inited(smp::count);
+    boost::barrier reactors_registered(smp::count);
+    boost::barrier smp_queues_constructed(smp::count);
+    // We use shared_ptr since this thread can exit while other threads are still unlocking
+    auto inited = std::make_shared<boost::barrier>(smp::count);
 
     auto ioq_topology = std::move(resources.ioq_topology);
 
-    std::unordered_map<dev_t, std::vector<io_queue*>> all_io_queues;
+    // ATTN: The ioq_topology value is referenced by below lambdas which are
+    // then copied to other shard's threads, so each shard has a copy of the
+    // ioq_topology on stack, but (!) still references and uses the value
+    // from shard-0. This access is race-free because
+    //  1. The .shard_to_group is not modified
+    //  2. The .queues is pre-resize()-d in advance, so the vector itself
+    //     doesn't change; existing slots are accessed by owning shards only
+    //     without interference
+    //  3. The .groups manipulations are guarded by the .lock lock (but it's
+    //     also pre-resize()-d in advance)
+
+    auto alloc_io_queues = [&ioq_topology, &disk_config] (shard_id shard) {
+        for (auto& topo : ioq_topology) {
+            auto& io_info = topo.second;
+            auto group_idx = io_info.shard_to_group[shard];
+            std::shared_ptr<io_group> group;
+
+            {
+                std::lock_guard _(io_info.lock);
+                auto& iog = io_info.groups[group_idx];
+                if (!iog) {
+                    struct io_queue::config qcfg = disk_config.generate_config(topo.first, io_info.groups.size());
+                    iog = std::make_shared<io_group>(std::move(qcfg));
+                    seastar_logger.debug("allocate {} IO group", group_idx);
+                }
+                group = iog;
+            }
 
-    for (auto& id : disk_config.device_ids()) {
-        auto io_info = ioq_topology.at(id);
-        all_io_queues.emplace(id, io_info.nr_coordinators);
-    }
-
-    auto alloc_io_queue = [&ioq_topology, &all_io_queues, &disk_config] (unsigned shard, dev_t id) {
-        auto io_info = ioq_topology.at(id);
-        auto cid = io_info.shard_to_coordinator[shard];
-        auto vec_idx = io_info.coordinator_to_idx[cid];
-        assert(io_info.coordinator_to_idx_valid[cid]);
-        if (shard == cid) {
-            struct io_queue::config cfg = disk_config.generate_config(id);
-            cfg.coordinator = cid;
-            assert(vec_idx < all_io_queues[id].size());
-            assert(!all_io_queues[id][vec_idx]);
-            all_io_queues[id][vec_idx] = new io_queue(std::move(cfg));
+            io_info.queues[shard] = std::make_unique<io_queue>(std::move(group), engine()._io_sink);
+            seastar_logger.debug("attached {} queue to {} IO group", shard, group_idx);
         }
     };
 
-    auto assign_io_queue = [&ioq_topology, &all_io_queues] (shard_id shard_id, dev_t dev_id) {
-        auto io_info = ioq_topology.at(dev_id);
-        auto cid = io_info.shard_to_coordinator[shard_id];
-        auto queue_idx = io_info.coordinator_to_idx[cid];
-        if (all_io_queues[dev_id][queue_idx]->coordinator() == shard_id) {
-            engine().my_io_queues.emplace_back(all_io_queues[dev_id][queue_idx]);
+    auto assign_io_queues = [&ioq_topology] (shard_id shard) {
+        for (auto& topo : ioq_topology) {
+            auto queue = std::move(topo.second.queues[shard]);
+            assert(queue);
+            engine()._io_queues.emplace(topo.first, std::move(queue));
         }
-        engine()._io_queues.emplace(dev_id, all_io_queues[dev_id][queue_idx]);
     };
 
     _all_event_loops_done.emplace(smp::count);
 
-    auto backend_selector = configuration["reactor-backend"].as<reactor_backend_selector>();
+    auto backend_selector = reactor_opts.reactor_backend.get_selected_candidate();
 
     unsigned i;
+    auto smp_tmain = smp::_tmain;
     for (i = 1; i < smp::count; i++) {
         auto allocation = allocations[i];
-        create_thread([configuration, &disk_config, hugepages_path, i, allocation, assign_io_queue, alloc_io_queue, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] {
+        create_thread([this, smp_tmain, inited, &reactors_registered, &smp_queues_constructed, &reactor_opts, &reactors, hugepages_path, i, allocation, assign_io_queues, alloc_io_queues, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] {
           try {
+            // initialize thread_locals that are equal across all reacto threads of this smp instance
+            smp::_tmain = smp_tmain;
             auto thread_name = seastar::format("reactor-{}", i);
             pthread_setname_np(pthread_self(), thread_name.c_str());
             if (thread_affinity) {
@@ -3930,19 +4115,17 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
             throw_pthread_error(r);
             init_default_smp_service_group(i);
             allocate_reactor(i, backend_selector, reactor_cfg);
-            _reactors[i] = &engine();
-            for (auto& dev_id : disk_config.device_ids()) {
-                alloc_io_queue(i, dev_id);
-            }
+            reactors[i] = &engine();
+            alloc_io_queues(i);
             reactors_registered.wait();
             smp_queues_constructed.wait();
+            // _qs_owner is only initialized here
+            _qs = _qs_owner.get();
             start_all_queues();
-            for (auto& dev_id : disk_config.device_ids()) {
-                assign_io_queue(i, dev_id);
-            }
-            inited.wait();
-            engine().configure(configuration);
-            engine().run();
+            assign_io_queues(i);
+            inited->wait();
+            engine().configure(reactor_opts);
+            engine().do_run();
           } catch (const std::exception& e) {
               seastar_logger.error(e.what());
               _exit(1);
@@ -3958,10 +4141,8 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
         _exit(1);
     }
 
-    _reactors[0] = &engine();
-    for (auto& dev_id : disk_config.device_ids()) {
-        alloc_io_queue(0, dev_id);
-    }
+    reactors[0] = &engine();
+    alloc_io_queues(0);
 
 #ifdef SEASTAR_HAVE_DPDK
     if (_using_dpdk) {
@@ -3973,22 +4154,21 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
 #endif
 
     reactors_registered.wait();
-    smp::_qs = decltype(smp::_qs){new smp_message_queue* [smp::count], qs_deleter{}};
+    _qs_owner = decltype(smp::_qs_owner){new smp_message_queue* [smp::count], qs_deleter{}};
+    _qs = _qs_owner.get();
     for(unsigned i = 0; i < smp::count; i++) {
-        smp::_qs[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
+        smp::_qs_owner[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
         for (unsigned j = 0; j < smp::count; ++j) {
-            new (&smp::_qs[i][j]) smp_message_queue(_reactors[j], _reactors[i]);
+            new (&smp::_qs_owner[i][j]) smp_message_queue(reactors[j], reactors[i]);
         }
     }
-    alien::smp::_qs = alien::smp::create_qs(_reactors);
+    _alien._qs = alien::instance::create_qs(reactors);
     smp_queues_constructed.wait();
     start_all_queues();
-    for (auto& dev_id : disk_config.device_ids()) {
-        assign_io_queue(0, dev_id);
-    }
-    inited.wait();
+    assign_io_queues(0);
+    inited->wait();
 
-    engine().configure(configuration);
+    engine().configure(reactor_opts);
     // The raw `new` is necessary because of the private constructor of `lowres_clock_impl`.
     engine()._lowres_clock_impl = std::unique_ptr<lowres_clock_impl>(new lowres_clock_impl);
 }
@@ -4024,9 +4204,6 @@ bool smp::pure_poll_queues() {
     return false;
 }
 
-internal::preemption_monitor bootstrap_preemption_monitor{};
-__thread const internal::preemption_monitor* g_need_preempt = &bootstrap_preemption_monitor;
-
 __thread reactor* local_engine;
 
 void report_exception(std::string_view message, std::exception_ptr eptr) noexcept {
@@ -4144,12 +4321,12 @@ void add_to_flush_poller(output_stream<char>* os) {
     engine()._flush_batching.emplace_back(os);
 }
 
-reactor::sched_clock::duration reactor::total_idle_time() {
+steady_clock_type::duration reactor::total_idle_time() {
     return _total_idle;
 }
 
-reactor::sched_clock::duration reactor::total_busy_time() {
-    return sched_clock::now() - _start_time - _total_idle;
+steady_clock_type::duration reactor::total_busy_time() {
+    return now() - _start_time - _total_idle;
 }
 
 std::chrono::nanoseconds reactor::total_steal_time() {
@@ -4166,7 +4343,7 @@ std::chrono::nanoseconds reactor::total_steal_time() {
     // steal time but we have no ways to account it.
     //
     // But what we have here should be good enough and at least has a well defined meaning.
-    return std::chrono::duration_cast<std::chrono::nanoseconds>(sched_clock::now() - _start_time - _total_sleep) -
+    return std::chrono::duration_cast<std::chrono::nanoseconds>(now() - _start_time - _total_sleep) -
            std::chrono::duration_cast<std::chrono::nanoseconds>(thread_cputime_clock::now().time_since_epoch());
 }
 
@@ -4317,7 +4494,7 @@ scheduling_group_key_create(scheduling_group_key_config cfg) noexcept {
 
 future<>
 rename_priority_class(io_priority_class pc, sstring new_name) {
-    return reactor::rename_priority_class(pc, new_name);
+    return pc.rename(std::move(new_name));
 }
 
 future<>
@@ -4348,35 +4525,35 @@ rename_scheduling_group(scheduling_group sg, sstring new_name) noexcept {
 namespace internal {
 
 inline
-std::chrono::steady_clock::duration
+sched_clock::duration
 timeval_to_duration(::timeval tv) {
     return std::chrono::seconds(tv.tv_sec) + std::chrono::microseconds(tv.tv_usec);
 }
 
 class reactor_stall_sampler : public reactor::pollfn {
-    std::chrono::steady_clock::time_point _run_start;
+    sched_clock::time_point _run_start;
     ::rusage _run_start_rusage;
     uint64_t _kernel_stalls = 0;
-    std::chrono::steady_clock::duration _nonsleep_cpu_time = {};
-    std::chrono::steady_clock::duration _nonsleep_wall_time = {};
+    sched_clock::duration _nonsleep_cpu_time = {};
+    sched_clock::duration _nonsleep_wall_time = {};
 private:
     static ::rusage get_rusage() {
         struct ::rusage ru;
         ::getrusage(RUSAGE_THREAD, &ru);
         return ru;
     }
-    static std::chrono::steady_clock::duration cpu_time(const ::rusage& ru) {
+    static sched_clock::duration cpu_time(const ::rusage& ru) {
         return timeval_to_duration(ru.ru_stime) + timeval_to_duration(ru.ru_utime);
     }
     void mark_run_start() {
-        _run_start = std::chrono::steady_clock::now();
+        _run_start = reactor::now();
         _run_start_rusage = get_rusage();
     }
     void mark_run_end() {
         auto start_nvcsw = _run_start_rusage.ru_nvcsw;
         auto start_cpu_time = cpu_time(_run_start_rusage);
         auto start_time = _run_start;
-        _run_start = std::chrono::steady_clock::now();
+        _run_start = reactor::now();
         _run_start_rusage = get_rusage();
         _kernel_stalls += _run_start_rusage.ru_nvcsw - start_nvcsw;
         _nonsleep_cpu_time += cpu_time(_run_start_rusage) - start_cpu_time;
@@ -4419,12 +4596,17 @@ report_reactor_stalls(noncopyable_function<future<> ()> uut) {
 }
 
 std::ostream& operator<<(std::ostream& os, const stall_report& sr) {
-    auto to_ms = [] (std::chrono::steady_clock::duration d) -> float {
+    auto to_ms = [] (sched_clock::duration d) -> float {
         return std::chrono::duration<float>(d) / 1ms;
     };
     return os << format("{} stalls, {} ms stall time, {} ms run time", sr.kernel_stalls, to_ms(sr.stall_time), to_ms(sr.run_wall_time));
 }
 
+size_t scheduling_group_count() {
+    auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed);
+    return __builtin_popcountl(b);
+}
+
 }
 
 #ifdef SEASTAR_TASK_BACKTRACE