#include <seastar/core/internal/buffer_allocator.hh>
#include <seastar/core/scheduling_specific.hh>
#include <seastar/util/log.hh>
+#include <seastar/util/read_first_line.hh>
#include "core/file-impl.hh"
#include "core/reactor_backend.hh"
#include "core/syscall_result.hh"
#include <dirent.h>
#include <linux/types.h> // for xfs, below
#include <sys/ioctl.h>
+#include <linux/perf_event.h>
#include <xfs/linux.h>
#define min min /* prevent xfs.h from defining min() as a macro */
#include <xfs/xfs.h>
uint64_t write_bytes_rate = std::numeric_limits<uint64_t>::max();
uint64_t read_req_rate = std::numeric_limits<uint64_t>::max();
uint64_t write_req_rate = std::numeric_limits<uint64_t>::max();
- uint64_t num_io_queues = 0; // calculated
+ uint64_t read_saturation_length = std::numeric_limits<uint64_t>::max();
+ uint64_t write_saturation_length = std::numeric_limits<uint64_t>::max();
+ bool duplex = false;
};
}
mp.read_req_rate = parse_memory_size(node["read_iops"].as<std::string>());
mp.write_bytes_rate = parse_memory_size(node["write_bandwidth"].as<std::string>());
mp.write_req_rate = parse_memory_size(node["write_iops"].as<std::string>());
+ if (node["read_saturation_length"]) {
+ mp.read_saturation_length = parse_memory_size(node["read_saturation_length"].as<std::string>());
+ }
+ if (node["write_saturation_length"]) {
+ mp.write_saturation_length = parse_memory_size(node["write_saturation_length"].as<std::string>());
+ }
+ if (node["duplex"]) {
+ mp.duplex = node["duplex"].as<bool>();
+ }
return true;
}
};
// Registers a new I/O priority class with the given name and shares.
// Thin compatibility shim: registration now lives in io_priority_class itself.
io_priority_class
reactor::register_one_priority_class(sstring name, uint32_t shares) {
    return io_priority_class::register_one(std::move(name), shares);
}
// Updates the shares of an existing priority class. Forwards to the class
// object, which is responsible for propagating the change.
future<>
reactor::update_shares_for_class(io_priority_class pc, uint32_t shares) {
    return pc.update_shares(shares);
}
// Renames a priority class. Forwarding shim kept for API compatibility;
// the actual cross-shard rename logic lives in io_priority_class.
future<>
reactor::rename_priority_class(io_priority_class pc, sstring new_name) noexcept {
    return pc.rename(std::move(new_name));
}

// Applies a new share count to the given class on every io_queue owned by
// this shard.
future<> reactor::update_shares_for_queues(io_priority_class pc, uint32_t shares) {
    return parallel_for_each(_io_queues, [pc, shares] (auto& queue) {
        return queue.second->update_shares_for_class(pc, shares);
    });
}

