#include <boost/thread/barrier.hpp>
#include <boost/container/static_vector.hpp>
#include <set>
+#include <seastar/core/reactor_config.hh>
#include <seastar/core/linux-aio.hh>
#include <seastar/util/eclipse.hh>
#include <seastar/core/future.hh>
#include <seastar/core/manual_clock.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/scheduling.hh>
+#include <seastar/core/smp.hh>
+#include <seastar/core/internal/io_request.hh>
#include "internal/pollable_fd.hh"
+#include "internal/poll.hh"
#ifdef HAVE_OSV
#include <osv/sched.hh>
namespace seastar {
void register_network_stack(sstring name, boost::program_options::options_description opts,
- std::function<future<std::unique_ptr<network_stack>>(boost::program_options::variables_map opts)> create,
+ noncopyable_function<future<std::unique_ptr<network_stack>>(boost::program_options::variables_map opts)> create,
bool make_default = false);
class thread_pool;
class smp;
-class smp_message_queue {
- static constexpr size_t queue_length = 128;
- static constexpr size_t batch_size = 16;
- static constexpr size_t prefetch_cnt = 2;
- struct work_item;
- struct lf_queue_remote {
- reactor* remote;
- };
- using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
- boost::lockfree::capacity<queue_length>>;
- // use inheritence to control placement order
- struct lf_queue : lf_queue_remote, lf_queue_base {
- lf_queue(reactor* remote) : lf_queue_remote{remote} {}
- void maybe_wakeup();
- };
- lf_queue _pending;
- lf_queue _completed;
- struct alignas(seastar::cache_line_size) {
- size_t _sent = 0;
- size_t _compl = 0;
- size_t _last_snt_batch = 0;
- size_t _last_cmpl_batch = 0;
- size_t _current_queue_length = 0;
- };
- // keep this between two structures with statistics
- // this makes sure that they have at least one cache line
- // between them, so hw prefetcher will not accidentally prefetch
- // cache line used by another cpu.
- metrics::metric_groups _metrics;
- struct alignas(seastar::cache_line_size) {
- size_t _received = 0;
- size_t _last_rcv_batch = 0;
- };
- struct work_item {
- scheduling_group sg = current_scheduling_group();
- virtual ~work_item() {}
- virtual void process() = 0;
- virtual void complete() = 0;
- };
- template <typename Func>
- struct async_work_item : work_item {
- smp_message_queue& _queue;
- Func _func;
- using futurator = futurize<std::result_of_t<Func()>>;
- using future_type = typename futurator::type;
- using value_type = typename future_type::value_type;
- compat::optional<value_type> _result;
- std::exception_ptr _ex; // if !_result
- typename futurator::promise_type _promise; // used on local side
- async_work_item(smp_message_queue& queue, Func&& func) : _queue(queue), _func(std::move(func)) {}
- virtual void process() override {
- try {
- with_scheduling_group(this->sg, [this] {
- futurator::apply(this->_func).then_wrapped([this] (auto f) {
- if (f.failed()) {
- _ex = f.get_exception();
- } else {
- _result = f.get();
- }
- _queue.respond(this);
- });
- });
- } catch (...) {
- _ex = std::current_exception();
- _queue.respond(this);
- }
- }
- virtual void complete() override {
- if (_result) {
- _promise.set_value(std::move(*_result));
- } else {
- // FIXME: _ex was allocated on another cpu
- _promise.set_exception(std::move(_ex));
- }
- }
- future_type get_future() { return _promise.get_future(); }
- };
- union tx_side {
- tx_side() {}
- ~tx_side() {}
- void init() { new (&a) aa; }
- struct aa {
- std::deque<work_item*> pending_fifo;
- } a;
- } _tx;
- std::vector<work_item*> _completed_fifo;
-public:
- smp_message_queue(reactor* from, reactor* to);
- ~smp_message_queue();
- template <typename Func>
- futurize_t<std::result_of_t<Func()>> submit(Func&& func) {
- auto wi = std::make_unique<async_work_item<Func>>(*this, std::forward<Func>(func));
- auto fut = wi->get_future();
- submit_item(std::move(wi));
- return fut;
- }
- void start(unsigned cpuid);
- template<size_t PrefetchCnt, typename Func>
- size_t process_queue(lf_queue& q, Func process);
- size_t process_incoming();
- size_t process_completions();
- void stop();
-private:
- void work();
- void submit_item(std::unique_ptr<work_item> wi);
- void respond(work_item* wi);
- void move_pending();
- void flush_request_batch();
- void flush_response_batch();
- bool has_unflushed_responses() const;
- bool pure_poll_rx() const;
- bool pure_poll_tx() const;
-
- friend class smp;
-};
-
class reactor_backend_selector;
-enum class open_flags {
- rw = O_RDWR,
- ro = O_RDONLY,
- wo = O_WRONLY,
- create = O_CREAT,
- truncate = O_TRUNC,
- exclusive = O_EXCL,
-};
-
-inline open_flags operator|(open_flags a, open_flags b) {
- return open_flags(std::underlying_type_t<open_flags>(a) | std::underlying_type_t<open_flags>(b));
-}
-
-inline open_flags operator&(open_flags a, open_flags b) {
- return open_flags(std::underlying_type_t<open_flags>(a) & std::underlying_type_t<open_flags>(b));
-}
-
class reactor_backend;
namespace internal {
}
-class io_desc;
+class kernel_completion;
+class io_queue;
class disk_config_params;
class reactor {
using sched_clock = std::chrono::steady_clock;
private:
- struct pollfn {
- virtual ~pollfn() {}
- // Returns true if work was done (false = idle)
- virtual bool poll() = 0;
- // Checks if work needs to be done, but without actually doing any
- // returns true if works needs to be done (false = idle)
- virtual bool pure_poll() = 0;
- // Tries to enter interrupt mode.
- //
- // If it returns true, then events from this poller will wake
- // a sleeping idle loop, and exit_interrupt_mode() must be called
- // to return to normal polling.
- //
- // If it returns false, the sleeping idle loop may not be entered.
- virtual bool try_enter_interrupt_mode() { return false; }
- virtual void exit_interrupt_mode() {}
- };
struct task_queue;
using task_queue_list = circular_buffer_fixed_capacity<task_queue*, max_scheduling_groups()>;
+ using pollfn = seastar::pollfn;
- class io_pollfn;
class signal_pollfn;
- class aio_batch_submit_pollfn;
class batch_flush_pollfn;
class smp_pollfn;
class drain_cross_cpu_freelist_pollfn;
class lowres_timer_pollfn;
class manual_timer_pollfn;
class epoll_pollfn;
+ class reap_kernel_completions_pollfn;
+ class kernel_submit_work_pollfn;
+ class io_queue_submission_pollfn;
class syscall_pollfn;
class execution_stage_pollfn;
- friend io_pollfn;
friend signal_pollfn;
- friend aio_batch_submit_pollfn;
friend batch_flush_pollfn;
friend smp_pollfn;
friend drain_cross_cpu_freelist_pollfn;
friend lowres_timer_pollfn;
friend class manual_clock;
friend class epoll_pollfn;
+ friend class reap_kernel_completions_pollfn;
+ friend class kernel_submit_work_pollfn;
+ friend class io_queue_submission_pollfn;
friend class syscall_pollfn;
friend class execution_stage_pollfn;
friend class file_data_source_impl; // for fstream statistics
friend class internal::reactor_stall_sampler;
+ friend class preempt_io_context;
+ friend struct hrtimer_aio_completion;
+ friend struct task_quota_aio_completion;
friend class reactor_backend_epoll;
friend class reactor_backend_aio;
+ friend class reactor_backend_selector;
+ friend class aio_storage_context;
public:
class poller {
std::unique_ptr<pollfn> _pollfn;
class registration_task;
class deregistration_task;
- registration_task* _registration_task;
+ registration_task* _registration_task = nullptr;
public:
template <typename Func> // signature: bool ()
static poller simple(Func&& poll) {
~poller();
poller(poller&& x);
poller& operator=(poller&& x);
- void do_register();
+ void do_register() noexcept;
friend class reactor;
};
enum class idle_cpu_handler_result {
no_more_work,
interrupted_by_higher_priority_task
};
- using work_waiting_on_reactor = const std::function<bool()>&;
- using idle_cpu_handler = std::function<idle_cpu_handler_result(work_waiting_on_reactor)>;
+ using work_waiting_on_reactor = const noncopyable_function<bool()>&;
+ using idle_cpu_handler = noncopyable_function<idle_cpu_handler_result(work_waiting_on_reactor)>;
struct io_stats {
uint64_t aio_reads = 0;
uint64_t fstream_read_ahead_discarded_bytes = 0;
};
private:
+ reactor_config _cfg;
file_desc _notify_eventfd;
file_desc _task_quota_timer;
#ifdef HAVE_OSV
static constexpr unsigned max_queues = 8;
static constexpr unsigned max_aio = max_aio_per_queue * max_queues;
friend disk_config_params;
+
// Not all reactors have IO queues. If the number of IO queues is less than the number of shards,
// some reactors will talk to foreign io_queues. If this reactor holds a valid IO queue, it will
// be stored here.
std::unordered_map<dev_t, io_queue*> _io_queues;
friend io_queue;
- std::vector<std::function<future<> ()>> _exit_funcs;
+ std::vector<noncopyable_function<future<> ()>> _exit_funcs;
unsigned _id = 0;
bool _stopping = false;
bool _stopped = false;
+ bool _finished_running_tasks = false;
condition_variable _stop_requested;
bool _handle_sigint = true;
- promise<std::unique_ptr<network_stack>> _network_stack_ready_promise;
+ compat::optional<future<std::unique_ptr<network_stack>>> _network_stack_ready;
int _return = 0;
promise<> _start_promise;
semaphore _cpu_started;
internal::preemption_monitor _preemption_monitor{};
- std::atomic<uint64_t> _tasks_processed = { 0 };
- std::atomic<uint64_t> _polls = { 0 };
+ uint64_t _global_tasks_processed = 0;
+ uint64_t _polls = 0;
std::unique_ptr<internal::cpu_stall_detector> _cpu_stall_detector;
unsigned _max_task_backlog = 1000;
timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers;
timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers;
- internal::linux_abi::aio_context_t _io_context;
- alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
- std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
- boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio;
- boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry;
io_stats _io_stats;
uint64_t _fsyncs = 0;
uint64_t _cxx_exceptions = 0;
+ uint64_t _abandoned_failed_futures = 0;
struct task_queue {
explicit task_queue(unsigned id, sstring name, float shares);
int64_t _vruntime = 0;
uint8_t _id;
sched_clock::duration _runtime = {};
uint64_t _tasks_processed = 0;
- circular_buffer<std::unique_ptr<task>> _q;
+ circular_buffer<task*> _q;
sstring _name;
+ /**
+ * This array holds pointers to the scheduling group specific
+ * data. The pointer is not used as is but is cast to a reference
+ * to the appropriate type that is actually pointed to.
+ */
+ std::vector<void*> _scheduling_group_specific_vals;
int64_t to_vruntime(sched_clock::duration runtime) const;
void set_shares(float shares);
struct indirect_compare;
sched_clock::duration _time_spent_on_task_quota_violations = {};
seastar::metrics::metric_groups _metrics;
+ void rename(sstring new_name);
+ private:
+ void register_stats();
};
+
+ circular_buffer<internal::io_request> _pending_io;
boost::container::static_vector<std::unique_ptr<task_queue>, max_scheduling_groups()> _task_queues;
+ std::vector<scheduling_group_key_config> _scheduling_group_key_configs;
int64_t _last_vruntime = 0;
task_queue_list _active_task_queues;
task_queue_list _activating_task_queues;
bool _strict_o_direct = true;
bool _force_io_getevents_syscall = false;
bool _bypass_fsync = false;
+ bool _have_aio_fsync = false;
std::atomic<bool> _dying{false};
private:
static std::chrono::nanoseconds calculate_poll_time();
public:
/// Register a user-defined signal handler
- void handle_signal(int signo, std::function<void ()>&& handler);
+ void handle_signal(int signo, noncopyable_function<void ()>&& handler);
private:
class signals {
bool poll_signal();
bool pure_poll_signal() const;
- void handle_signal(int signo, std::function<void ()>&& handler);
- void handle_signal_once(int signo, std::function<void ()>&& handler);
+ void handle_signal(int signo, noncopyable_function<void ()>&& handler);
+ void handle_signal_once(int signo, noncopyable_function<void ()>&& handler);
static void action(int signo, siginfo_t* siginfo, void* ignore);
static void failed_to_handle(int signo);
private:
struct signal_handler {
- signal_handler(int signo, std::function<void ()>&& handler);
- std::function<void ()> _handler;
+ signal_handler(int signo, noncopyable_function<void ()>&& handler);
+ noncopyable_function<void ()> _handler;
};
std::atomic<uint64_t> _pending_signals;
std::unordered_map<int, signal_handler> _signal_handlers;
- friend void reactor::handle_signal(int, std::function<void ()>&&);
+ friend void reactor::handle_signal(int, noncopyable_function<void ()>&&);
};
signals _signals;
void insert_activating_task_queues();
void account_runtime(task_queue& tq, sched_clock::duration runtime);
void account_idle(sched_clock::duration idletime);
- void init_scheduling_group(scheduling_group sg, sstring name, float shares);
- void destroy_scheduling_group(scheduling_group sg);
+ void allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key);
+ future<> init_scheduling_group(scheduling_group sg, sstring name, float shares);
+ future<> init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg);
+ future<> destroy_scheduling_group(scheduling_group sg);
+ [[noreturn]] void no_such_scheduling_group(scheduling_group sg);
+ void* get_scheduling_group_specific_value(scheduling_group sg, scheduling_group_key key) { // Returns the untyped pointer that sg's task queue stores for key.
+ if (!_task_queues[sg._id]) { // The entry for this group id holds no task_queue.
+ no_such_scheduling_group(sg); // Declared [[noreturn]] in this class, so the lookup below is not reached on this path.
+ }
+ return _task_queues[sg._id]->_scheduling_group_specific_vals[key.id()]; // Plain operator[] indexing; key.id() is not range-checked here.
+ }
+ void* get_scheduling_group_specific_value(scheduling_group_key key) { // Overload: same lookup, for the group that internal::current_scheduling_group_ptr() points to.
+ return get_scheduling_group_specific_value(*internal::current_scheduling_group_ptr(), key); // Delegates to the two-argument overload.
+ }
uint64_t tasks_processed() const;
uint64_t min_vruntime() const;
void request_preemption();
+ void start_handling_signal();
void reset_preemption_monitor();
void service_highres_timer();
+
+ future<std::tuple<pollable_fd, socket_address>>
+ do_accept(pollable_fd_state& listen_fd);
+ future<> do_connect(pollable_fd_state& pfd, socket_address& sa);
+
+ future<size_t>
+ do_read_some(pollable_fd_state& fd, void* buffer, size_t size);
+ future<size_t>
+ do_read_some(pollable_fd_state& fd, const std::vector<iovec>& iov);
+
+ future<size_t>
+ do_write_some(pollable_fd_state& fd, const void* buffer, size_t size);
+ future<size_t>
+ do_write_some(pollable_fd_state& fd, net::packet& p);
public:
- static boost::program_options::options_description get_options_description(std::chrono::duration<double> default_task_quota);
- explicit reactor(unsigned id, reactor_backend_selector rbs);
+ static boost::program_options::options_description get_options_description(reactor_config cfg);
+ explicit reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg);
reactor(const reactor&) = delete;
~reactor();
void operator=(const reactor&) = delete;
+ sched_clock::duration uptime() { // Time elapsed since _start_time, measured on sched_clock (aliased to std::chrono::steady_clock in this class).
+ return sched_clock::now() - _start_time; // NOTE(review): _start_time is declared outside this excerpt; presumably recorded when the reactor starts - confirm.
+ }
+
io_queue& get_io_queue(dev_t devid = 0) {
auto queue = _io_queues.find(devid);
if (queue == _io_queues.end()) {
/// \param shares the new shares value
/// \return a future that is ready when the share update is applied
future<> update_shares_for_class(io_priority_class pc, uint32_t shares);
+ static future<> rename_priority_class(io_priority_class pc, sstring new_name);
void configure(boost::program_options::variables_map config);
bool posix_reuseport_available() const { return _reuseport; }
- lw_shared_ptr<pollable_fd> make_pollable_fd(socket_address sa, transport proto = transport::TCP);
- future<> posix_connect(lw_shared_ptr<pollable_fd> pfd, socket_address sa, socket_address local);
-
- future<pollable_fd, socket_address> accept(pollable_fd_state& listen_fd);
+ lw_shared_ptr<pollable_fd> make_pollable_fd(socket_address sa, int proto);
- future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t size);
- future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov);
-
- future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t size);
+ future<> posix_connect(lw_shared_ptr<pollable_fd> pfd, socket_address sa, socket_address local);
future<> write_all(pollable_fd_state& fd, const void* buffer, size_t size);
future<file> open_file_dma(sstring name, open_flags flags, file_open_options options = {});
future<file> open_directory(sstring name);
- future<> make_directory(sstring name);
- future<> touch_directory(sstring name);
- future<compat::optional<directory_entry_type>> file_type(sstring name);
+ future<> make_directory(sstring name, file_permissions permissions = file_permissions::default_dir_permissions);
+ future<> touch_directory(sstring name, file_permissions permissions = file_permissions::default_dir_permissions);
+ future<compat::optional<directory_entry_type>> file_type(sstring name, follow_symlink = follow_symlink::yes);
+ future<stat_data> file_stat(sstring pathname, follow_symlink);
future<uint64_t> file_size(sstring pathname);
- future<bool> file_exists(sstring pathname);
+ future<bool> file_accessible(sstring pathname, access_flags flags);
+ future<bool> file_exists(sstring pathname) { // Existence check expressed through the general access query.
+ return file_accessible(pathname, access_flags::exists); // Forwards pathname with the access_flags::exists flag; the result is whatever file_accessible() resolves to.
+ }
future<fs_type> file_system_at(sstring pathname);
future<struct statvfs> statvfs(sstring pathname);
future<> remove_file(sstring pathname);
future<> rename_file(sstring old_pathname, sstring new_pathname);
future<> link_file(sstring oldpath, sstring newpath);
+ future<> chmod(sstring name, file_permissions permissions);
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- template <typename Func>
- void submit_io(io_desc* desc, Func prepare_io);
-
- template <typename Func>
- future<internal::linux_abi::io_event> submit_io_read(io_queue* ioq, const io_priority_class& priority_class, size_t len, Func prepare_io);
- template <typename Func>
- future<internal::linux_abi::io_event> submit_io_write(io_queue* ioq, const io_priority_class& priority_class, size_t len, Func prepare_io);
-
- inline void handle_io_result(const internal::linux_abi::io_event& ev) {
- auto res = long(ev.res);
+ void submit_io(kernel_completion* desc, internal::io_request req);
+ future<size_t> submit_io_read(io_queue* ioq,
+ const io_priority_class& priority_class,
+ size_t len,
+ internal::io_request req);
+ future<size_t> submit_io_write(io_queue* ioq,
+ const io_priority_class& priority_class,
+ size_t len,
+ internal::io_request req);
+
+ inline void handle_io_result(ssize_t res) {
if (res < 0) {
++_io_stats.aio_errors;
throw_kernel_error(res);
return _stop_requested.wait(timeout, [this] { return _stopping; });
}
- void at_exit(std::function<future<> ()> func);
+ void at_exit(noncopyable_function<future<> ()> func);
template <typename Func>
void at_destroy(Func&& func) {
}
#ifdef SEASTAR_SHUFFLE_TASK_QUEUE
- void shuffle(std::unique_ptr<task>&, task_queue&);
+ void shuffle(task*&, task_queue&);
#endif
- void add_task(std::unique_ptr<task>&& t) {
+ void add_task(task* t) noexcept {
auto sg = t->group();
auto* q = _task_queues[sg._id].get();
bool was_empty = q->_q.empty();
activate(*q);
}
}
- void add_urgent_task(std::unique_ptr<task>&& t) {
+ void add_urgent_task(task* t) noexcept {
auto sg = t->group();
auto* q = _task_queues[sg._id].get();
bool was_empty = q->_q.empty();
}
void force_poll();
- void add_high_priority_task(std::unique_ptr<task>&&);
+ void add_high_priority_task(task*) noexcept;
network_stack& net() { return *_network_stack; }
shard_id cpu_id() const { return _id; }
- void start_epoll();
void sleep();
steady_clock_type::duration total_idle_time();
std::chrono::nanoseconds total_steal_time();
const io_stats& get_io_stats() const { return _io_stats; }
+ uint64_t abandoned_failed_futures() const { return _abandoned_failed_futures; }
#ifdef HAVE_OSV
void timer_thread_func();
void set_timer(sched::timer &tmr, s64 t);
void register_metrics();
future<> write_all_part(pollable_fd_state& fd, const void* buffer, size_t size, size_t completed);
- bool process_io();
+ future<> fdatasync(int fd);
void add_timer(timer<steady_clock_type>*);
bool queue_timer(timer<steady_clock_type>*);
friend class alien::message_queue;
friend class pollable_fd;
friend class pollable_fd_state;
+ friend struct pollable_fd_state_deleter;
friend class posix_file_impl;
friend class blockdev_file_impl;
friend class readable_eventfd;
friend class scheduling_group;
friend void add_to_flush_poller(output_stream<char>* os);
friend int ::_Unwind_RaiseException(struct _Unwind_Exception *h);
+ friend void report_failed_future(const std::exception_ptr& eptr) noexcept;
metrics::metric_groups _metric_groups;
friend future<scheduling_group> create_scheduling_group(sstring name, float shares);
friend future<> seastar::destroy_scheduling_group(scheduling_group);
+ friend future<> seastar::rename_scheduling_group(scheduling_group sg, sstring new_name);
+ friend future<scheduling_group_key> scheduling_group_key_create(scheduling_group_key_config cfg);
+
+ template<typename T>
+ friend T& scheduling_group_get_specific(scheduling_group sg, scheduling_group_key key);
+ template<typename T>
+ friend T& scheduling_group_get_specific(scheduling_group_key key);
+ template<typename SpecificValType, typename Mapper, typename Reducer, typename Initial>
+ GCC6_CONCEPT( requires requires(SpecificValType specific_val, Mapper mapper, Reducer reducer, Initial initial) {
+ {reducer(initial, mapper(specific_val))} -> Initial;
+ })
+ friend future<typename function_traits<Reducer>::return_type>
+ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer, Initial initial_val, scheduling_group_key key);
+ template<typename SpecificValType, typename Reducer, typename Initial>
+ GCC6_CONCEPT( requires requires(SpecificValType specific_val, Reducer reducer, Initial initial) {
+ {reducer(initial, specific_val)} -> Initial;
+ })
+ friend future<typename function_traits<Reducer>::return_type>
+ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, scheduling_group_key key);
public:
- bool wait_and_process(int timeout = 0, const sigset_t* active_sigmask = nullptr);
future<> readable(pollable_fd_state& fd);
future<> writeable(pollable_fd_state& fd);
future<> readable_or_writeable(pollable_fd_state& fd);
- void forget(pollable_fd_state& fd);
void abort_reader(pollable_fd_state& fd);
void abort_writer(pollable_fd_state& fd);
void enable_timer(steady_clock_type::time_point when);
return local_engine != nullptr;
}
-class smp {
- static std::vector<posix_thread> _threads;
- static std::vector<std::function<void ()>> _thread_loops; // for dpdk
- static compat::optional<boost::barrier> _all_event_loops_done;
- static std::vector<reactor*> _reactors;
- struct qs_deleter {
- void operator()(smp_message_queue** qs) const;
- };
- static std::unique_ptr<smp_message_queue*[], qs_deleter> _qs;
- static std::thread::id _tmain;
- static bool _using_dpdk;
-
- template <typename Func>
- using returns_future = is_future<std::result_of_t<Func()>>;
- template <typename Func>
- using returns_void = std::is_same<std::result_of_t<Func()>, void>;
-public:
- static boost::program_options::options_description get_options_description();
- static void configure(boost::program_options::variables_map vm);
- static void cleanup();
- static void cleanup_cpu();
- static void arrive_at_event_loop_end();
- static void join_all();
- static bool main_thread() { return std::this_thread::get_id() == _tmain; }
-
- /// Runs a function on a remote core.
- ///
- /// \param t designates the core to run the function on (may be a remote
- /// core or the local core).
- /// \param func a callable to run on core \c t. If \c func is a temporary object,
- /// its lifetime will be extended by moving it. If @func is a reference,
- /// the caller must guarantee that it will survive the call.
- /// \return whatever \c func returns, as a future<> (if \c func does not return a future,
- /// submit_to() will wrap it in a future<>).
- template <typename Func>
- static futurize_t<std::result_of_t<Func()>> submit_to(unsigned t, Func&& func) {
- using ret_type = std::result_of_t<Func()>;
- if (t == engine().cpu_id()) {
- try {
- if (!is_future<ret_type>::value) {
- // Non-deferring function, so don't worry about func lifetime
- return futurize<ret_type>::apply(std::forward<Func>(func));
- } else if (std::is_lvalue_reference<Func>::value) {
- // func is an lvalue, so caller worries about its lifetime
- return futurize<ret_type>::apply(func);
- } else {
- // Deferring call on rvalue function, make sure to preserve it across call
- auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
- auto ret = futurize<ret_type>::apply(*w);
- return ret.finally([w = std::move(w)] {});
- }
- } catch (...) {
- // Consistently return a failed future rather than throwing, to simplify callers
- return futurize<std::result_of_t<Func()>>::make_exception_future(std::current_exception());
- }
- } else {
- return _qs[t][engine().cpu_id()].submit(std::forward<Func>(func));
- }
- }
- static bool poll_queues();
- static bool pure_poll_queues();
- static boost::integer_range<unsigned> all_cpus() {
- return boost::irange(0u, count);
- }
- // Invokes func on all shards.
- // The returned future resolves when all async invocations finish.
- // The func may return void or future<>.
- // Each async invocation will work with a separate copy of func.
- template<typename Func>
- static future<> invoke_on_all(Func&& func) {
- static_assert(std::is_same<future<>, typename futurize<std::result_of_t<Func()>>::type>::value, "bad Func signature");
- return parallel_for_each(all_cpus(), [&func] (unsigned id) {
- return smp::submit_to(id, Func(func));
- });
- }
-private:
- static void start_all_queues();
- static void pin(unsigned cpu_id);
- static void allocate_reactor(unsigned id, reactor_backend_selector rbs);
- static void create_thread(std::function<void ()> thread_loop);
-public:
- static unsigned count;
-};
-
-inline
-pollable_fd_state::~pollable_fd_state() {
- engine().forget(*this);
-}
-
inline
size_t iovec_len(const iovec* begin, size_t len)
{
return ret;
}
-template <typename Clock>
-inline
-timer<Clock>::timer(callback_t&& callback) : _callback(std::move(callback)) {
-}
-
-template <typename Clock>
-inline
-timer<Clock>::~timer() {
- if (_queued) {
- engine().del_timer(this);
- }
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::set_callback(callback_t&& callback) {
- _callback = std::move(callback);
+inline int hrtimer_signal() { // Returns the signal number SIGRTMIN. NOTE(review): presumably the signal used for the high-resolution timer - confirm at call sites.
+ // We don't want to use SIGALRM, because the boost unit test library
+ // also plays with it.
+ return SIGRTMIN;
}
-template <typename Clock>
-inline
-void timer<Clock>::arm_state(time_point until, compat::optional<duration> period) {
- assert(!_armed);
- _period = period;
- _armed = true;
- _expired = false;
- _expiry = until;
- _queued = true;
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::arm(time_point until, compat::optional<duration> period) {
- arm_state(until, period);
- engine().add_timer(this);
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::rearm(time_point until, compat::optional<duration> period) {
- if (_armed) {
- cancel();
- }
- arm(until, period);
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::arm(duration delta) {
- return arm(Clock::now() + delta);
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::arm_periodic(duration delta) {
- arm(Clock::now() + delta, {delta});
-}
-
-template <typename Clock>
-inline
-void timer<Clock>::readd_periodic() {
- arm_state(Clock::now() + _period.value(), {_period.value()});
- engine().queue_timer(this);
-}
-
-template <typename Clock>
-inline
-bool timer<Clock>::cancel() {
- if (!_armed) {
- return false;
- }
- _armed = false;
- if (_queued) {
- engine().del_timer(this);
- _queued = false;
- }
- return true;
-}
-
-template <typename Clock>
-inline
-typename timer<Clock>::time_point timer<Clock>::get_timeout() {
- return _expiry;
-}
extern logger seastar_logger;