// Renames the given class on every io_queue owned by this shard.
// futurize_invoke converts an exception thrown by rename_priority_class()
// into a failed future instead of letting it escape this noexcept function.
future<> reactor::rename_queues(io_priority_class pc, sstring new_name) noexcept {
    return futurize_invoke([this, pc, new_name = std::move(new_name)] {
        for (auto&& queue : _io_queues) {
            queue.second->rename_priority_class(pc, new_name);
        }
    });
}
namespace internal {
+void set_need_preempt_var(const preemption_monitor* np) {
+ get_need_preempt_var() = np;
+}
+
#ifdef SEASTAR_TASK_HISTOGRAM
class task_histogram {
constexpr unsigned reactor::max_queues;
constexpr unsigned reactor::max_aio_per_queue;
-// Broken (returns spurious EIO). Cause/fix unknown.
-bool aio_nowait_supported = false;
+// Base version where this works; some filesystems were only fixed later, so
+// this value is mixed in with filesystem-provided values later.
+bool aio_nowait_supported = kernel_uname().whitelisted({"4.13"});
static bool sched_debug() {
return false;
_signals.handle_signal(signo, std::move(handler));
}
+// Fills a buffer with a hexadecimal representation of an integer
+// and returns a pointer to the first character.
+// For example, convert_hex_safe(buf, 4, uint16_t(12)) fills the buffer with " c".
+template<typename Integral>
+SEASTAR_CONCEPT( requires std::is_integral_v<Integral> )
+char* convert_hex_safe(char *buf, size_t bufsz, Integral n) noexcept {
+ const char *digits = "0123456789abcdef";
+ memset(buf, ' ', bufsz);
+ auto* p = buf + bufsz;
+ do {
+ assert(p > buf);
+ *--p = digits[n & 0xf];
+ n >>= 4;
+ } while (n);
+ return p;
+}
+
// Accumulates an in-memory backtrace and flush to stderr eventually.
// Async-signal safe.
class backtrace_buffer {
_pos = 0;
}
- void append(const char* str, size_t len) noexcept {
+ void reserve(size_t len) noexcept {
+ assert(len < _max_size);
if (_pos + len >= _max_size) {
flush();
}
+ }
+
+ void append(const char* str, size_t len) noexcept {
+ reserve(len);
memcpy(_buf + _pos, str, len);
_pos += len;
}
template <typename Integral>
void append_hex(Integral ptr) noexcept {
char buf[sizeof(ptr) * 2];
- convert_zero_padded_hex_safe(buf, sizeof(buf), ptr);
- append(buf, sizeof(buf));
+ auto p = convert_hex_safe(buf, sizeof(buf), ptr);
+ append(p, (buf + sizeof(buf)) - p);
}
void append_backtrace() noexcept {
append("\n");
});
}
+
+ void append_backtrace_oneline() noexcept {
+ backtrace([this] (frame f) noexcept {
+ reserve(3 + sizeof(f.addr) * 2);
+ append(" 0x");
+ append_hex(f.addr);
+ });
+ }
};
-static void print_with_backtrace(backtrace_buffer& buf) noexcept {
+static void print_with_backtrace(backtrace_buffer& buf, bool oneline) noexcept {
if (local_engine) {
buf.append(" on shard ");
buf.append_decimal(this_shard_id());
}
+ if (!oneline) {
buf.append(".\nBacktrace:\n");
buf.append_backtrace();
+ } else {
+ buf.append(". Backtrace:");
+ buf.append_backtrace_oneline();
+ buf.append("\n");
+ }
buf.flush();
}
-static void print_with_backtrace(const char* cause) noexcept {
+static void print_with_backtrace(const char* cause, bool oneline = false) noexcept {
backtrace_buffer buf;
buf.append(cause);
- print_with_backtrace(buf);
+ print_with_backtrace(buf, oneline);
}
// Installs signal handler stack for current thread.
stack.ss_size = size;
auto r = sigaltstack(&stack, &prev_stack);
throw_system_error_on(r == -1);
- return defer([mem = std::move(mem), prev_stack] () mutable {
+ return defer([mem = std::move(mem), prev_stack] () mutable noexcept {
try {
auto r = sigaltstack(&prev_stack, NULL);
throw_system_error_on(r == -1);
: _shares(std::max(shares, 1.0f))
, _reciprocal_shares_times_2_power_32((uint64_t(1) << 32) / _shares)
, _id(id)
- , _ts(std::chrono::steady_clock::now())
+ , _ts(now())
, _name(name) {
register_stats();
}
}
};
-reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
- : _cfg(cfg)
+reactor::reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
+ : _smp(std::move(smp))
+ , _alien(alien)
+ , _cfg(cfg)
, _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
, _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
, _id(id)
, _engine_thread(sched::thread::current())
#endif
, _cpu_started(0)
- , _cpu_stall_detector(std::make_unique<cpu_stall_detector>())
+ , _cpu_stall_detector(make_cpu_stall_detector())
, _reuseport(posix_reuseport_detect())
, _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) {
/*
* the chosen backend constructor may want to handle signals and thus
* needs the _signals._signal_handlers map to be initialized.
*/
- _backend = rbs.create(this);
+ _backend = rbs.create(*this);
*internal::get_scheduling_group_specific_thread_local_data_ptr() = &_scheduling_group_specific_data;
_task_queues.push_back(std::make_unique<task_queue>(0, "main", 1000));
_task_queues.push_back(std::make_unique<task_queue>(1, "atexit", 1000));
_at_destroy_tasks = _task_queues.back().get();
- g_need_preempt = &_preemption_monitor;
+ set_need_preempt_var(&_preemption_monitor);
seastar::thread_impl::init();
_backend->start_tick();
}
}
+reactor::sched_stats
+reactor::get_sched_stats() const {
+ sched_stats ret;
+ ret.tasks_processed = tasks_processed();
+ return ret;
+}
+
// Resolves when fd becomes readable, as reported by the selected backend.
future<> reactor::readable(pollable_fd_state& fd) {
    auto& backend = *_backend;
    return backend.readable(fd);
}
// a safe place.
backtrace([] (frame) {});
update_config(cfg);
+
+ namespace sm = seastar::metrics;
+
+ _metrics.add_group("stall_detector", {
+ sm::make_derive("reported", _total_reported, sm::description("Total number of reported stalls, look in the traces for the exact reason"))});
+
+ // note: if something is added here that can, it should take care to destroy _timer.
+}
+
+cpu_stall_detector_posix_timer::cpu_stall_detector_posix_timer(cpu_stall_detector_config cfg) : cpu_stall_detector(cfg) {
struct sigevent sev = {};
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = signal_number();
if (err) {
throw std::system_error(std::error_code(err, std::system_category()));
}
-
- namespace sm = seastar::metrics;
-
- _metrics.add_group("stall_detector", {
- sm::make_derive("reported", _total_reported, sm::description("Total number of reported stalls, look in the traces for the exact reason"))});
-
-
- // note: if something is added here that can, it should take care to destroy _timer.
}
-cpu_stall_detector::~cpu_stall_detector() {
+cpu_stall_detector_posix_timer::~cpu_stall_detector_posix_timer() {
timer_delete(_timer);
}
// Applies a new stall-detection configuration: threshold, slack (a fraction
// of the threshold) and the per-minute report budget.
void cpu_stall_detector::update_config(cpu_stall_detector_config cfg) {
    _config = cfg;
    _threshold = std::chrono::duration_cast<sched_clock::duration>(cfg.threshold);
    _slack = std::chrono::duration_cast<sched_clock::duration>(cfg.threshold * cfg.slack);
    _stall_detector_reports_per_minute = cfg.stall_detector_reports_per_minute;
    _max_reports_per_minute = cfg.stall_detector_reports_per_minute;
    // Restart the reporting window from "now" whenever the config changes.
    _rearm_timer_at = reactor::now();
}
void cpu_stall_detector::maybe_report() {
//
// We can do it a cheaper if we don't report suppressed backtraces.
void cpu_stall_detector::on_signal() {
+ if (reap_event_and_check_spuriousness()) {
+ return;
+ }
auto tasks_processed = engine().tasks_processed();
auto last_seen = _last_tasks_processed_seen.load(std::memory_order_relaxed);
if (!last_seen) {
arm_timer();
}
-void cpu_stall_detector::report_suppressions(std::chrono::steady_clock::time_point now) {
+void cpu_stall_detector::report_suppressions(sched_clock::time_point now) {
if (now > _minute_mark + 60s) {
if (_reported > _max_reports_per_minute) {
auto suppressed = _reported - _max_reports_per_minute;
}
}
-void cpu_stall_detector::arm_timer() {
+void cpu_stall_detector_posix_timer::arm_timer() {
auto its = posix::to_relative_itimerspec(_threshold * _report_at + _slack, 0s);
timer_settime(_timer, 0, &its, nullptr);
}
-void cpu_stall_detector::start_task_run(std::chrono::steady_clock::time_point now) {
+void cpu_stall_detector::start_task_run(sched_clock::time_point now) {
if (now > _rearm_timer_at) {
report_suppressions(now);
_report_at = 1;
std::atomic_signal_fence(std::memory_order_release); // Don't delay this write, so the signal handler can see it
}
-void cpu_stall_detector::end_task_run(std::chrono::steady_clock::time_point now) {
+void cpu_stall_detector::end_task_run(sched_clock::time_point now) {
std::atomic_signal_fence(std::memory_order_acquire); // Don't hoist this write, so the signal handler can see it
_last_tasks_processed_seen.store(0, std::memory_order_relaxed);
}
-void cpu_stall_detector::start_sleep() {
+void cpu_stall_detector_posix_timer::start_sleep() {
auto its = posix::to_relative_itimerspec(0s, 0s);
timer_settime(_timer, 0, &its, nullptr);
- _rearm_timer_at = std::chrono::steady_clock::now();
+ _rearm_timer_at = reactor::now();
}
// Intentionally empty; re-arming after sleep appears to happen on the next
// task run (see start_task_run) — TODO confirm against the full class.
void cpu_stall_detector::end_sleep() {
}
// Thin wrapper for the perf_event_open(2) syscall, which glibc does not wrap.
// Returns the new perf fd, or -1 with errno set on failure.
static long
perf_event_open(struct perf_event_attr* hw_event, pid_t pid, int cpu, int group_fd, unsigned long flags) {
    return syscall(__NR_perf_event_open, hw_event, pid, cpu, group_fd, flags);
}
+
+cpu_stall_detector_linux_perf_event::cpu_stall_detector_linux_perf_event(file_desc fd, cpu_stall_detector_config cfg)
+ : cpu_stall_detector(cfg), _fd(std::move(fd)) {
+ void* ret = ::mmap(nullptr, 2*getpagesize(), PROT_READ|PROT_WRITE, MAP_SHARED, _fd.get(), 0);
+ if (ret == MAP_FAILED) {
+ abort();
+ }
+ _mmap = static_cast<struct ::perf_event_mmap_page*>(ret);
+ _data_area = reinterpret_cast<char*>(_mmap) + getpagesize();
+ _data_area_mask = getpagesize() - 1;
+}
+
+cpu_stall_detector_linux_perf_event::~cpu_stall_detector_linux_perf_event() {
+ ::munmap(_mmap, 2*getpagesize());
+}
+
+void
+cpu_stall_detector_linux_perf_event::arm_timer() {
+ uint64_t ns = (_threshold * _report_at + _slack) / 1ns;
+ if (_enabled && _current_period == ns) [[likely]] {
+ // Common case - we're re-arming with the same period, the counter
+ // is already enabled.
+ _fd.ioctl(PERF_EVENT_IOC_RESET, 0);
+ } else {
+ // Uncommon case - we're moving from disabled to enabled, or changing
+ // the period. Issue more calls and be careful.
+ _fd.ioctl(PERF_EVENT_IOC_DISABLE, 0); // avoid false alarms while we modify stuff
+ _fd.ioctl(PERF_EVENT_IOC_PERIOD, ns);
+ _fd.ioctl(PERF_EVENT_IOC_RESET, 0);
+ _fd.ioctl(PERF_EVENT_IOC_ENABLE, 0);
+ _enabled = true;
+ _current_period = ns;
+ }
+}
+
void
-reactor::task_quota_timer_thread_fn() {
- auto thread_name = seastar::format("timer-{}", _id);
- pthread_setname_np(pthread_self(), thread_name.c_str());
+cpu_stall_detector_linux_perf_event::start_sleep() {
+ _fd.ioctl(PERF_EVENT_IOC_DISABLE, 0);
+ _enabled = false;
+}
- sigset_t mask;
- sigfillset(&mask);
- for (auto sig : { SIGSEGV }) {
- sigdelset(&mask, sig);
+bool
+cpu_stall_detector_linux_perf_event::reap_event_and_check_spuriousness() {
+ struct read_format {
+ uint64_t value;
+ } buf;
+ _fd.read(&buf, sizeof(read_format));
+ return buf.value < _current_period;
+}
+
+void
+cpu_stall_detector_linux_perf_event::maybe_report_kernel_trace() {
+ data_area_reader reader(*this);
+ auto current_record = [&] () -> ::perf_event_header {
+ return reader.read_struct<perf_event_header>();
+ };
+
+ while (reader.have_data()) {
+ auto record = current_record();
+
+ if (record.type != PERF_RECORD_SAMPLE) {
+ reader.skip(record.size - sizeof(record));
+ continue;
+ }
+
+ auto nr = reader.read_u64();
+ backtrace_buffer buf;
+ buf.append("kernel callstack:");
+ for (uint64_t i = 0; i < nr; ++i) {
+ buf.append(" 0x");
+ buf.append_hex(uintptr_t(reader.read_u64()));
+ }
+ buf.append("\n");
+ buf.flush();
+ };
+}
+
+std::unique_ptr<cpu_stall_detector_linux_perf_event>
+cpu_stall_detector_linux_perf_event::try_make(cpu_stall_detector_config cfg) {
+ ::perf_event_attr pea = {
+ .type = PERF_TYPE_SOFTWARE,
+ .size = sizeof(pea),
+ .config = PERF_COUNT_SW_TASK_CLOCK, // more likely to work on virtual machines than hardware events
+ .sample_period = 1'000'000'000, // Needs non-zero value or PERF_IOC_PERIOD gets confused
+ .sample_type = PERF_SAMPLE_CALLCHAIN,
+ .disabled = 1,
+ .exclude_callchain_user = 1, // we're using backtrace() to capture the user callchain
+ .wakeup_events = 1,
+ };
+ unsigned long flags = 0;
+ if (kernel_uname().whitelisted({"3.14"})) {
+ flags |= PERF_FLAG_FD_CLOEXEC;
+ }
+ int fd = perf_event_open(&pea, 0, -1, -1, flags);
+ if (fd == -1) {
+ throw std::system_error(errno, std::system_category(), "perf_event_open() failed");
+ }
+ auto desc = file_desc::from_fd(fd);
+ struct f_owner_ex sig_owner = {
+ .type = F_OWNER_TID,
+ .pid = static_cast<pid_t>(syscall(SYS_gettid)),
+ };
+ auto ret1 = ::fcntl(fd, F_SETOWN_EX, &sig_owner);
+ if (ret1 == -1) {
+ abort();
}
- auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
- if (r) {
- seastar_logger.error("Thread {}: failed to block signals. Aborting.", thread_name.c_str());
+ auto ret2 = ::fcntl(fd, F_SETSIG, signal_number());
+ if (ret2 == -1) {
abort();
}
-
- // We need to wait until task quota is set before we can calculate how many ticks are to
- // a minute. Technically task_quota is used from many threads, but since it is read-only here
- // and only used during initialization we will avoid complicating the code.
- {
- uint64_t events;
- _task_quota_timer.read(&events, 8);
- request_preemption();
+ auto fd_flags = ::fcntl(fd, F_GETFL);
+ if (fd_flags == -1) {
+ abort();
+ }
+ auto ret3 = ::fcntl(fd, F_SETFL, fd_flags | O_ASYNC);
+ if (ret3 == -1) {
+ abort();
}
+ return std::make_unique<cpu_stall_detector_linux_perf_event>(std::move(desc), std::move(cfg));
+}
- while (!_dying.load(std::memory_order_relaxed)) {
- uint64_t events;
- _task_quota_timer.read(&events, 8);
- request_preemption();
- // We're in a different thread, but guaranteed to be on the same core, so even
- // a signal fence is overdoing it
- std::atomic_signal_fence(std::memory_order_seq_cst);
+std::unique_ptr<cpu_stall_detector>
+internal::make_cpu_stall_detector(cpu_stall_detector_config cfg) {
+ try {
+ return cpu_stall_detector_linux_perf_event::try_make(cfg);
+ } catch (...) {
+ seastar_logger.warn("Creation of perf_event based stall detector failed, falling back to posix timer: {}", std::current_exception());
+ return std::make_unique<cpu_stall_detector_posix_timer>(cfg);
}
}
+
void
reactor::update_blocked_reactor_notify_ms(std::chrono::milliseconds ms) {
auto cfg = _cpu_stall_detector->get_config();
void
cpu_stall_detector::generate_trace() {
- auto delta = std::chrono::steady_clock::now() - _run_started_at;
+ auto delta = reactor::now() - _run_started_at;
_total_reported++;
if (_config.report) {
buf.append("Reactor stalled for ");
buf.append_decimal(uint64_t(delta / 1ms));
buf.append(" ms");
- print_with_backtrace(buf);
+ print_with_backtrace(buf, _config.oneline);
+ maybe_report_kernel_trace();
}
template <typename T, typename E, typename EnableFunc>
}
#endif
-class network_stack_registry {
-public:
- using options = boost::program_options::variables_map;
-private:
- static std::unordered_map<sstring,
- noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)>>& _map() {
- static std::unordered_map<sstring,
- noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)>> map;
- return map;
- }
- static sstring& _default() {
- static sstring def;
- return def;
- }
+class network_stack_factory {
+ network_stack_entry::factory_func _func;
+
public:
- static boost::program_options::options_description& options_description() {
- static boost::program_options::options_description opts;
- return opts;
- }
- static void register_stack(sstring name, boost::program_options::options_description opts,
- noncopyable_function<future<std::unique_ptr<network_stack>>(options opts)> create,
- bool make_default);
- static sstring default_stack();
- static std::vector<sstring> list();
- static future<std::unique_ptr<network_stack>> create(options opts);
- static future<std::unique_ptr<network_stack>> create(sstring name, options opts);
+ network_stack_factory(noncopyable_function<future<std::unique_ptr<network_stack>> (const program_options::option_group&)> func)
+ : _func(std::move(func)) { }
+ future<std::unique_ptr<network_stack>> operator()(const program_options::option_group& opts) { return _func(opts); }
};
-void reactor::configure(boost::program_options::variables_map vm) {
- _network_stack_ready = vm.count("network-stack")
- ? network_stack_registry::create(sstring(vm["network-stack"].as<std::string>()), vm)
- : network_stack_registry::create(vm);
+void reactor::configure(const reactor_options& opts) {
+ _network_stack_ready = opts.network_stack.get_selected_candidate()(*opts.network_stack.get_selected_candidate_opts());
- _handle_sigint = !vm.count("no-handle-interrupt");
- auto task_quota = vm["task-quota-ms"].as<double>() * 1ms;
+ _handle_sigint = !opts.no_handle_interrupt;
+ auto task_quota = opts.task_quota_ms.get_value() * 1ms;
_task_quota = std::chrono::duration_cast<sched_clock::duration>(task_quota);
- auto blocked_time = vm["blocked-reactor-notify-ms"].as<unsigned>() * 1ms;
+ auto blocked_time = opts.blocked_reactor_notify_ms.get_value() * 1ms;
cpu_stall_detector_config csdc;
csdc.threshold = blocked_time;
- csdc.stall_detector_reports_per_minute = vm["blocked-reactor-reports-per-minute"].as<unsigned>();
+ csdc.stall_detector_reports_per_minute = opts.blocked_reactor_reports_per_minute.get_value();
+ csdc.oneline = opts.blocked_reactor_report_format_oneline.get_value();
_cpu_stall_detector->update_config(csdc);
- _max_task_backlog = vm["max-task-backlog"].as<unsigned>();
- _max_poll_time = vm["idle-poll-time-us"].as<unsigned>() * 1us;
- if (vm.count("poll-mode")) {
+ _max_task_backlog = opts.max_task_backlog.get_value();
+ _max_poll_time = opts.idle_poll_time_us.get_value() * 1us;
+ if (opts.poll_mode) {
_max_poll_time = std::chrono::nanoseconds::max();
}
- if (vm.count("overprovisioned")
- && vm["idle-poll-time-us"].defaulted()
- && !vm.count("poll-mode")) {
+ if (opts.overprovisioned && opts.idle_poll_time_us.defaulted() && !opts.poll_mode) {
_max_poll_time = 0us;
}
- set_strict_dma(!vm.count("relaxed-dma"));
- if (!vm["poll-aio"].as<bool>()
- || (vm["poll-aio"].defaulted() && vm.count("overprovisioned"))) {
+ set_strict_dma(!opts.relaxed_dma);
+ if (!opts.poll_aio.get_value() || (opts.poll_aio.defaulted() && opts.overprovisioned)) {
_aio_eventfd = pollable_fd(file_desc::eventfd(0, 0));
}
- set_bypass_fsync(vm["unsafe-bypass-fsync"].as<bool>());
- _force_io_getevents_syscall = vm["force-aio-syscalls"].as<bool>();
- aio_nowait_supported = vm["linux-aio-nowait"].as<bool>();
- _have_aio_fsync = vm["aio-fsync"].as<bool>();
+ set_bypass_fsync(opts.unsafe_bypass_fsync.get_value());
+ _kernel_page_cache = opts.kernel_page_cache.get_value();
+ _force_io_getevents_syscall = opts.force_aio_syscalls.get_value();
+ aio_nowait_supported = opts.linux_aio_nowait.get_value();
+ _have_aio_fsync = opts.aio_fsync.get_value();
}
pollable_fd
}
}
-void
-reactor::submit_io(io_completion* desc, io_request req) noexcept {
- req.attach_kernel_completion(desc);
- try {
- _pending_io.push_back(std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
-}
-
bool
reactor::flush_pending_aio() {
- for (auto& ioq : my_io_queues) {
- ioq->poll_io_queue();
+ for (auto& ioq : _io_queues) {
+ ioq.second->poll_io_queue();
}
return false;
}
+steady_clock_type::time_point reactor::next_pending_aio() const noexcept {
+ steady_clock_type::time_point next = steady_clock_type::time_point::max();
+
+ for (auto& ioq : _io_queues) {
+ steady_clock_type::time_point n = ioq.second->next_pending_aio();
+ if (n < next) {
+ next = std::move(n);
+ }
+ }
+
+ return next;
+}
+
bool
reactor::reap_kernel_completions() {
return _backend->reap_kernel_completions();
// Returns this shard's lazily-registered default I/O priority class
// (one share unit, registered on first use).
const io_priority_class& default_priority_class() {
    static thread_local auto shard_default_class = [] {
        return io_priority_class::register_one("default", 1);
    }();
    return shard_default_class;
}
// Accounts for the read in the shard's I/O statistics and queues it on the
// target io_queue; intent may be used to cancel the request later.
future<size_t>
reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req, io_intent* intent) noexcept {
    ++_io_stats.aio_reads;
    _io_stats.aio_read_bytes += len;
    return ioq->queue_request(pc, len, std::move(req), intent);
}
// Accounts for the write in the shard's I/O statistics and queues it on the
// target io_queue; intent may be used to cancel the request later.
future<size_t>
reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req, io_intent* intent) noexcept {
    ++_io_stats.aio_writes;
    _io_stats.aio_write_bytes += len;
    return ioq->queue_request(pc, len, std::move(req), intent);
}
namespace internal {
reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_options options) noexcept {
return do_with(static_cast<int>(flags), std::move(options), [this, nameref] (auto& open_flags, file_open_options& options) {
sstring name(nameref);
- return _thread_pool->submit<syscall_result<int>>([name, &open_flags, &options, strict_o_direct = _strict_o_direct, bypass_fsync = _bypass_fsync] () mutable {
- // We want O_DIRECT, except in two cases:
+ return _thread_pool->submit<syscall_result<int>>([this, name, &open_flags, &options, strict_o_direct = _strict_o_direct, bypass_fsync = _bypass_fsync] () mutable {
+ // We want O_DIRECT, except in three cases:
// - tmpfs (which doesn't support it, but works fine anyway)
// - strict_o_direct == false (where we forgive it being not supported)
+ // - _kernel_page_cache == true (where we disable it for short-lived test processes)
// Because open() with O_DIRECT will fail, we open it without O_DIRECT, try
// to update it to O_DIRECT with fcntl(), and if that fails, see if we
// can forgive it.
if (fd == -1) {
return wrap_syscall<int>(fd);
}
- int r = ::fcntl(fd, F_SETFL, open_flags | O_DIRECT);
+ int o_direct_flag = _kernel_page_cache ? 0 : O_DIRECT;
+ int r = ::fcntl(fd, F_SETFL, open_flags | o_direct_flag);
auto maybe_ret = wrap_syscall<int>(r); // capture errno (should be EINVAL)
if (r == -1 && strict_o_direct && !is_tmpfs(fd)) {
::close(fd);
}
if (fd != -1) {
fsxattr attr = {};
- if (options.extent_allocation_size_hint) {
+ // xfs delayed allocation is disabled when extent size hints are present.
+ // This causes tons of xfs log fsyncs. Given that extent size hints are
+ // unneeded when delayed allocation is available (which is the case
+ // when not using O_DIRECT), disable them.
+ if (options.extent_allocation_size_hint && !_kernel_page_cache) {
attr.fsx_xflags |= XFS_XFLAG_EXTSIZE;
- attr.fsx_extsize = options.extent_allocation_size_hint;
+ attr.fsx_extsize = std::min(options.extent_allocation_size_hint,
+ file_open_options::max_extent_allocation_size_hint);
}
// Ignore error; may be !xfs, and just a hint anyway
::ioctl(fd, XFS_IOC_FSSETXATTR, &attr);
auto desc = new fsync_io_desc;
auto fut = desc->get_future();
auto req = io_request::make_fdatasync(fd);
- submit_io(desc, std::move(req));
+ _io_sink.submit(desc, std::move(req));
return fut;
});
}
void reactor::stop() {
assert(_id == 0);
- smp::cleanup_cpu();
+ _smp->cleanup_cpu();
if (!_stopping) {
// Run exit tasks locally and then stop all other engines
// in the background and wait on semaphore for all to complete.
return do_with(semaphore(0), [this] (semaphore& sem) {
// Stop other cpus asynchronously, signal when done.
(void)smp::invoke_on_others(0, [] {
- smp::cleanup_cpu();
+ engine()._smp->cleanup_cpu();
return engine().run_exit_tasks().then([] {
engine()._stopped = true;
});
sm::make_derive("free_operations", [] { return memory::stats().frees(); }, sm::description("Total number of free operations")),
sm::make_derive("cross_cpu_free_operations", [] { return memory::stats().cross_cpu_frees(); }, sm::description("Total number of cross cpu free")),
sm::make_gauge("malloc_live_objects", [] { return memory::stats().live_objects(); }, sm::description("Number of live objects")),
- sm::make_current_bytes("free_memory", [] { return memory::stats().free_memory(); }, sm::description("Free memeory size in bytes")),
- sm::make_current_bytes("total_memory", [] { return memory::stats().total_memory(); }, sm::description("Total memeory size in bytes")),
- sm::make_current_bytes("allocated_memory", [] { return memory::stats().allocated_memory(); }, sm::description("Allocated memeory size in bytes")),
+ sm::make_current_bytes("free_memory", [] { return memory::stats().free_memory(); }, sm::description("Free memory size in bytes")),
+ sm::make_current_bytes("total_memory", [] { return memory::stats().total_memory(); }, sm::description("Total memory size in bytes")),
+ sm::make_current_bytes("allocated_memory", [] { return memory::stats().allocated_memory(); }, sm::description("Allocated memory size in bytes")),
sm::make_derive("reclaims_operations", [] { return memory::stats().reclaims(); }, sm::description("Total reclaims operations"))
});
}
};
-class reactor::io_queue_submission_pollfn final : public simple_pollfn<true> {
+class reactor::io_queue_submission_pollfn final : public reactor::pollfn {
reactor& _r;
+ // Wake-up the reactor with highres timer when the io-queue
+ // decides to delay dispatching until some time point in
+ // the future
+ timer<> _nearest_wakeup { [this] { _armed = false; } };
+ bool _armed = false;
public:
io_queue_submission_pollfn(reactor& r) : _r(r) {}
virtual bool poll() final override {
return _r.flush_pending_aio();
}
+ virtual bool pure_poll() override final {
+ return poll();
+ }
+ virtual bool try_enter_interrupt_mode() override {
+ auto next = _r.next_pending_aio();
+ auto now = steady_clock_type::now();
+ if (next <= now) {
+ return false;
+ }
+ _nearest_wakeup.arm(next);
+ _armed = true;
+ return true;
+ }
+ virtual void exit_interrupt_mode() override final {
+ if (_armed) {
+ _nearest_wakeup.cancel();
+ _armed = false;
+ }
+ }
};
// Other cpus can queue items for us to free; and they won't notify
smp_pollfn(reactor& r) : _r(r) {}
virtual bool poll() final override {
return (smp::poll_queues() |
- alien::smp::poll_queues());
+ _r._alien.poll_queues());
}
virtual bool pure_poll() final override {
return (smp::pure_poll_queues() ||
- alien::smp::pure_poll_queues());
+ _r._alien.pure_poll_queues());
}
virtual bool try_enter_interrupt_mode() override {
// systemwide_memory_barrier() is very slow if run concurrently,
sched_print("run_some_tasks: start");
reset_preemption_monitor();
- sched_clock::time_point t_run_completed = std::chrono::steady_clock::now();
+ sched_clock::time_point t_run_completed = now();
STAP_PROBE(seastar, reactor_run_tasks_start);
_cpu_stall_detector->start_task_run(t_run_completed);
do {
_last_vruntime = std::max(tq->_vruntime, _last_vruntime);
run_tasks(*tq);
tq->_current = false;
- t_run_completed = std::chrono::steady_clock::now();
+ t_run_completed = now();
auto delta = t_run_completed - t_run_started;
account_runtime(*tq, delta);
sched_print("run complete ({} {}); time consumed {} usec; final vruntime {} empty {}",
sched_print("tq {} {} losing vruntime {} due to sleep", (void*)&tq, tq._name, _last_vruntime - tq._vruntime);
}
tq._vruntime = std::max(_last_vruntime, tq._vruntime);
- auto now = std::chrono::steady_clock::now();
+ auto now = reactor::now();
tq._waittime += now - tq._ts;
tq._ts = now;
_activating_task_queues.push_back(&tq);
});
}
-int reactor::run() {
+int reactor::run() noexcept {
+ try {
+ return do_run();
+ } catch (const std::exception& e) {
+ seastar_logger.error(e.what());
+ print_with_backtrace("exception running reactor main loop");
+ _exit(1);
+ }
+}
+
+int reactor::do_run() {
#ifndef SEASTAR_ASAN_ENABLED
// SIGSTKSZ is too small when using asan. We also don't need to
// handle SIGSEGV ourselves when using asan, so just don't install
using namespace std::chrono_literals;
timer<lowres_clock> load_timer;
auto last_idle = _total_idle;
- auto idle_start = sched_clock::now(), idle_end = idle_start;
+ auto idle_start = now(), idle_end = idle_start;
load_timer.set_callback([this, &last_idle, &idle_start, &idle_end] () mutable {
_total_idle += idle_end - idle_start;
auto load = double((_total_idle - last_idle).count()) / double(std::chrono::duration_cast<sched_clock::duration>(1s).count());
run_tasks(*_at_destroy_tasks);
}
_finished_running_tasks = true;
- smp::arrive_at_event_loop_end();
+ _smp->arrive_at_event_loop_end();
if (_id == 0) {
- smp::join_all();
+ _smp->join_all();
}
break;
}
idle = false;
}
} else {
- idle_end = sched_clock::now();
+ idle_end = now();
if (!idle) {
idle_start = idle_end;
idle = true;
// Turn off the task quota timer to avoid spurious wakeups
struct itimerspec zero_itimerspec = {};
_task_quota_timer.timerfd_settime(0, zero_itimerspec);
- auto start_sleep = sched_clock::now();
+ auto start_sleep = now();
_cpu_stall_detector->start_sleep();
sleep();
_cpu_stall_detector->end_sleep();
// We may have slept for a while, so freshen idle_end
- idle_end = sched_clock::now();
+ idle_end = now();
_total_sleep += idle_end - start_sleep;
_task_quota_timer.timerfd_settime(0, task_quote_itimerspec);
}
// This is needed because the reactor is destroyed from the thread_local destructors. If
// the I/O queue happens to use any other infrastructure that is also kept this way (for
// instance, collectd), we will not have any way to guarantee who is destroyed first.
- my_io_queues.clear();
+ _io_queues.clear();
return _return;
}
namespace seastar {
-void network_stack_registry::register_stack(sstring name,
- boost::program_options::options_description opts,
- noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)> create, bool make_default) {
- if (_map().count(name)) {
- return;
- }
- _map()[name] = std::move(create);
- options_description().add(opts);
- if (make_default) {
- _default() = name;
- }
+// True when the running kernel is on the uname whitelist for Linux aio
+// fsync support (4.18 or later); used as the default for --aio-fsync.
+static bool kernel_supports_aio_fsync() {
+ return kernel_uname().whitelisted({"4.18"});
}
-void register_network_stack(sstring name, boost::program_options::options_description opts,
- noncopyable_function<future<std::unique_ptr<network_stack>>(boost::program_options::variables_map)>
- create,
- bool make_default) {
- return network_stack_registry::register_stack(
- std::move(name), std::move(opts), std::move(create), make_default);
-}
+// Builds the "--network-stack" selection option for reactor_options. Each
+// registered stack (native, posix) becomes a candidate that carries a
+// heap-allocated copy of its factory (owned via the custom deleter) plus its
+// per-stack option group; the stack flagged is_default becomes the default.
+static program_options::selection_value<network_stack_factory> create_network_stacks_option(reactor_options& zis) {
+ using value_type = program_options::selection_value<network_stack_factory>;
+ value_type::candidates candidates;
+ std::vector<std::string> net_stack_names;
-sstring network_stack_registry::default_stack() {
- return _default();
-}
+ auto deleter = [] (network_stack_factory* p) { delete p; };
-std::vector<sstring> network_stack_registry::list() {
- std::vector<sstring> ret;
- for (auto&& ns : _map()) {
- ret.push_back(ns.first);
+ std::string default_stack;
+ for (auto reg_func : {register_native_stack, register_posix_stack}) {
+ auto s = reg_func();
+ if (s.is_default) {
+ default_stack = s.name;
+ }
+ candidates.push_back({s.name, {new network_stack_factory(std::move(s.factory)), deleter}, std::move(s.opts)});
+ net_stack_names.emplace_back(s.name);
}
- return ret;
-}
-future<std::unique_ptr<network_stack>>
-network_stack_registry::create(options opts) {
- return create(_default(), opts);
+ return program_options::selection_value<network_stack_factory>(zis, "network-stack", std::move(candidates), default_stack,
+ format("select network stack (valid values: {})", format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")));
}
-future<std::unique_ptr<network_stack>>
-network_stack_registry::create(sstring name, options opts) {
- if (!_map().count(name)) {
- throw std::runtime_error(format("network stack {} not registered", name));
- }
- return _map()[name](opts);
-}
+// Enumerates whatever reactor backends this build reports as available
+// (reactor_backend_selector::available()) as candidates for the
+// "--reactor-backend" option; each candidate owns a heap-allocated copy of
+// its selector, released by the custom deleter.
+static program_options::selection_value<reactor_backend_selector>::candidates backend_selector_candidates() {
+ using value_type = program_options::selection_value<reactor_backend_selector>;
+ value_type::candidates candidates;
-static bool kernel_supports_aio_fsync() {
- return kernel_uname().whitelisted({"4.18"});
+ auto deleter = [] (reactor_backend_selector* p) { delete p; };
+
+ for (auto&& be : reactor_backend_selector::available()) {
+ auto name = be.name();
+ candidates.push_back({std::move(name), {new reactor_backend_selector(std::move(be)), deleter}, {}});
+ }
+ return candidates;
}
-boost::program_options::options_description
-reactor::get_options_description(reactor_config cfg) {
- namespace bpo = boost::program_options;
- bpo::options_description opts("Core options");
- auto net_stack_names = network_stack_registry::list();
- opts.add_options()
- ("network-stack", bpo::value<std::string>(),
- format("select network stack (valid values: {})",
- format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")).c_str())
- ("poll-mode", "poll continuously (100% cpu use)")
- ("idle-poll-time-us", bpo::value<unsigned>()->default_value(calculate_poll_time() / 1us),
+// Declares every "Core options" command-line flag as a typed member of
+// reactor_options, replacing the old boost::program_options description.
+// Defaults are preserved from the removed code; options compiled out of this
+// build (heapprof without SEASTAR_HEAPPROF) are registered as unused{} so the
+// flag is still recognized but inert.
+reactor_options::reactor_options(program_options::option_group* parent_group)
+ : program_options::option_group(parent_group, "Core options")
+ , network_stack(create_network_stacks_option(*this))
+ , poll_mode(*this, "poll-mode", "poll continuously (100% cpu use)")
+ , idle_poll_time_us(*this, "idle-poll-time-us", reactor::calculate_poll_time() / 1us,
"idle polling time in microseconds (reduce for overprovisioned environments or laptops)")
- ("poll-aio", bpo::value<bool>()->default_value(true),
+ , poll_aio(*this, "poll-aio", true,
"busy-poll for disk I/O (reduces latency and increases throughput)")
- ("task-quota-ms", bpo::value<double>()->default_value(cfg.task_quota / 1ms), "Max time (ms) between polls")
- ("max-task-backlog", bpo::value<unsigned>()->default_value(1000), "Maximum number of task backlog to allow; above this we ignore I/O")
- ("blocked-reactor-notify-ms", bpo::value<unsigned>()->default_value(200), "threshold in miliseconds over which the reactor is considered blocked if no progress is made")
- ("blocked-reactor-reports-per-minute", bpo::value<unsigned>()->default_value(5), "Maximum number of backtraces reported by stall detector per minute")
- ("relaxed-dma", "allow using buffered I/O if DMA is not available (reduces performance)")
- ("linux-aio-nowait",
- bpo::value<bool>()->default_value(aio_nowait_supported),
+ // Fixed: default was "500" (500 ms). The removed code defaulted to
+ // cfg.task_quota / 1ms with task_quota = 500us, i.e. 0.5 ms; 500 would be
+ // a 1000x scheduling-latency regression.
+ , task_quota_ms(*this, "task-quota-ms", 0.5, "Max time (ms) between polls")
+ , max_task_backlog(*this, "max-task-backlog", 1000, "Maximum number of task backlog to allow; above this we ignore I/O")
+ , blocked_reactor_notify_ms(*this, "blocked-reactor-notify-ms", 200, "threshold in miliseconds over which the reactor is considered blocked if no progress is made")
+ , blocked_reactor_reports_per_minute(*this, "blocked-reactor-reports-per-minute", 5, "Maximum number of backtraces reported by stall detector per minute")
+ , blocked_reactor_report_format_oneline(*this, "blocked-reactor-report-format-oneline", true, "Print a simplified backtrace on a single line")
+ , relaxed_dma(*this, "relaxed-dma", "allow using buffered I/O if DMA is not available (reduces performance)")
+ , linux_aio_nowait(*this, "linux-aio-nowait", aio_nowait_supported,
"use the Linux NOWAIT AIO feature, which reduces reactor stalls due to aio (autodetected)")
- ("unsafe-bypass-fsync", bpo::value<bool>()->default_value(false), "Bypass fsync(), may result in data loss. Use for testing on consumer drives")
- ("overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0")
- ("abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory")
- ("force-aio-syscalls", bpo::value<bool>()->default_value(false),
+ , unsafe_bypass_fsync(*this, "unsafe-bypass-fsync", false, "Bypass fsync(), may result in data loss. Use for testing on consumer drives")
+ , kernel_page_cache(*this, "kernel-page-cache", false,
+ "Use the kernel page cache. This disables DMA (O_DIRECT)."
+ " Useful for short-lived functional tests with a small data set.")
+ , overprovisioned(*this, "overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0")
+ , abort_on_seastar_bad_alloc(*this, "abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory")
+ , force_aio_syscalls(*this, "force-aio-syscalls", false,
"Force io_getevents(2) to issue a system call, instead of bypassing the kernel when possible."
" This makes strace output more useful, but slows down the application")
- ("dump-memory-diagnostics-on-alloc-failure-kind", bpo::value<std::string>()->default_value("critical"), "Dump diagnostics of the seastar allocator state on allocation failure."
+ , dump_memory_diagnostics_on_alloc_failure_kind(*this, "dump-memory-diagnostics-on-alloc-failure-kind", memory::alloc_failure_kind::critical,
+ "Dump diagnostics of the seastar allocator state on allocation failure."
" Accepted values: never, critical (default), always. When set to critical, only allocations marked as critical will trigger diagnostics dump."
" The diagnostics will be written to the seastar_memory logger, with error level."
" Note that if the seastar_memory logger is set to debug or trace level, the diagnostics will be logged irrespective of this setting.")
- ("reactor-backend", bpo::value<reactor_backend_selector>()->default_value(reactor_backend_selector::default_backend()),
- format("Internal reactor implementation ({})", reactor_backend_selector::available()).c_str())
- ("aio-fsync", bpo::value<bool>()->default_value(kernel_supports_aio_fsync()),
+ , reactor_backend(*this, "reactor-backend", backend_selector_candidates(), reactor_backend_selector::default_backend().name(),
+ format("Internal reactor implementation ({})", reactor_backend_selector::available()))
+ , aio_fsync(*this, "aio-fsync", kernel_supports_aio_fsync(),
"Use Linux aio for fsync() calls. This reduces latency; requires Linux 4.18 or later.")
+ , max_networking_io_control_blocks(*this, "max-networking-io-control-blocks", 10000,
+ "Maximum number of I/O control blocks (IOCBs) to allocate per shard. This translates to the number of sockets supported per shard."
+ " Requires tuning /proc/sys/fs/aio-max-nr. Only valid for the linux-aio reactor backend (see --reactor-backend).")
#ifdef SEASTAR_HEAPPROF
- ("heapprof", "enable seastar heap profiling")
+ , heapprof(*this, "heapprof", "enable seastar heap profiling")
+#else
+ , heapprof(*this, "heapprof", program_options::unused{})
#endif
- ;
- if (cfg.auto_handle_sigint_sigterm) {
- opts.add_options()
- ("no-handle-interrupt", "ignore SIGINT (for gdb)")
- ;
- }
- opts.add(network_stack_registry::options_description());
- return opts;
+ , no_handle_interrupt(*this, "no-handle-interrupt", "ignore SIGINT (for gdb)")
+{
}
-boost::program_options::options_description
-smp::get_options_description()
-{
- namespace bpo = boost::program_options;
- bpo::options_description opts("SMP options");
- opts.add_options()
- ("smp,c", bpo::value<unsigned>(), "number of threads (default: one per CPU)")
- ("cpuset", bpo::value<cpuset_bpo_wrapper>(), "CPUs to use (in cpuset(7) format; default: all))")
- ("memory,m", bpo::value<std::string>(), "memory to use, in bytes (ex: 4G) (default: all)")
- ("reserve-memory", bpo::value<std::string>(), "memory reserved to OS (if --memory not specified)")
- ("hugepages", bpo::value<std::string>(), "path to accessible hugetlbfs mount (typically /dev/hugepages/something)")
- ("lock-memory", bpo::value<bool>(), "lock all memory (prevents swapping)")
- ("thread-affinity", bpo::value<bool>()->default_value(true), "pin threads to their cpus (disable for overprovisioning)")
+// Declares every "SMP options" flag as a typed member of smp_options,
+// replacing the old boost::program_options description. Options whose
+// feature is compiled out (hwloc, the glibc exception workaround) are
+// registered as unused{} so the flag parses but has no effect.
+smp_options::smp_options(program_options::option_group* parent_group)
+ : program_options::option_group(parent_group, "SMP options")
+ , smp(*this, "smp", {}, "number of threads (default: one per CPU)")
+ , cpuset(*this, "cpuset", {}, "CPUs to use (in cpuset(7) format; default: all))")
+ , memory(*this, "memory", std::nullopt, "memory to use, in bytes (ex: 4G) (default: all)")
+ , reserve_memory(*this, "reserve-memory", {}, "memory reserved to OS (if --memory not specified)")
+ , hugepages(*this, "hugepages", {}, "path to accessible hugetlbfs mount (typically /dev/hugepages/something)")
+ , lock_memory(*this, "lock-memory", {}, "lock all memory (prevents swapping)")
+ , thread_affinity(*this, "thread-affinity", true, "pin threads to their cpus (disable for overprovisioning)")
#ifdef SEASTAR_HAVE_HWLOC
- ("num-io-queues", bpo::value<unsigned>(), "Number of IO queues. Each IO unit will be responsible for a fraction of the IO requests. Defaults to the number of threads")
- ("max-io-requests", bpo::value<unsigned>(), "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of IO queues")
+ , num_io_queues(*this, "num-io-queues", {}, "Number of IO queues. Each IO unit will be responsible for a fraction of the IO requests. Defaults to the number of threads")
+ , num_io_groups(*this, "num-io-groups", {}, "Number of IO groups. Each IO group will be responsible for a fraction of the IO requests. Defaults to the number of NUMA nodes")
+ , max_io_requests(*this, "max-io-requests", {}, "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of IO queues")
#else
- ("max-io-requests", bpo::value<unsigned>(), "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of processors")
+ , num_io_queues(*this, "num-io-queues", program_options::unused{})
+ , num_io_groups(*this, "num-io-groups", program_options::unused{})
+ , max_io_requests(*this, "max-io-requests", {}, "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of processors")
#endif
- ("io-properties-file", bpo::value<std::string>(), "path to a YAML file describing the characteristics of the I/O Subsystem")
- ("io-properties", bpo::value<std::string>(), "a YAML string describing the characteristics of the I/O Subsystem")
- ("mbind", bpo::value<bool>()->default_value(true), "enable mbind")
+ , io_properties_file(*this, "io-properties-file", {}, "path to a YAML file describing the characteristics of the I/O Subsystem")
+ , io_properties(*this, "io-properties", {}, "a YAML string describing the characteristics of the I/O Subsystem")
+ , mbind(*this, "mbind", true, "enable mbind")
#ifndef SEASTAR_NO_EXCEPTION_HACK
- ("enable-glibc-exception-scaling-workaround", bpo::value<bool>()->default_value(true), "enable workaround for glibc/gcc c++ exception scalablity problem")
+ , enable_glibc_exception_scaling_workaround(*this, "enable-glibc-exception-scaling-workaround", true, "enable workaround for glibc/gcc c++ exception scalablity problem")
+#else
+ // Fixed: the unused{} fallback omitted the option name, unlike every other
+ // unused{} initializer in this ctor (num-io-queues, num-io-groups,
+ // allow-cpus-in-remote-numa-nodes); the flag must still be registered by
+ // name so it parses when the feature is compiled out.
+ , enable_glibc_exception_scaling_workaround(*this, "enable-glibc-exception-scaling-workaround", program_options::unused{})
#endif
#ifdef SEASTAR_HAVE_HWLOC
- ("allow-cpus-in-remote-numa-nodes", bpo::value<bool>()->default_value(true), "if some CPUs are found not to have any local NUMA nodes, allow assigning them to remote ones")
+ , allow_cpus_in_remote_numa_nodes(*this, "allow-cpus-in-remote-numa-nodes", true, "if some CPUs are found not to have any local NUMA nodes, allow assigning them to remote ones")
+#else
+ , allow_cpus_in_remote_numa_nodes(*this, "allow-cpus-in-remote-numa-nodes", program_options::unused{})
#endif
- ;
- return opts;
+{
}
thread_local scollectd::impl scollectd_impl;
thread_local std::unique_ptr<reactor, reactor_deleter> reactor_holder;
-std::vector<posix_thread> smp::_threads;
-std::vector<std::function<void ()>> smp::_thread_loops;
-std::optional<boost::barrier> smp::_all_event_loops_done;
-std::vector<reactor*> smp::_reactors;
-std::unique_ptr<smp_message_queue*[], smp::qs_deleter> smp::_qs;
-std::thread::id smp::_tmain;
-unsigned smp::count = 1;
-bool smp::_using_dpdk;
+thread_local smp_message_queue** smp::_qs;
+thread_local std::thread::id smp::_tmain;
+unsigned smp::count = 0;
void smp::start_all_queues()
{
_qs[c][this_shard_id()].start(c);
}
}
- alien::smp::_qs[this_shard_id()].start();
+ _alien._qs[this_shard_id()].start();
}
#ifdef SEASTAR_HAVE_DPDK
assert(r == 0);
local_engine = reinterpret_cast<reactor*>(buf);
*internal::this_shard_id_ptr() = id;
- new (buf) reactor(id, std::move(rbs), cfg);
+ new (buf) reactor(this->shared_from_this(), _alien, id, std::move(rbs), cfg);
reactor_holder.reset(local_engine);
}
-void smp::cleanup() {
+// Releases the worker threads and their loop functions; noexcept because it
+// runs during shutdown where an escaping exception would terminate anyway.
+void smp::cleanup() noexcept {
smp::_threads = std::vector<posix_thread>();
_thread_loops.clear();
}
_qs[i][cpuid].stop();
}
}
- if (alien::smp::_qs) {
- alien::smp::_qs[cpuid].stop();
+ if (_alien._qs) {
+ _alien._qs[cpuid].stop();
}
}
class disk_config_params {
private:
- unsigned _num_io_queues = 0;
+ unsigned _num_io_groups = 0;
std::optional<unsigned> _capacity;
std::unordered_map<dev_t, mountpoint_params> _mountpoints;
std::chrono::duration<double> _latency_goal;
public:
- uint64_t per_io_queue(uint64_t qty, dev_t devid) const {
- const mountpoint_params& p = _mountpoints.at(devid);
- return std::max(qty / p.num_io_queues, 1ul);
+ // Splits a device-wide quantity (bytes or requests) evenly across the IO
+ // groups, never returning less than 1 so each group keeps some capacity.
+ uint64_t per_io_group(uint64_t qty, unsigned nr_groups) const noexcept {
+ return std::max(qty / nr_groups, 1ul);
}
- unsigned num_io_queues(dev_t devid) const {
- const mountpoint_params& p = _mountpoints.at(devid);
- return p.num_io_queues;
- }
+ // Number of IO groups requested via --num-io-groups; 0 means "not set,
+ // let resource allocation decide".
+ unsigned num_io_groups() const noexcept { return _num_io_groups; }
+ // Target dispatch latency derived from task-quota-ms in parse_config().
std::chrono::duration<double> latency_goal() const {
return _latency_goal;
}
- void parse_config(boost::program_options::variables_map& configuration) {
+ void parse_config(const smp_options& smp_opts, const reactor_options& reactor_opts) {
seastar_logger.debug("smp::count: {}", smp::count);
- _latency_goal = std::chrono::duration_cast<std::chrono::duration<double>>(configuration["task-quota-ms"].as<double>() * 1.5 * 1ms);
+ _latency_goal = std::chrono::duration_cast<std::chrono::duration<double>>(reactor_opts.task_quota_ms.get_value() * 1.5 * 1ms);
seastar_logger.debug("latency_goal: {}", latency_goal().count());
- if (configuration.count("max-io-requests")) {
- _capacity = configuration["max-io-requests"].as<unsigned>();
+ if (smp_opts.max_io_requests) {
+ seastar_logger.warn("the --max-io-requests option is deprecated, switch to io properties file instead");
+ _capacity = smp_opts.max_io_requests.get_value();
}
- if (configuration.count("num-io-queues")) {
- _num_io_queues = configuration["num-io-queues"].as<unsigned>();
- if (!_num_io_queues) {
- throw std::runtime_error("num-io-queues must be greater than zero");
+ if (smp_opts.num_io_groups) {
+ _num_io_groups = smp_opts.num_io_groups.get_value();
+ if (!_num_io_groups) {
+ throw std::runtime_error("num-io-groups must be greater than zero");
}
+ } else if (smp_opts.num_io_queues) {
+ seastar_logger.warn("the --num-io-queues option is deprecated, switch to --num-io-groups instead");
}
- if (configuration.count("io-properties-file") && configuration.count("io-properties")) {
+ if (smp_opts.io_properties_file && smp_opts.io_properties) {
throw std::runtime_error("Both io-properties and io-properties-file specified. Don't know which to trust!");
}
std::optional<YAML::Node> doc;
- if (configuration.count("io-properties-file")) {
- doc = YAML::LoadFile(configuration["io-properties-file"].as<std::string>());
- } else if (configuration.count("io-properties")) {
- doc = YAML::Load(configuration["io-properties"].as<std::string>());
+ if (smp_opts.io_properties_file) {
+ doc = YAML::LoadFile(smp_opts.io_properties_file.get_value());
+ } else if (smp_opts.io_properties) {
+ doc = YAML::Load(smp_opts.io_properties.get_value());
}
if (doc) {
- static constexpr unsigned task_quotas_in_default_latency_goal = 3;
- unsigned auto_num_io_queues = smp::count;
-
for (auto&& section : *doc) {
auto sec_name = section.first.as<std::string>();
if (sec_name != "disks") {
throw std::runtime_error(fmt::format("R/W bytes and req rates must not be zero"));
}
- // Ideally we wouldn't have I/O Queues and would dispatch from every shard (https://github.com/scylladb/seastar/issues/485)
- // While we don't do that, we'll just be conservative and try to recommend values of I/O Queues that are close to what we
- // suggested before the I/O Scheduler rework. The I/O Scheduler has traditionally tried to make sure that each queue would have
- // at least 4 requests in depth, and all its requests were 4kB in size. Therefore, try to arrange the I/O Queues so that we would
- // end up in the same situation here (that's where the 4 comes from).
- //
- // For the bandwidth limit, we want that to be 4 * 4096, so each I/O Queue has the same bandwidth as before.
- if (!_num_io_queues) {
- unsigned dev_io_queues = smp::count;
- dev_io_queues = std::min(dev_io_queues, unsigned((task_quotas_in_default_latency_goal * d.write_req_rate * latency_goal().count()) / 4));
- dev_io_queues = std::min(dev_io_queues, unsigned((task_quotas_in_default_latency_goal * d.write_bytes_rate * latency_goal().count()) / (4 * 4096)));
- dev_io_queues = std::max(dev_io_queues, 1u);
- seastar_logger.debug("dev_io_queues: {}", dev_io_queues);
- d.num_io_queues = dev_io_queues;
- auto_num_io_queues = std::min(auto_num_io_queues, dev_io_queues);
- } else {
- d.num_io_queues = _num_io_queues;
- }
-
seastar_logger.debug("dev_id: {} mountpoint: {}", buf.st_dev, d.mountpoint);
_mountpoints.emplace(buf.st_dev, d);
}
}
- if (!_num_io_queues) {
- _num_io_queues = auto_num_io_queues;
- }
- } else if (!_num_io_queues) {
- _num_io_queues = smp::count;
}
// Placeholder for unconfigured disks.
mountpoint_params d = {};
- d.num_io_queues = _num_io_queues;
- seastar_logger.debug("num_io_queues: {}", d.num_io_queues);
_mountpoints.emplace(0, d);
}
+ // Builds the io_queue::config for one device from its mountpoint_params,
+ // sizing per-group token limits from the read rates and the latency goal.
+ // Unset rates/lengths (numeric_limits::max sentinels) leave the
+ // corresponding cfg fields at their defaults.
+ // NOTE(review): the write_to_read multipliers divide by p.write_bytes_rate
+ // / p.write_req_rate inside blocks guarded only by the *read* rate being
+ // set; if a write rate were left unset (max sentinel) the multiplier
+ // becomes 0 -- presumably parse_config rejects such configs ("R/W bytes
+ // and req rates must not be zero"), confirm.
- struct io_queue::config generate_config(dev_t devid) const {
+ struct io_queue::config generate_config(dev_t devid, unsigned nr_groups) const {
seastar_logger.debug("generate_config dev_id: {}", devid);
const mountpoint_params& p = _mountpoints.at(devid);
struct io_queue::config cfg;
- uint64_t max_bandwidth = std::max(p.read_bytes_rate, p.write_bytes_rate);
- uint64_t max_iops = std::max(p.read_req_rate, p.write_req_rate);
cfg.devid = devid;
- cfg.disk_bytes_write_to_read_multiplier = io_queue::read_request_base_count;
- cfg.disk_req_write_to_read_multiplier = io_queue::read_request_base_count;
if (!_capacity) {
- if (max_bandwidth != std::numeric_limits<uint64_t>::max()) {
+ if (p.read_bytes_rate != std::numeric_limits<uint64_t>::max()) {
+ cfg.max_bytes_count = io_queue::read_request_base_count * per_io_group(p.read_bytes_rate * latency_goal().count(), nr_groups);
cfg.disk_bytes_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_bytes_rate) / p.write_bytes_rate;
- cfg.max_bytes_count = io_queue::read_request_base_count * per_io_queue(max_bandwidth * latency_goal().count(), devid);
+ cfg.disk_us_per_byte = 1000000. / p.read_bytes_rate;
}
- if (max_iops != std::numeric_limits<uint64_t>::max()) {
- cfg.max_req_count = io_queue::read_request_base_count * per_io_queue(max_iops * latency_goal().count(), devid);
+ if (p.read_req_rate != std::numeric_limits<uint64_t>::max()) {
+ cfg.max_req_count = io_queue::read_request_base_count * per_io_group(p.read_req_rate * latency_goal().count(), nr_groups);
cfg.disk_req_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_req_rate) / p.write_req_rate;
+ cfg.disk_us_per_request = 1000000. / p.read_req_rate;
+ }
+ if (p.read_saturation_length != std::numeric_limits<uint64_t>::max()) {
+ cfg.disk_read_saturation_length = p.read_saturation_length;
+ }
+ if (p.write_saturation_length != std::numeric_limits<uint64_t>::max()) {
+ cfg.disk_write_saturation_length = p.write_saturation_length;
}
cfg.mountpoint = p.mountpoint;
+ cfg.duplex = p.duplex;
} else {
// For backwards compatibility
cfg.capacity = *_capacity;
// Legacy configuration when only concurrency is specified.
- cfg.max_req_count = io_queue::read_request_base_count * std::min(*_capacity, reactor::max_aio_per_queue);
+ unsigned max_req_count = std::min(*_capacity, reactor::max_aio_per_queue);
+ cfg.max_req_count = io_queue::read_request_base_count * max_req_count;
// specify size in terms of 16kB IOPS.
+ static_assert(reactor::max_aio_per_queue << 14 <= std::numeric_limits<decltype(cfg.max_bytes_count)>::max() / io_queue::read_request_base_count);
- cfg.max_bytes_count = io_queue::read_request_base_count * (cfg.max_req_count << 14);
+ cfg.max_bytes_count = io_queue::read_request_base_count * (max_req_count << 14);
}
return cfg;
}
}
};
-void smp::register_network_stacks() {
- register_posix_stack();
- register_native_stack();
+// Clamps the per-shard networking IOCB budget to what the kernel can grant,
+// per the accounting in /proc/sys/fs/aio-nr / aio-max-nr. Storage and
+// preemption IOCBs are reserved first; networking gets the remainder split
+// across shards, but at least one per shard or configuration fails.
+// NOTE(review): available_aio = aio_max_nr - aio_nr is unsigned arithmetic;
+// assumes aio_nr <= aio_max_nr (kernel invariant) -- a racy read above the
+// limit would underflow. Also assumes both proc files are readable; confirm
+// behavior in restricted containers.
+unsigned smp::adjust_max_networking_aio_io_control_blocks(unsigned network_iocbs)
+{
+ static unsigned constexpr storage_iocbs = reactor::max_aio;
+ static unsigned constexpr preempt_iocbs = 2;
+
+ auto aio_max_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-max-nr");
+ auto aio_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-nr");
+ auto available_aio = aio_max_nr - aio_nr;
+ auto requested_aio_network = network_iocbs * smp::count;
+ auto requested_aio_other = (storage_iocbs + preempt_iocbs) * smp::count;
+ auto requested_aio = requested_aio_network + requested_aio_other;
+ auto network_iocbs_old = network_iocbs;
+
+ if (available_aio < requested_aio) {
+ seastar_logger.warn("Requested AIO slots too large, please increase request capacity in /proc/sys/fs/aio-max-nr. available:{} requested:{}", available_aio, requested_aio);
+ if (available_aio >= requested_aio_other + smp::count) { // at least one queue for each shard
+ network_iocbs = (available_aio - requested_aio_other) / smp::count;
+ seastar_logger.warn("max-networking-io-control-blocks adjusted from {} to {}, since AIO slots are unavailable", network_iocbs_old, network_iocbs);
+ } else {
+ throw std::runtime_error("Could not setup Async I/O: Not enough request capacity in /proc/sys/fs/aio-max-nr. Try increasing that number or reducing the amount of logical CPUs available for your application");
+ }
+ }
+
+ return network_iocbs;
}
-void smp::configure(boost::program_options::variables_map configuration, reactor_config reactor_cfg)
+void smp::configure(const smp_options& smp_opts, const reactor_options& reactor_opts)
{
#ifndef SEASTAR_NO_EXCEPTION_HACK
- if (configuration["enable-glibc-exception-scaling-workaround"].as<bool>()) {
+ if (smp_opts.enable_glibc_exception_scaling_workaround.get_value()) {
init_phdr_cache();
}
#endif
SIGALRM, SIGCONT, SIGSTOP, SIGTSTP, SIGTTIN, SIGTTOU}) {
sigdelset(&sigs, sig);
}
- if (!reactor_cfg.auto_handle_sigint_sigterm) {
+ if (!reactor_opts._auto_handle_sigint_sigterm) {
sigdelset(&sigs, SIGINT);
sigdelset(&sigs, SIGTERM);
}
install_oneshot_signal_handler<SIGABRT, sigabrt_action>();
#ifdef SEASTAR_HAVE_DPDK
- _using_dpdk = configuration.count("dpdk-pmd");
+ const auto* native_stack = dynamic_cast<const net::native_stack_options*>(reactor_opts.network_stack.get_selected_candidate_opts());
+ _using_dpdk = native_stack && native_stack->dpdk_pmd;
#endif
- auto thread_affinity = configuration["thread-affinity"].as<bool>();
- if (configuration.count("overprovisioned")
- && configuration["thread-affinity"].defaulted()) {
+ auto thread_affinity = smp_opts.thread_affinity.get_value();
+ if (reactor_opts.overprovisioned
+ && smp_opts.thread_affinity.defaulted()) {
thread_affinity = false;
}
if (!thread_affinity && _using_dpdk) {
fmt::print("warning: --thread-affinity 0 ignored in dpdk mode\n");
}
- auto mbind = configuration["mbind"].as<bool>();
+ auto mbind = smp_opts.mbind.get_value();
if (!thread_affinity) {
mbind = false;
}
+ resource::configuration rc;
+
smp::count = 1;
smp::_tmain = std::this_thread::get_id();
- auto nr_cpus = resource::nr_processing_units();
+ auto nr_cpus = resource::nr_processing_units(rc);
resource::cpuset cpu_set;
auto cgroup_cpu_set = cgroup::cpu_set();
std::copy(boost::counting_iterator<unsigned>(0), boost::counting_iterator<unsigned>(nr_cpus),
std::inserter(cpu_set, cpu_set.end()));
- if (configuration.count("cpuset")) {
- cpu_set = configuration["cpuset"].as<cpuset_bpo_wrapper>().value;
+ if (smp_opts.cpuset) {
+ cpu_set = smp_opts.cpuset.get_value();
if (cgroup_cpu_set && *cgroup_cpu_set != cpu_set) {
// CPUs that are not available are those pinned by
// --cpuset but not by cgroups, if mounted.
cpu_set = *cgroup_cpu_set;
}
- if (configuration.count("smp")) {
- nr_cpus = configuration["smp"].as<unsigned>();
+ if (smp_opts.smp) {
+ nr_cpus = smp_opts.smp.get_value();
} else {
nr_cpus = cpu_set.size();
}
smp::count = nr_cpus;
- _reactors.resize(nr_cpus);
- resource::configuration rc;
- if (configuration.count("memory")) {
- rc.total_memory = parse_memory_size(configuration["memory"].as<std::string>());
+ std::vector<reactor*> reactors(nr_cpus);
+ if (smp_opts.memory) {
+ rc.total_memory = parse_memory_size(smp_opts.memory.get_value());
#ifdef SEASTAR_HAVE_DPDK
- if (configuration.count("hugepages") &&
- !configuration["network-stack"].as<std::string>().compare("native") &&
+ if (smp_opts.hugepages &&
+ !reactor_opts.network_stack.get_selected_candidate_name().compare("native") &&
_using_dpdk) {
size_t dpdk_memory = dpdk::eal::mem_size(smp::count);
if (dpdk_memory >= rc.total_memory) {
std::cerr<<"Can't run with the given amount of memory: ";
- std::cerr<<configuration["memory"].as<std::string>();
+ std::cerr<<smp_opts.memory.get_value();
std::cerr<<". Consider giving more."<<std::endl;
exit(1);
}
}
#endif
}
- if (configuration.count("reserve-memory")) {
- rc.reserve_memory = parse_memory_size(configuration["reserve-memory"].as<std::string>());
+ if (smp_opts.reserve_memory) {
+ rc.reserve_memory = parse_memory_size(smp_opts.reserve_memory.get_value());
}
std::optional<std::string> hugepages_path;
- if (configuration.count("hugepages")) {
- hugepages_path = configuration["hugepages"].as<std::string>();
+ if (smp_opts.hugepages) {
+ hugepages_path = smp_opts.hugepages.get_value();
}
auto mlock = false;
- if (configuration.count("lock-memory")) {
- mlock = configuration["lock-memory"].as<bool>();
+ if (smp_opts.lock_memory) {
+ mlock = smp_opts.lock_memory.get_value();
}
if (mlock) {
auto extra_flags = 0;
rc.cpu_set = std::move(cpu_set);
disk_config_params disk_config;
- disk_config.parse_config(configuration);
+ disk_config.parse_config(smp_opts, reactor_opts);
for (auto& id : disk_config.device_ids()) {
- rc.num_io_queues.emplace(id, disk_config.num_io_queues(id));
+ rc.devices.push_back(id);
}
+ rc.num_io_groups = disk_config.num_io_groups();
#ifdef SEASTAR_HAVE_HWLOC
- if (configuration["allow-cpus-in-remote-numa-nodes"].as<bool>()) {
+ if (smp_opts.allow_cpus_in_remote_numa_nodes.get_value()) {
rc.assign_orphan_cpus = true;
}
#endif
}
memory::configure(allocations[0].mem, mbind, hugepages_path);
- if (configuration.count("abort-on-seastar-bad-alloc")) {
+ if (reactor_opts.abort_on_seastar_bad_alloc) {
memory::enable_abort_on_allocation_failure();
}
- if (configuration.count("dump-memory-diagnostics-on-alloc-failure-kind")) {
- memory::set_dump_memory_diagnostics_on_alloc_failure_kind(configuration["dump-memory-diagnostics-on-alloc-failure-kind"].as<std::string>());
+ if (reactor_opts.dump_memory_diagnostics_on_alloc_failure_kind) {
+ memory::set_dump_memory_diagnostics_on_alloc_failure_kind(reactor_opts.dump_memory_diagnostics_on_alloc_failure_kind.get_value());
}
- bool heapprof_enabled = configuration.count("heapprof");
+ reactor_config reactor_cfg;
+ reactor_cfg.auto_handle_sigint_sigterm = reactor_opts._auto_handle_sigint_sigterm;
+ reactor_cfg.max_networking_aio_io_control_blocks = adjust_max_networking_aio_io_control_blocks(reactor_opts.max_networking_io_control_blocks.get_value());
+
+#ifdef SEASTAR_HEAPPROF
+ bool heapprof_enabled = reactor_opts.heapprof;
if (heapprof_enabled) {
memory::set_heap_profiling_enabled(heapprof_enabled);
}
+#else
+ bool heapprof_enabled = false;
+#endif
#ifdef SEASTAR_HAVE_DPDK
- if (smp::_using_dpdk) {
+ if (_using_dpdk) {
dpdk::eal::cpuset cpus;
for (auto&& a : allocations) {
cpus[a.cpu_id] = true;
}
- dpdk::eal::init(cpus, configuration);
+ dpdk::eal::init(cpus, reactor_opts._argv0, hugepages_path, native_stack ? bool(native_stack->dpdk_pmd) : false);
}
#endif
// Better to put it into the smp class, but at smp construction time
// correct smp::count is not known.
- static boost::barrier reactors_registered(smp::count);
- static boost::barrier smp_queues_constructed(smp::count);
- static boost::barrier inited(smp::count);
+ boost::barrier reactors_registered(smp::count);
+ boost::barrier smp_queues_constructed(smp::count);
+ // We use shared_ptr since this thread can exit while other threads are still unlocking
+ auto inited = std::make_shared<boost::barrier>(smp::count);
auto ioq_topology = std::move(resources.ioq_topology);
- std::unordered_map<dev_t, std::vector<io_queue*>> all_io_queues;
+ // ATTN: The ioq_topology value is referenced by below lambdas which are
+ // then copied to other shard's threads, so each shard has a copy of the
+ // ioq_topology on stack, but (!) still references and uses the value
+ // from shard-0. This access is race-free because
+ // 1. The .shard_to_group is not modified
+ // 2. The .queues is pre-resize()-d in advance, so the vector itself
+ // doesn't change; existing slots are accessed by owning shards only
+ // without interference
+ // 3. The .groups manipulations are guarded by the .lock lock (but it's
+ // also pre-resize()-d in advance)
+
+ auto alloc_io_queues = [&ioq_topology, &disk_config] (shard_id shard) {
+ for (auto& topo : ioq_topology) {
+ auto& io_info = topo.second;
+ auto group_idx = io_info.shard_to_group[shard];
+ std::shared_ptr<io_group> group;
+
+ {
+ std::lock_guard _(io_info.lock);
+ auto& iog = io_info.groups[group_idx];
+ if (!iog) {
+ struct io_queue::config qcfg = disk_config.generate_config(topo.first, io_info.groups.size());
+ iog = std::make_shared<io_group>(std::move(qcfg));
+ seastar_logger.debug("allocate {} IO group", group_idx);
+ }
+ group = iog;
+ }
- for (auto& id : disk_config.device_ids()) {
- auto io_info = ioq_topology.at(id);
- all_io_queues.emplace(id, io_info.nr_coordinators);
- }
-
- auto alloc_io_queue = [&ioq_topology, &all_io_queues, &disk_config] (unsigned shard, dev_t id) {
- auto io_info = ioq_topology.at(id);
- auto cid = io_info.shard_to_coordinator[shard];
- auto vec_idx = io_info.coordinator_to_idx[cid];
- assert(io_info.coordinator_to_idx_valid[cid]);
- if (shard == cid) {
- struct io_queue::config cfg = disk_config.generate_config(id);
- cfg.coordinator = cid;
- assert(vec_idx < all_io_queues[id].size());
- assert(!all_io_queues[id][vec_idx]);
- all_io_queues[id][vec_idx] = new io_queue(std::move(cfg));
+ io_info.queues[shard] = std::make_unique<io_queue>(std::move(group), engine()._io_sink);
+ seastar_logger.debug("attached {} queue to {} IO group", shard, group_idx);
}
};
- auto assign_io_queue = [&ioq_topology, &all_io_queues] (shard_id shard_id, dev_t dev_id) {
- auto io_info = ioq_topology.at(dev_id);
- auto cid = io_info.shard_to_coordinator[shard_id];
- auto queue_idx = io_info.coordinator_to_idx[cid];
- if (all_io_queues[dev_id][queue_idx]->coordinator() == shard_id) {
- engine().my_io_queues.emplace_back(all_io_queues[dev_id][queue_idx]);
+ auto assign_io_queues = [&ioq_topology] (shard_id shard) {
+ for (auto& topo : ioq_topology) {
+ auto queue = std::move(topo.second.queues[shard]);
+ assert(queue);
+ engine()._io_queues.emplace(topo.first, std::move(queue));
}
- engine()._io_queues.emplace(dev_id, all_io_queues[dev_id][queue_idx]);
};
_all_event_loops_done.emplace(smp::count);
- auto backend_selector = configuration["reactor-backend"].as<reactor_backend_selector>();
+ auto backend_selector = reactor_opts.reactor_backend.get_selected_candidate();
unsigned i;
+ auto smp_tmain = smp::_tmain;
for (i = 1; i < smp::count; i++) {
auto allocation = allocations[i];
- create_thread([configuration, &disk_config, hugepages_path, i, allocation, assign_io_queue, alloc_io_queue, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] {
+ create_thread([this, smp_tmain, inited, &reactors_registered, &smp_queues_constructed, &reactor_opts, &reactors, hugepages_path, i, allocation, assign_io_queues, alloc_io_queues, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] {
try {
+ // initialize thread_locals that are equal across all reactor threads of this smp instance
+ smp::_tmain = smp_tmain;
auto thread_name = seastar::format("reactor-{}", i);
pthread_setname_np(pthread_self(), thread_name.c_str());
if (thread_affinity) {
throw_pthread_error(r);
init_default_smp_service_group(i);
allocate_reactor(i, backend_selector, reactor_cfg);
- _reactors[i] = &engine();
- for (auto& dev_id : disk_config.device_ids()) {
- alloc_io_queue(i, dev_id);
- }
+ reactors[i] = &engine();
+ alloc_io_queues(i);
reactors_registered.wait();
smp_queues_constructed.wait();
+ // _qs_owner is only initialized here
+ _qs = _qs_owner.get();
start_all_queues();
- for (auto& dev_id : disk_config.device_ids()) {
- assign_io_queue(i, dev_id);
- }
- inited.wait();
- engine().configure(configuration);
- engine().run();
+ assign_io_queues(i);
+ inited->wait();
+ engine().configure(reactor_opts);
+ engine().do_run();
} catch (const std::exception& e) {
seastar_logger.error(e.what());
_exit(1);
_exit(1);
}
- _reactors[0] = &engine();
- for (auto& dev_id : disk_config.device_ids()) {
- alloc_io_queue(0, dev_id);
- }
+ reactors[0] = &engine();
+ alloc_io_queues(0);
#ifdef SEASTAR_HAVE_DPDK
if (_using_dpdk) {
#endif
reactors_registered.wait();
- smp::_qs = decltype(smp::_qs){new smp_message_queue* [smp::count], qs_deleter{}};
+ _qs_owner = decltype(smp::_qs_owner){new smp_message_queue* [smp::count], qs_deleter{}};
+ _qs = _qs_owner.get();
for(unsigned i = 0; i < smp::count; i++) {
- smp::_qs[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
+ smp::_qs_owner[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
for (unsigned j = 0; j < smp::count; ++j) {
- new (&smp::_qs[i][j]) smp_message_queue(_reactors[j], _reactors[i]);
+ new (&smp::_qs_owner[i][j]) smp_message_queue(reactors[j], reactors[i]);
}
}
- alien::smp::_qs = alien::smp::create_qs(_reactors);
+ _alien._qs = alien::instance::create_qs(reactors);
smp_queues_constructed.wait();
start_all_queues();
- for (auto& dev_id : disk_config.device_ids()) {
- assign_io_queue(0, dev_id);
- }
- inited.wait();
+ assign_io_queues(0);
+ inited->wait();
- engine().configure(configuration);
+ engine().configure(reactor_opts);
// The raw `new` is necessary because of the private constructor of `lowres_clock_impl`.
engine()._lowres_clock_impl = std::unique_ptr<lowres_clock_impl>(new lowres_clock_impl);
}
return false;
}
-internal::preemption_monitor bootstrap_preemption_monitor{};
-__thread const internal::preemption_monitor* g_need_preempt = &bootstrap_preemption_monitor;
-
__thread reactor* local_engine;
void report_exception(std::string_view message, std::exception_ptr eptr) noexcept {
engine()._flush_batching.emplace_back(os);
}
-reactor::sched_clock::duration reactor::total_idle_time() {
+steady_clock_type::duration reactor::total_idle_time() {
return _total_idle;
}
-reactor::sched_clock::duration reactor::total_busy_time() {
- return sched_clock::now() - _start_time - _total_idle;
+steady_clock_type::duration reactor::total_busy_time() {
+ return now() - _start_time - _total_idle;
}
std::chrono::nanoseconds reactor::total_steal_time() {
// steal time but we have no ways to account it.
//
// But what we have here should be good enough and at least has a well defined meaning.
- return std::chrono::duration_cast<std::chrono::nanoseconds>(sched_clock::now() - _start_time - _total_sleep) -
+ return std::chrono::duration_cast<std::chrono::nanoseconds>(now() - _start_time - _total_sleep) -
std::chrono::duration_cast<std::chrono::nanoseconds>(thread_cputime_clock::now().time_since_epoch());
}
future<>
rename_priority_class(io_priority_class pc, sstring new_name) {
- return reactor::rename_priority_class(pc, new_name);
+ return pc.rename(std::move(new_name));
}
future<>
namespace internal {
inline
-std::chrono::steady_clock::duration
+sched_clock::duration
timeval_to_duration(::timeval tv) {
return std::chrono::seconds(tv.tv_sec) + std::chrono::microseconds(tv.tv_usec);
}
class reactor_stall_sampler : public reactor::pollfn {
- std::chrono::steady_clock::time_point _run_start;
+ sched_clock::time_point _run_start;
::rusage _run_start_rusage;
uint64_t _kernel_stalls = 0;
- std::chrono::steady_clock::duration _nonsleep_cpu_time = {};
- std::chrono::steady_clock::duration _nonsleep_wall_time = {};
+ sched_clock::duration _nonsleep_cpu_time = {};
+ sched_clock::duration _nonsleep_wall_time = {};
private:
static ::rusage get_rusage() {
struct ::rusage ru;
::getrusage(RUSAGE_THREAD, &ru);
return ru;
}
- static std::chrono::steady_clock::duration cpu_time(const ::rusage& ru) {
+ static sched_clock::duration cpu_time(const ::rusage& ru) {
return timeval_to_duration(ru.ru_stime) + timeval_to_duration(ru.ru_utime);
}
void mark_run_start() {
- _run_start = std::chrono::steady_clock::now();
+ _run_start = reactor::now();
_run_start_rusage = get_rusage();
}
void mark_run_end() {
auto start_nvcsw = _run_start_rusage.ru_nvcsw;
auto start_cpu_time = cpu_time(_run_start_rusage);
auto start_time = _run_start;
- _run_start = std::chrono::steady_clock::now();
+ _run_start = reactor::now();
_run_start_rusage = get_rusage();
_kernel_stalls += _run_start_rusage.ru_nvcsw - start_nvcsw;
_nonsleep_cpu_time += cpu_time(_run_start_rusage) - start_cpu_time;
}
std::ostream& operator<<(std::ostream& os, const stall_report& sr) {
- auto to_ms = [] (std::chrono::steady_clock::duration d) -> float {
+ auto to_ms = [] (sched_clock::duration d) -> float {
return std::chrono::duration<float>(d) / 1ms;
};
return os << format("{} stalls, {} ms stall time, {} ms run time", sr.kernel_stalls, to_ms(sr.stall_time), to_ms(sr.run_wall_time));
}
+size_t scheduling_group_count() {
+ auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed);
+ return __builtin_popcountl(b);
+}
+
}
#ifdef SEASTAR_TASK_BACKTRACE