/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2014 Cloudius Systems
 */
#define __user /* empty */  // for xfs includes, below

#include <sys/syscall.h>
#include <sys/statfs.h>
#include <sys/resource.h>
#include <sys/inotify.h>
#include <fmt/ranges.h>
#include <seastar/core/task.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/memory.hh>
#include <seastar/core/posix.hh>
#include <seastar/core/sleep.hh>
#include <seastar/net/packet.hh>
#include <seastar/net/stack.hh>
#include <seastar/net/posix-stack.hh>
#include <seastar/net/native-stack.hh>
#include <seastar/core/resource.hh>
#include <seastar/core/print.hh>
#include "core/scollectd-impl.hh"
#include <seastar/util/conversions.hh>
#include <seastar/util/process.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/when_all.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/make_task.hh>
#include <seastar/core/systemwide_memory_barrier.hh>
#include <seastar/core/report_exception.hh>
#include <seastar/core/stall_sampler.hh>
#include <seastar/core/thread_cputime_clock.hh>
#include <seastar/core/abort_on_ebadf.hh>
#include <seastar/core/io_queue.hh>
#include <seastar/core/internal/buffer_allocator.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/core/internal/uname.hh>
#include <seastar/core/scheduling_specific.hh>
#include <seastar/core/smp_options.hh>
#include <seastar/util/log.hh>
#include <seastar/util/read_first_line.hh>
#include "core/file-impl.hh"
#include "core/reactor_backend.hh"
#include "core/syscall_result.hh"
#include "core/thread_pool.hh"
#include "syscall_work_queue.hh"
#include <sys/eventfd.h>
#include <boost/lexical_cast.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <boost/range/numeric.hpp>
#include <boost/range/algorithm/sort.hpp>
#include <boost/range/algorithm/remove_if.hpp>
#include <boost/range/algorithm/find_if.hpp>
#include <boost/algorithm/clamp.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/version.hpp>
#include <linux/types.h> // for xfs, below
#include <sys/ioctl.h>
#include <linux/perf_event.h>
#include <xfs/linux.h>
#define min min /* prevent xfs.h from defining min() as a macro */
#include <xfs/xfs.h>
#undef min
#ifdef SEASTAR_HAVE_DPDK
#include <seastar/core/dpdk_rte.hh>
#include <rte_lcore.h>
#include <rte_launch.h>
#endif
#include <seastar/core/prefetch.hh>
#include <system_error>
#ifdef SEASTAR_SHUFFLE_TASK_QUEUE
#include <random>
#endif
#include <sys/mman.h>
#include <sys/utsname.h>
#include <linux/falloc.h>
#include <seastar/util/backtrace.hh>
#include <seastar/util/spinlock.hh>
#include <seastar/util/print_safe.hh>
#ifdef HAVE_OSV
#include <osv/newpoll.hh>
#endif
#if defined(__x86_64__) || defined(__i386__)
#include <xmmintrin.h>
#endif
#include <seastar/util/defer.hh>
#include <seastar/core/alien.hh>
#include <seastar/core/internal/stall_detector.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/execution_stage.hh>
#include <seastar/core/exception_hacks.hh>
#include <seastar/util/memory_diagnostics.hh>
#include <seastar/util/internal/iovec_utils.hh>
#include <seastar/util/internal/magic.hh>
#include <yaml-cpp/yaml.h>
#ifdef SEASTAR_TASK_HISTOGRAM
#include <typeindex>
#endif
namespace seastar {

static_assert(posix::shutdown_mask(SHUT_RD) == posix::rcv_shutdown);
static_assert(posix::shutdown_mask(SHUT_WR) == posix::snd_shutdown);
static_assert(posix::shutdown_mask(SHUT_RDWR) == (posix::snd_shutdown | posix::rcv_shutdown));

struct mountpoint_params {
    std::string mountpoint = "none";
    uint64_t read_bytes_rate = std::numeric_limits<uint64_t>::max();
    uint64_t write_bytes_rate = std::numeric_limits<uint64_t>::max();
    uint64_t read_req_rate = std::numeric_limits<uint64_t>::max();
    uint64_t write_req_rate = std::numeric_limits<uint64_t>::max();
    uint64_t read_saturation_length = std::numeric_limits<uint64_t>::max();
    uint64_t write_saturation_length = std::numeric_limits<uint64_t>::max();
    bool duplex = false;
    float rate_factor = 1.0;
};

}
namespace YAML {

template<>
struct convert<seastar::mountpoint_params> {
    static bool decode(const Node& node, seastar::mountpoint_params& mp) {
        using namespace seastar;
        mp.mountpoint = node["mountpoint"].as<std::string>().c_str();
        mp.read_bytes_rate = parse_memory_size(node["read_bandwidth"].as<std::string>());
        mp.read_req_rate = parse_memory_size(node["read_iops"].as<std::string>());
        mp.write_bytes_rate = parse_memory_size(node["write_bandwidth"].as<std::string>());
        mp.write_req_rate = parse_memory_size(node["write_iops"].as<std::string>());
        if (node["read_saturation_length"]) {
            mp.read_saturation_length = parse_memory_size(node["read_saturation_length"].as<std::string>());
        }
        if (node["write_saturation_length"]) {
            mp.write_saturation_length = parse_memory_size(node["write_saturation_length"].as<std::string>());
        }
        if (node["duplex"]) {
            mp.duplex = node["duplex"].as<bool>();
        }
        if (node["rate_factor"]) {
            mp.rate_factor = node["rate_factor"].as<float>();
        }
        return true;
    }
};

}
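// For illustration only: a mountpoint entry that the decoder above would
// accept could look like the following YAML. The keys mirror the node[...]
// lookups in decode(); the values here are made up, and bandwidth/IOPS
// strings go through parse_memory_size(), so size suffixes are accepted.
//
//   - mountpoint: /var/lib/data
//     read_bandwidth: 500MB
//     read_iops: 100k
//     write_bandwidth: 300MB
//     write_iops: 50k
//     read_saturation_length: 128kB
//     duplex: true
//     rate_factor: 1.0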
namespace seastar {

seastar::logger seastar_logger("seastar");
seastar::logger sched_logger("scheduler");

shard_id reactor::cpu_id() const {
    assert(_id == this_shard_id());
    return _id;
}

io_priority_class
reactor::register_one_priority_class(sstring name, uint32_t shares) {
    return io_priority_class::register_one(std::move(name), shares);
}

future<>
reactor::update_shares_for_class(io_priority_class pc, uint32_t shares) {
    return pc.update_shares(shares);
}

future<>
reactor::rename_priority_class(io_priority_class pc, sstring new_name) noexcept {
    return pc.rename(std::move(new_name));
}
void reactor::update_shares_for_queues(io_priority_class pc, uint32_t shares) {
    for (auto&& q : _io_queues) {
        q.second->update_shares_for_class(pc, shares);
    }
}
future<> reactor::update_bandwidth_for_queues(io_priority_class pc, uint64_t bandwidth) {
    return smp::invoke_on_all([pc, bandwidth = bandwidth / _num_io_groups] {
        return parallel_for_each(engine()._io_queues, [pc, bandwidth] (auto& queue) {
            return queue.second->update_bandwidth_for_class(pc, bandwidth);
        });
    });
}
void reactor::rename_queues(io_priority_class pc, sstring new_name) {
    for (auto&& queue : _io_queues) {
        queue.second->rename_priority_class(pc, new_name);
    }
}
future<std::tuple<pollable_fd, socket_address>>
reactor::do_accept(pollable_fd_state& listenfd) {
    return readable_or_writeable(listenfd).then([this, &listenfd] () mutable {
        socket_address sa;
        listenfd.maybe_no_more_recv();
        auto maybe_fd = listenfd.fd.try_accept(sa, SOCK_NONBLOCK | SOCK_CLOEXEC);
        if (!maybe_fd) {
            // We speculated that we will have an another connection, but got a false
            // positive. Try again without speculation.
            return do_accept(listenfd);
        }
        // Speculate that there is another connection on this listening socket, to avoid
        // a task-quota delay. Usually this will fail, but accept is a rare-enough operation
        // that it is worth the false positive in order to withstand a connection storm
        // without having to accept at a rate of 1 per task quota.
        listenfd.speculate_epoll(EPOLLIN);
        pollable_fd pfd(std::move(*maybe_fd), pollable_fd::speculation(EPOLLOUT));
        return make_ready_future<std::tuple<pollable_fd, socket_address>>(std::make_tuple(std::move(pfd), std::move(sa)));
    });
}
future<> reactor::do_connect(pollable_fd_state& pfd, socket_address& sa) {
    pfd.fd.connect(sa.u.sa, sa.length());
    return pfd.writeable().then([&pfd]() mutable {
        auto err = pfd.fd.getsockopt<int>(SOL_SOCKET, SO_ERROR);
        if (err != 0) {
            throw std::system_error(err, std::system_category());
        }
        return make_ready_future<>();
    });
}
future<size_t>
reactor::do_read(pollable_fd_state& fd, void* buffer, size_t len) {
    return readable(fd).then([this, &fd, buffer, len] () mutable {
        auto r = fd.fd.read(buffer, len);
        if (!r) {
            return do_read(fd, buffer, len);
        }
        if (size_t(*r) == len) {
            fd.speculate_epoll(EPOLLIN);
        }
        return make_ready_future<size_t>(*r);
    });
}
future<temporary_buffer<char>>
reactor::do_read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) {
    return fd.readable().then([this, &fd, ba] {
        auto buffer = ba->allocate_buffer();
        auto r = fd.fd.read(buffer.get_write(), buffer.size());
        if (!r) {
            // Speculation failure, try again with real polling this time
            // Note we release the buffer and will reallocate it when poll
            // completes.
            return do_read_some(fd, ba);
        }
        if (size_t(*r) == buffer.size()) {
            fd.speculate_epoll(EPOLLIN);
        }
        return make_ready_future<temporary_buffer<char>>(std::move(buffer));
    });
}
future<size_t>
reactor::do_recvmsg(pollable_fd_state& fd, const std::vector<iovec>& iov) {
    return readable(fd).then([this, &fd, iov = iov] () mutable {
        msghdr mh = {};
        mh.msg_iov = &iov[0];
        mh.msg_iovlen = iov.size();
        auto r = fd.fd.recvmsg(&mh, 0);
        if (!r) {
            return do_recvmsg(fd, iov);
        }
        if (size_t(*r) == internal::iovec_len(iov)) {
            fd.speculate_epoll(EPOLLIN);
        }
        return make_ready_future<size_t>(*r);
    });
}
future<size_t>
reactor::do_send(pollable_fd_state& fd, const void* buffer, size_t len) {
    return writeable(fd).then([this, &fd, buffer, len] () mutable {
        auto r = fd.fd.send(buffer, len, MSG_NOSIGNAL);
        if (!r) {
            return do_send(fd, buffer, len);
        }
        if (size_t(*r) == len) {
            fd.speculate_epoll(EPOLLOUT);
        }
        return make_ready_future<size_t>(*r);
    });
}
future<size_t>
reactor::do_sendmsg(pollable_fd_state& fd, net::packet& p) {
    return writeable(fd).then([this, &fd, &p] () mutable {
        static_assert(offsetof(iovec, iov_base) == offsetof(net::fragment, base) &&
            sizeof(iovec::iov_base) == sizeof(net::fragment::base) &&
            offsetof(iovec, iov_len) == offsetof(net::fragment, size) &&
            sizeof(iovec::iov_len) == sizeof(net::fragment::size) &&
            alignof(iovec) == alignof(net::fragment) &&
            sizeof(iovec) == sizeof(net::fragment)
            , "net::fragment and iovec should be equivalent");

        iovec* iov = reinterpret_cast<iovec*>(p.fragment_array());
        msghdr mh = {};
        mh.msg_iov = iov;
        mh.msg_iovlen = std::min<size_t>(p.nr_frags(), IOV_MAX);
        auto r = fd.fd.sendmsg(&mh, MSG_NOSIGNAL);
        if (!r) {
            return do_sendmsg(fd, p);
        }
        if (size_t(*r) == p.len()) {
            fd.speculate_epoll(EPOLLOUT);
        }
        return make_ready_future<size_t>(*r);
    });
}
future<>
reactor::send_all_part(pollable_fd_state& fd, const void* buffer, size_t len, size_t completed) {
    if (completed == len) {
        return make_ready_future<>();
    } else {
        return _backend->send(fd, static_cast<const char*>(buffer) + completed, len - completed).then(
                [&fd, buffer, len, completed, this] (size_t part) mutable {
            return send_all_part(fd, buffer, len, completed + part);
        });
    }
}
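// Example of the recursion above (illustrative numbers): for an 8192-byte
// buffer where the kernel accepts 4096 bytes per send, send_all_part(fd, buf,
// 8192, 0) resolves _backend->send() twice (completed = 0, then 4096) before
// the base case completed == len returns a ready future.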
future<temporary_buffer<char>>
reactor::do_recv_some(pollable_fd_state& fd, internal::buffer_allocator* ba) {
    return fd.readable().then([this, &fd, ba] {
        auto buffer = ba->allocate_buffer();
        auto r = fd.fd.recv(buffer.get_write(), buffer.size(), MSG_DONTWAIT);
        if (!r) {
            return do_recv_some(fd, ba);
        }
        if (size_t(*r) == buffer.size()) {
            fd.speculate_epoll(EPOLLIN);
        }
        return make_ready_future<temporary_buffer<char>>(std::move(buffer));
    });
}
future<>
reactor::send_all(pollable_fd_state& fd, const void* buffer, size_t len) {
    return send_all_part(fd, buffer, len, 0);
}
future<size_t> pollable_fd_state::read_some(char* buffer, size_t size) {
    return engine()._backend->read(*this, buffer, size);
}

future<size_t> pollable_fd_state::read_some(uint8_t* buffer, size_t size) {
    return engine()._backend->read(*this, buffer, size);
}

future<size_t> pollable_fd_state::read_some(const std::vector<iovec>& iov) {
    return engine()._backend->recvmsg(*this, iov);
}

future<temporary_buffer<char>> pollable_fd_state::read_some(internal::buffer_allocator* ba) {
    return engine()._backend->read_some(*this, ba);
}

future<size_t> pollable_fd_state::write_some(net::packet& p) {
    return engine()._backend->sendmsg(*this, p);
}

future<> pollable_fd_state::write_all(const char* buffer, size_t size) {
    return engine().send_all(*this, buffer, size);
}

future<> pollable_fd_state::write_all(const uint8_t* buffer, size_t size) {
    return engine().send_all(*this, buffer, size);
}
future<> pollable_fd_state::write_all(net::packet& p) {
    return write_some(p).then([this, &p] (size_t size) {
        if (p.len() == size) {
            return make_ready_future<>();
        }
        p.trim_front(size);
        return write_all(p);
    });
}
future<> pollable_fd_state::readable() {
    return engine().readable(*this);
}

future<> pollable_fd_state::writeable() {
    return engine().writeable(*this);
}

future<> pollable_fd_state::poll_rdhup() {
    return engine().poll_rdhup(*this);
}

future<> pollable_fd_state::readable_or_writeable() {
    return engine().readable_or_writeable(*this);
}
future<std::tuple<pollable_fd, socket_address>> pollable_fd_state::accept() {
    return engine()._backend->accept(*this);
}

future<> pollable_fd_state::connect(socket_address& sa) {
    return engine()._backend->connect(*this, sa);
}

future<temporary_buffer<char>> pollable_fd_state::recv_some(internal::buffer_allocator* ba) {
    maybe_no_more_recv();
    return engine()._backend->recv_some(*this, ba);
}
future<size_t> pollable_fd_state::recvmsg(struct msghdr *msg) {
    maybe_no_more_recv();
    return engine().readable(*this).then([this, msg] {
        auto r = fd.recvmsg(msg, 0);
        if (!r) {
            return recvmsg(msg);
        }
        // We always speculate here to optimize for throughput in a workload
        // with multiple outstanding requests. This way the caller can consume
        // all messages without resorting to epoll. However this adds extra
        // recvmsg() call when we hit the empty queue condition, so it may
        // hurt request-response workload in which the queue is empty when we
        // initially enter recvmsg(). If that turns out to be a problem, we can
        // improve speculation by using recvmmsg().
        speculate_epoll(EPOLLIN);
        return make_ready_future<size_t>(*r);
    });
}
future<size_t> pollable_fd_state::sendmsg(struct msghdr* msg) {
    maybe_no_more_send();
    return engine().writeable(*this).then([this, msg] () mutable {
        auto r = fd.sendmsg(msg, 0);
        if (!r) {
            return sendmsg(msg);
        }
        // For UDP this will always speculate. We can't know if there's room
        // or not, but most of the time there should be so the cost of mis-
        // speculation is amortized.
        if (size_t(*r) == internal::iovec_len(msg->msg_iov, msg->msg_iovlen)) {
            speculate_epoll(EPOLLOUT);
        }
        return make_ready_future<size_t>(*r);
    });
}
future<size_t> pollable_fd_state::sendto(socket_address addr, const void* buf, size_t len) {
    maybe_no_more_send();
    return engine().writeable(*this).then([this, buf, len, addr] () mutable {
        auto r = fd.sendto(addr, buf, len, 0);
        if (!r) {
            return sendto(std::move(addr), buf, len);
        }
        // See the comment about speculation in sendmsg().
        if (size_t(*r) == len) {
            speculate_epoll(EPOLLOUT);
        }
        return make_ready_future<size_t>(*r);
    });
}
void set_need_preempt_var(const preemption_monitor* np) {
    get_need_preempt_var() = np;
}
#ifdef SEASTAR_TASK_HISTOGRAM

class task_histogram {
    static constexpr unsigned max_countdown = 1'000'000;
    std::unordered_map<std::type_index, uint64_t> _histogram;
    unsigned _countdown_to_print = max_countdown;
public:
    void add(const task& t) {
        ++_histogram[std::type_index(typeid(t))];
        if (!--_countdown_to_print) {
            print();
            _countdown_to_print = max_countdown;
        }
    }
    void print() const {
        seastar::fmt::print("task histogram, {:d} task types {:d} tasks\n", _histogram.size(), max_countdown - _countdown_to_print);
        for (auto&& type_count : _histogram) {
            auto&& type = type_count.first;
            auto&& count = type_count.second;
            seastar::fmt::print(" {:10d} {}\n", count, type.name());
        }
    }
};

thread_local task_histogram this_thread_task_histogram;

#endif

void task_histogram_add_task(const task& t) {
#ifdef SEASTAR_TASK_HISTOGRAM
    this_thread_task_histogram.add(t);
#endif
}
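// With SEASTAR_TASK_HISTOGRAM defined, every millionth task triggers a summary
// along the lines of the following (type names come from typeid().name(), so
// they are compiler-mangled; the lines below are purely illustrative):
//
//   task histogram, 2 task types 1000000 tasks
//       999000 <mangled continuation type name>
//         1000 <mangled smp message type name>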
using namespace std::chrono_literals;
namespace fs = std::filesystem;

using namespace internal::linux_abi;

std::atomic<manual_clock::rep> manual_clock::_now;

constexpr unsigned reactor::max_queues;
constexpr unsigned reactor::max_aio_per_queue;

// Base version where this works; some filesystems were only fixed later, so
// this value is mixed in with filesystem-provided values later.
bool aio_nowait_supported = internal::kernel_uname().whitelisted({"4.13"});
static bool sched_debug() {
    return false;
}

template <typename... Args>
void
sched_print(const char* fmt, Args&&... args) {
    if (sched_debug()) {
        sched_logger.trace(fmt, std::forward<Args>(args)...);
    }
}
static std::atomic<bool> abort_on_ebadf = { false };

void set_abort_on_ebadf(bool do_abort) {
    abort_on_ebadf.store(do_abort);
}

bool is_abort_on_ebadf_enabled() {
    return abort_on_ebadf.load();
}
timespec to_timespec(steady_clock_type::time_point t) {
    using ns = std::chrono::nanoseconds;
    auto n = std::chrono::duration_cast<ns>(t.time_since_epoch()).count();
    return { n / 1'000'000'000, n % 1'000'000'000 };
}
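// For example, a time_point 1.5 seconds past the epoch has
// n == 1'500'000'000, so to_timespec() returns { 1, 500'000'000 }:
// whole seconds in tv_sec, the nanosecond remainder in tv_nsec.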
void lowres_clock::update() noexcept {
    lowres_clock::_now = lowres_clock::time_point(std::chrono::steady_clock::now().time_since_epoch());
    lowres_system_clock::_now = lowres_system_clock::time_point(std::chrono::system_clock::now().time_since_epoch());
}
template <typename Clock>
timer<Clock>::~timer() {
    if (_queued) {
        engine().del_timer(this);
    }
}

template <typename Clock>
void timer<Clock>::arm(time_point until, std::optional<duration> period) noexcept {
    arm_state(until, period);
    engine().add_timer(this);
}

template <typename Clock>
void timer<Clock>::readd_periodic() noexcept {
    arm_state(Clock::now() + _period.value(), {_period.value()});
    engine().queue_timer(this);
}

template <typename Clock>
bool timer<Clock>::cancel() noexcept {
    if (!_armed) {
        return false;
    }
    _armed = false;
    if (_queued) {
        engine().del_timer(this);
        _queued = false;
    }
    return true;
}

template class timer<steady_clock_type>;
template class timer<lowres_clock>;
template class timer<manual_clock>;
reactor::signals::signals() : _pending_signals(0) {
}

reactor::signals::~signals() {
    sigset_t mask;
    sigfillset(&mask);
    ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
}

reactor::signals::signal_handler::signal_handler(int signo, noncopyable_function<void ()>&& handler)
        : _handler(std::move(handler)) {
}

void
reactor::signals::handle_signal(int signo, noncopyable_function<void ()>&& handler) {
    _signal_handlers.emplace(std::piecewise_construct,
            std::make_tuple(signo), std::make_tuple(signo, std::move(handler)));
    struct sigaction sa;
    sa.sa_sigaction = [](int sig, siginfo_t *info, void *p) {
        engine()._backend->signal_received(sig, info, p);
    };
    sa.sa_mask = make_empty_sigset_mask();
    sa.sa_flags = SA_SIGINFO | SA_RESTART;
    auto r = ::sigaction(signo, &sa, nullptr);
    throw_system_error_on(r == -1);
    auto mask = make_sigset_mask(signo);
    r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
    throw_pthread_error(r);
}
void
reactor::signals::handle_signal_once(int signo, noncopyable_function<void ()>&& handler) {
    return handle_signal(signo, [fired = false, handler = std::move(handler)] () mutable {
        if (fired) {
            return;
        }
        fired = true;
        handler();
    });
}
bool reactor::signals::poll_signal() {
    auto signals = _pending_signals.load(std::memory_order_relaxed);
    if (signals) {
        _pending_signals.fetch_and(~signals, std::memory_order_relaxed);
        for (size_t i = 0; i < sizeof(signals)*8; i++) {
            if (signals & (1ull << i)) {
                _signal_handlers.at(i)._handler();
            }
        }
    }
    return signals;
}
bool reactor::signals::pure_poll_signal() const {
    return _pending_signals.load(std::memory_order_relaxed);
}

void reactor::signals::action(int signo, siginfo_t* siginfo, void* ignore) {
    engine().start_handling_signal();
    engine()._signals._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed);
}

void reactor::signals::failed_to_handle(int signo) {
    char tname[64];
    pthread_getname_np(pthread_self(), tname, sizeof(tname));
    auto tid = syscall(SYS_gettid);
    seastar_logger.error("Failed to handle signal {} on thread {} ({}): engine not ready", signo, tid, tname);
}

void reactor::handle_signal(int signo, noncopyable_function<void ()>&& handler) {
    _signals.handle_signal(signo, std::move(handler));
}
// Accumulates an in-memory backtrace and flushes it to stderr eventually.
// Async-signal safe.
class backtrace_buffer {
    static constexpr unsigned _max_size = 8 << 10;
    unsigned _pos = 0;
    char _buf[_max_size];
public:
    void flush() noexcept {
        print_safe(_buf, _pos);
        _pos = 0;
    }

    void reserve(size_t len) noexcept {
        assert(len < _max_size);
        if (_pos + len >= _max_size) {
            flush();
        }
    }

    void append(const char* str, size_t len) noexcept {
        reserve(len);
        memcpy(_buf + _pos, str, len);
        _pos += len;
    }

    void append(const char* str) noexcept { append(str, strlen(str)); }

    template <typename Integral>
    void append_decimal(Integral n) noexcept {
        char buf[sizeof(n) * 3];
        auto len = convert_decimal_safe(buf, sizeof(buf), n);
        append(buf, len);
    }

    template <typename Integral>
    void append_hex(Integral ptr) noexcept {
        char buf[sizeof(ptr) * 2];
        auto p = convert_hex_safe(buf, sizeof(buf), ptr);
        append(p, (buf + sizeof(buf)) - p);
    }

    void append_backtrace() noexcept {
        backtrace([this] (frame f) {
            append("  ");
            if (!f.so->name.empty()) {
                append(f.so->name.c_str(), f.so->name.size());
                append("+");
            }
            append("0x");
            append_hex(f.addr);
            append("\n");
        });
    }

    void append_backtrace_oneline() noexcept {
        backtrace([this] (frame f) noexcept {
            reserve(3 + sizeof(f.addr) * 2);
            append(" 0x");
            append_hex(f.addr);
        });
    }
};
static void print_with_backtrace(backtrace_buffer& buf, bool oneline) noexcept {
    if (local_engine) {
        buf.append(" on shard ");
        buf.append_decimal(this_shard_id());
    }
    if (!oneline) {
        buf.append(".\nBacktrace:\n");
        buf.append_backtrace();
    } else {
        buf.append(". Backtrace:");
        buf.append_backtrace_oneline();
        buf.append("\n");
    }
    buf.flush();
}

static void print_with_backtrace(const char* cause, bool oneline = false) noexcept {
    backtrace_buffer buf;
    buf.append(cause);
    print_with_backtrace(buf, oneline);
}
// Installs signal handler stack for current thread.
// The stack remains installed as long as the returned object is kept alive.
// When it goes out of scope the previous handler is restored.
static decltype(auto) install_signal_handler_stack() {
    size_t size = SIGSTKSZ;
    auto mem = std::make_unique<char[]>(size);
    stack_t stack;
    stack_t prev_stack;
    stack.ss_sp = mem.get();
    stack.ss_flags = 0;
    stack.ss_size = size;
    auto r = sigaltstack(&stack, &prev_stack);
    throw_system_error_on(r == -1);
    return defer([mem = std::move(mem), prev_stack] () mutable noexcept {
        try {
            auto r = sigaltstack(&prev_stack, NULL);
            throw_system_error_on(r == -1);
        } catch (...) {
            mem.release(); // We failed to restore previous stack, must leak it.
            seastar_logger.error("Failed to restore signal stack: {}", std::current_exception());
        }
    });
}
reactor::task_queue::task_queue(unsigned id, sstring name, float shares)
        : _shares(std::max(shares, 1.0f))
        , _reciprocal_shares_times_2_power_32((uint64_t(1) << 32) / _shares)
        , _id(id)
        , _name(std::move(name)) {
    register_stats();
}
void
reactor::task_queue::register_stats() {
    seastar::metrics::metric_groups new_metrics;
    namespace sm = seastar::metrics;
    static auto group = sm::label("group");
    auto group_label = group(_name);
    new_metrics.add_group("scheduler", {
        sm::make_counter("runtime_ms", [this] {
            return std::chrono::duration_cast<std::chrono::milliseconds>(_runtime).count();
        }, sm::description("Accumulated runtime of this task queue; an increment rate of 1000ms per second indicates full utilization"),
                {group_label}),
        sm::make_counter("waittime_ms", [this] {
            return std::chrono::duration_cast<std::chrono::milliseconds>(_waittime).count();
        }, sm::description("Accumulated waittime of this task queue; an increment rate of 1000ms per second indicates queue is waiting for something (e.g. IO)"),
                {group_label}),
        sm::make_counter("starvetime_ms", [this] {
            return std::chrono::duration_cast<std::chrono::milliseconds>(_starvetime).count();
        }, sm::description("Accumulated starvation time of this task queue; an increment rate of 1000ms per second indicates the scheduler feels really bad"),
                {group_label}),
        sm::make_counter("tasks_processed", _tasks_processed,
                sm::description("Count of tasks executing on this queue; indicates together with runtime_ms indicates length of tasks"),
                {group_label}),
        sm::make_gauge("queue_length", [this] { return _q.size(); },
                sm::description("Size of backlog on this queue, in tasks; indicates whether the queue is busy and/or contended"),
                {group_label}),
        sm::make_gauge("shares", [this] { return _shares; },
                sm::description("Shares allocated to this queue"),
                {group_label}),
        sm::make_counter("time_spent_on_task_quota_violations_ms", [this] {
            return _time_spent_on_task_quota_violations / 1ms;
        }, sm::description("Total amount in milliseconds we were in violation of the task quota"),
                {group_label}),
    });
    _metrics = std::exchange(new_metrics, {});
}
void
reactor::task_queue::rename(sstring new_name) {
    if (_name != new_name) {
        _name = std::move(new_name);
        register_stats();
    }
}
#ifdef __clang__
__attribute__((no_sanitize("undefined"))) // multiplication below may overflow; we check for that
#elif defined(__GNUC__)
[[gnu::no_sanitize_undefined]]
#endif
int64_t
reactor::task_queue::to_vruntime(sched_clock::duration runtime) const {
    auto scaled = (runtime.count() * _reciprocal_shares_times_2_power_32) >> 32;
    // Prevent overflow from returning ridiculous values
    return std::max<int64_t>(scaled, 0);
}
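// Worked example of the fixed-point math above (illustrative numbers): with
// shares = 4, _reciprocal_shares_times_2_power_32 = 2^32 / 4 = 2^30, so a
// runtime of 1000 ticks scales to (1000 * 2^30) >> 32 = 250 vruntime units.
// A queue with more shares thus accumulates vruntime more slowly and gets
// picked more often by the scheduler.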
void
reactor::task_queue::set_shares(float shares) noexcept {
    _shares = std::max(shares, 1.0f);
    _reciprocal_shares_times_2_power_32 = (uint64_t(1) << 32) / _shares;
}
void
reactor::account_runtime(task_queue& tq, sched_clock::duration runtime) {
    if (runtime > (2 * _task_quota)) {
        tq._time_spent_on_task_quota_violations += runtime - _task_quota;
    }
    tq._vruntime += tq.to_vruntime(runtime);
    tq._runtime += runtime;
}

void
reactor::account_idle(sched_clock::duration runtime) {
    // anything to do here?
}
struct reactor::task_queue::indirect_compare {
    bool operator()(const task_queue* tq1, const task_queue* tq2) const {
        return tq1->_vruntime < tq2->_vruntime;
    }
};
reactor::reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
    : _smp(std::move(smp))
    , _alien(alien)
    , _id(id)
    , _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
    , _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
#ifdef HAVE_OSV
    , _timer_thread(
        [&] { timer_thread_func(); }, sched::thread::attr().stack(4096).name("timer_thread").pin(sched::cpu::current()))
    , _engine_thread(sched::thread::current())
#endif
    , _cpu_stall_detector(internal::make_cpu_stall_detector())
    , _reuseport(posix_reuseport_detect())
    , _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) {
    /*
     * The _backend assignment is here, not on the initialization list as
     * the chosen backend constructor may want to handle signals and thus
     * needs the _signals._signal_handlers map to be initialized.
     */
    _backend = rbs.create(*this);
    *internal::get_scheduling_group_specific_thread_local_data_ptr() = &_scheduling_group_specific_data;
    _task_queues.push_back(std::make_unique<task_queue>(0, "main", 1000));
    _task_queues.push_back(std::make_unique<task_queue>(1, "atexit", 1000));
    _at_destroy_tasks = _task_queues.back().get();
    set_need_preempt_var(&_preemption_monitor);
    seastar::thread_impl::init();
    _backend->start_tick();

#ifdef HAVE_OSV
    _timer_thread.start();
#else
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, internal::cpu_stall_detector::signal_number());
    auto r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
    assert(r == 0);
#endif
    memory::set_reclaim_hook([this] (std::function<void ()> reclaim_fn) {
        add_high_priority_task(make_task(default_scheduling_group(), [fn = std::move(reclaim_fn)] {
            fn();
        }));
    });
}
reactor::~reactor() {
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, internal::cpu_stall_detector::signal_number());
    auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
    assert(r == 0);

    _backend->stop_tick();
    auto eraser = [](auto& list) {
        while (!list.empty()) {
            auto& timer = *list.begin();
            timer.cancel();
        }
    };
    eraser(_expired_timers);
    eraser(_expired_lowres_timers);
    eraser(_expired_manual_timers);
    auto& sg_data = _scheduling_group_specific_data;
    for (auto&& tq : _task_queues) {
        if (tq) {
            auto& this_sg = sg_data.per_scheduling_group_data[tq->_id];
            // The following line will preserve the convention that constructor and destructor functions
            // for the per sg values are called in the context of the containing scheduling group.
            *internal::current_scheduling_group_ptr() = scheduling_group(tq->_id);
            for (size_t key : boost::irange<size_t>(0, sg_data.scheduling_group_key_configs.size())) {
                void* val = this_sg.specific_vals[key];
                if (val) {
                    if (sg_data.scheduling_group_key_configs[key].destructor) {
                        sg_data.scheduling_group_key_configs[key].destructor(val);
                    }
                    free(val);
                    this_sg.specific_vals[key] = nullptr;
                }
            }
        }
    }
}
reactor::sched_stats
reactor::get_sched_stats() const {
    sched_stats ret;
    ret.tasks_processed = tasks_processed();
    return ret;
}
future<> reactor::readable(pollable_fd_state& fd) {
    return _backend->readable(fd);
}

future<> reactor::writeable(pollable_fd_state& fd) {
    return _backend->writeable(fd);
}

future<> reactor::readable_or_writeable(pollable_fd_state& fd) {
    return _backend->readable_or_writeable(fd);
}

future<> reactor::poll_rdhup(pollable_fd_state& fd) {
    return _backend->poll_rdhup(fd);
}

void reactor::set_strict_dma(bool value) {
    _strict_o_direct = value;
}

void reactor::set_bypass_fsync(bool value) {
    _bypass_fsync = value;
}

void
reactor::reset_preemption_monitor() {
    return _backend->reset_preemption_monitor();
}

void
reactor::request_preemption() {
    return _backend->request_preemption();
}

void reactor::start_handling_signal() {
    return _backend->start_handling_signal();
}
namespace internal {

cpu_stall_detector::cpu_stall_detector(cpu_stall_detector_config cfg)
        : _shard_id(this_shard_id()) {
    // glib's backtrace() calls dlopen("libgcc_s.so.1") once to resolve unwind related symbols.
    // If first stall detector invocation happens during another dlopen() call the calling thread
    // will deadlock. The dummy call here makes sure that backtrace's initialization happens in
    // a safe place.
    backtrace([] (frame) {});
    update_config(cfg);

    namespace sm = seastar::metrics;

    _metrics.add_group("stall_detector", {
            sm::make_counter("reported", _total_reported, sm::description("Total number of reported stalls, look in the traces for the exact reason"))});
    // note: if something is added here that can throw, it should take care to destroy _timer.
}
cpu_stall_detector_posix_timer::cpu_stall_detector_posix_timer(cpu_stall_detector_config cfg) : cpu_stall_detector(cfg) {
    struct sigevent sev = {};
    sev.sigev_notify = SIGEV_THREAD_ID;
    sev.sigev_signo = signal_number();
    sev._sigev_un._tid = syscall(SYS_gettid);
    int err = timer_create(CLOCK_THREAD_CPUTIME_ID, &sev, &_timer);
    if (err) {
        throw std::system_error(std::error_code(err, std::system_category()));
    }
}

cpu_stall_detector_posix_timer::~cpu_stall_detector_posix_timer() {
    timer_delete(_timer);
}
cpu_stall_detector_config
cpu_stall_detector::get_config() const {
    return _config;
}

void cpu_stall_detector::update_config(cpu_stall_detector_config cfg) {
    _config = cfg;
    _threshold = std::chrono::duration_cast<sched_clock::duration>(cfg.threshold);
    _slack = std::chrono::duration_cast<sched_clock::duration>(cfg.threshold * cfg.slack);
    _stall_detector_reports_per_minute = cfg.stall_detector_reports_per_minute;
    _max_reports_per_minute = cfg.stall_detector_reports_per_minute;
    _rearm_timer_at = reactor::now();
}
void cpu_stall_detector::maybe_report() {
    if (_reported++ < _max_reports_per_minute) {
        generate_trace();
    }
}

// We use a tick at every timer firing so we can report suppressed backtraces.
// Best case it's a correctly predicted branch. If a backtrace had happened in
// the near past it's an increment and two branches.
//
// We could do it cheaper if we didn't report suppressed backtraces.
void cpu_stall_detector::on_signal() {
    auto tasks_processed = engine().tasks_processed();
    auto last_seen = _last_tasks_processed_seen.load(std::memory_order_relaxed);
    if (!last_seen) {
        return; // stall detector is not active
    } else if (last_seen == tasks_processed) {
        // we defer the check of spuriousness to inside this unlikely condition
        // since the check itself can be costly, involving a syscall (reactor::now())
        if (is_spurious_signal()) {
            return;
        }
        // no task was processed - report unless suppressed
        maybe_report();
        _report_at <<= 1;
    } else {
        _last_tasks_processed_seen.store(tasks_processed, std::memory_order_relaxed);
    }
    arm_timer();
}
void cpu_stall_detector::report_suppressions(sched_clock::time_point now) {
    if (now > _minute_mark + 60s) {
        if (_reported > _max_reports_per_minute) {
            auto suppressed = _reported - _max_reports_per_minute;
            backtrace_buffer buf;
            // Reuse backtrace buffer infrastructure so we don't have to allocate here
            buf.append("Rate-limit: suppressed ");
            buf.append_decimal(suppressed);
            suppressed == 1 ? buf.append(" backtrace") : buf.append(" backtraces");
            buf.append(" on shard ");
            buf.append_decimal(_shard_id);
            buf.append("\n");
            buf.flush();
        }
        reset_suppression_state(now);
    }
}

void
cpu_stall_detector::reset_suppression_state(sched_clock::time_point now) {
    _reported = 0;
    _minute_mark = now;
}
void cpu_stall_detector_posix_timer::arm_timer() {
    auto its = posix::to_relative_itimerspec(_threshold * _report_at + _slack, 0s);
    timer_settime(_timer, 0, &its, nullptr);
}
void cpu_stall_detector::start_task_run(sched_clock::time_point now) {
    if (now > _rearm_timer_at) {
        report_suppressions(now);
        _report_at = 1;
        _run_started_at = now;
        _rearm_timer_at = now + _threshold * _report_at;
        arm_timer();
    }
    _last_tasks_processed_seen.store(engine().tasks_processed(), std::memory_order_relaxed);
    std::atomic_signal_fence(std::memory_order_release); // Don't delay this write, so the signal handler can see it
}

void cpu_stall_detector::end_task_run(sched_clock::time_point now) {
    std::atomic_signal_fence(std::memory_order_acquire); // Don't hoist this write, so the signal handler can see it
    _last_tasks_processed_seen.store(0, std::memory_order_relaxed);
}
void cpu_stall_detector_posix_timer::start_sleep() {
    auto its = posix::to_relative_itimerspec(0s, 0s);
    timer_settime(_timer, 0, &its, nullptr);
    _rearm_timer_at = reactor::now();
}

void cpu_stall_detector::end_sleep() {
}
static long
perf_event_open(struct perf_event_attr* hw_event, pid_t pid, int cpu, int group_fd, unsigned long flags) {
    return syscall(__NR_perf_event_open, hw_event, pid, cpu, group_fd, flags);
}
cpu_stall_detector_linux_perf_event::cpu_stall_detector_linux_perf_event(file_desc fd, cpu_stall_detector_config cfg)
        : cpu_stall_detector(cfg), _fd(std::move(fd)) {
    void* ret = ::mmap(nullptr, 2*getpagesize(), PROT_READ|PROT_WRITE, MAP_SHARED, _fd.get(), 0);
    if (ret == MAP_FAILED) {
        abort();
    }
    _mmap = static_cast<struct ::perf_event_mmap_page*>(ret);
    _data_area = reinterpret_cast<char*>(_mmap) + getpagesize();
    _data_area_mask = getpagesize() - 1;
}

cpu_stall_detector_linux_perf_event::~cpu_stall_detector_linux_perf_event() {
    ::munmap(_mmap, 2*getpagesize());
}
void
cpu_stall_detector_linux_perf_event::arm_timer() {
    auto period = _threshold * _report_at + _slack;
    uint64_t ns = period / 1ns;
    _next_signal_time = reactor::now() + period;

    // clear out any existing records in the ring buffer, so when we get interrupted next time
    // we have only the stack associated with that interrupt, and so we don't overflow.
    data_area_reader(*this).skip_all();
    if (__builtin_expect(_enabled && _current_period == ns, 1)) {
        // Common case - we're re-arming with the same period, the counter
        // is already enabled.
        //
        // We want to set the next interrupt to ns from now, and somewhat oddly the
        // way to do this is PERF_EVENT_IOC_PERIOD, even with the same period as
        // already configured, see the code at:
        //
        // https://elixir.bootlin.com/linux/v5.15.86/source/kernel/events/core.c#L5636
        //
        // This behavior is intentional (kernel commit bad7192b842c83e580747ca57104dd51fe08c223),
        // so we can presumably rely on it.
        _fd.ioctl(PERF_EVENT_IOC_PERIOD, ns);
    } else {
        // Uncommon case - we're moving from disabled to enabled, or changing
        // the period. Issue more calls and be careful.
        _fd.ioctl(PERF_EVENT_IOC_DISABLE, 0); // avoid false alarms while we modify stuff
        _fd.ioctl(PERF_EVENT_IOC_PERIOD, ns);
        _fd.ioctl(PERF_EVENT_IOC_RESET, 0);
        _fd.ioctl(PERF_EVENT_IOC_ENABLE, 0);
        _enabled = true;
        _current_period = ns;
    }
}
void
cpu_stall_detector_linux_perf_event::start_sleep() {
    _fd.ioctl(PERF_EVENT_IOC_DISABLE, 0);
    _enabled = false;
}

bool
cpu_stall_detector_linux_perf_event::is_spurious_signal() {
    // If the current time is before the expected signal time, it is
    // probably a spurious signal. One reason this could occur is that
    // PERF_EVENT_IOC_PERIOD does not reset the current overflow point
    // on kernels prior to 3.14 (or 3.7 on Arm).
    return reactor::now() < _next_signal_time;
}
void
cpu_stall_detector_linux_perf_event::maybe_report_kernel_trace() {
    data_area_reader reader(*this);
    auto current_record = [&] () -> ::perf_event_header {
        return reader.read_struct<perf_event_header>();
    };

    while (reader.have_data()) {
        auto record = current_record();

        if (record.type != PERF_RECORD_SAMPLE) {
            reader.skip(record.size - sizeof(record));
            continue;
        }

        auto nr = reader.read_u64();
        backtrace_buffer buf;
        buf.append("kernel callstack:");
        for (uint64_t i = 0; i < nr; ++i) {
            buf.append(" 0x");
            buf.append_hex(uintptr_t(reader.read_u64()));
        }
        buf.append("\n");
        buf.flush();
    }
}
std::unique_ptr<cpu_stall_detector_linux_perf_event>
cpu_stall_detector_linux_perf_event::try_make(cpu_stall_detector_config cfg) {
    ::perf_event_attr pea = {
        .type = PERF_TYPE_SOFTWARE,
        .size = sizeof(pea),
        .config = PERF_COUNT_SW_TASK_CLOCK, // more likely to work on virtual machines than hardware events
        .sample_period = 1'000'000'000, // Needs non-zero value or PERF_IOC_PERIOD gets confused
        .sample_type = PERF_SAMPLE_CALLCHAIN,
        .exclude_callchain_user = 1, // we're using backtrace() to capture the user callchain
    };
    unsigned long flags = 0;
    if (internal::kernel_uname().whitelisted({"3.14"})) {
        flags |= PERF_FLAG_FD_CLOEXEC;
    }
    int fd = perf_event_open(&pea, 0, -1, -1, flags);
    if (fd == -1) {
        throw std::system_error(errno, std::system_category(), "perf_event_open() failed");
    }
    auto desc = file_desc::from_fd(fd);
    struct f_owner_ex sig_owner = {
        .type = F_OWNER_TID,
        .pid = static_cast<pid_t>(syscall(SYS_gettid)),
    };
    auto ret1 = ::fcntl(fd, F_SETOWN_EX, &sig_owner);
    if (ret1 == -1) {
        abort();
    }
    auto ret2 = ::fcntl(fd, F_SETSIG, signal_number());
    if (ret2 == -1) {
        abort();
    }
    auto fd_flags = ::fcntl(fd, F_GETFL);
    if (fd_flags == -1) {
        abort();
    }
    auto ret3 = ::fcntl(fd, F_SETFL, fd_flags | O_ASYNC);
    if (ret3 == -1) {
        abort();
    }
    return std::make_unique<cpu_stall_detector_linux_perf_event>(std::move(desc), std::move(cfg));
}
std::unique_ptr<cpu_stall_detector> make_cpu_stall_detector(cpu_stall_detector_config cfg) {
    try {
        return cpu_stall_detector_linux_perf_event::try_make(cfg);
    } catch (...) {
        seastar_logger.warn("Creation of perf_event based stall detector failed, falling back to posix timer: {}", std::current_exception());
        return std::make_unique<cpu_stall_detector_posix_timer>(cfg);
    }
}
void cpu_stall_detector::generate_trace() {
    auto delta = reactor::now() - _run_started_at;

    _total_reported++;
    if (_config.report) {
        _config.report();
        return;
    }

    backtrace_buffer buf;
    buf.append("Reactor stalled for ");
    buf.append_decimal(uint64_t(delta / 1ms));
    buf.append(" ms");
    print_with_backtrace(buf, _config.oneline);
    maybe_report_kernel_trace();
}

} // internal namespace
void
reactor::update_blocked_reactor_notify_ms(std::chrono::milliseconds ms) {
    auto cfg = _cpu_stall_detector->get_config();
    if (ms != cfg.threshold) {
        cfg.threshold = ms;
        _cpu_stall_detector->update_config(cfg);
        seastar_logger.info("updated: blocked-reactor-notify-ms={}", ms.count());
    }
}
std::chrono::milliseconds
reactor::get_blocked_reactor_notify_ms() const {
    auto d = _cpu_stall_detector->get_config().threshold;
    return std::chrono::duration_cast<std::chrono::milliseconds>(d);
}
void
reactor::set_stall_detector_report_function(std::function<void ()> report) {
    auto cfg = _cpu_stall_detector->get_config();
    cfg.report = std::move(report);
    _cpu_stall_detector->update_config(std::move(cfg));
    _cpu_stall_detector->reset_suppression_state(reactor::now());
}

std::function<void ()>
reactor::get_stall_detector_report_function() const {
    return _cpu_stall_detector->get_config().report;
}
void
reactor::block_notifier(int) {
    engine()._cpu_stall_detector->on_signal();
}
template <typename T, typename E, typename EnableFunc>
void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn())) {
    expired_timers = timers.expire(timers.now());
    for (auto& t : expired_timers) {
        t._expired = true;
    }
    const auto prev_sg = current_scheduling_group();
    while (!expired_timers.empty()) {
        auto t = &*expired_timers.begin();
        expired_timers.pop_front();
        t->_queued = false;
        if (t->_armed) {
            t->_armed = false;
            if (t->_period) {
                t->readd_periodic();
            }
            try {
                *internal::current_scheduling_group_ptr() = t->_sg;
                t->_callback();
            } catch (...) {
                seastar_logger.error("Timer callback failed: {}", std::current_exception());
            }
        }
    }
    // complete_timers() can be called from the context of run_tasks()
    // as well so we need to restore the previous scheduling group (set by run_tasks()).
    *internal::current_scheduling_group_ptr() = prev_sg;
    enable_fn();
}
#ifdef HAVE_OSV
void reactor::timer_thread_func() {
    sched::timer tmr(*sched::thread::current());
    WITH_LOCK(_timer_mutex) {
        while (!_stopped) {
            if (_timer_due != 0) {
                set_timer(tmr, _timer_due);
                _timer_cond.wait(_timer_mutex, &tmr);
                if (tmr.expired()) {
                    _timer_due = 0;
                    _engine_thread->unsafe_stop();
                    _pending_tasks.push_front(make_task(default_scheduling_group(), [this] {
                        complete_timers(_timers, _expired_timers, [this] {
                            if (!_timers.empty()) {
                                enable_timer(_timers.get_next_timeout());
                            }
                        });
                    }));
                    _engine_thread->wake();
                } else {
                    tmr.cancel();
                }
            } else {
                _timer_cond.wait(_timer_mutex);
            }
        }
    }
}

void reactor::set_timer(sched::timer &tmr, s64 t) {
    using namespace osv::clock;
    tmr.set(wall::time_point(std::chrono::nanoseconds(t)));
}
#endif
class network_stack_factory {
    network_stack_entry::factory_func _func;

public:
    network_stack_factory(noncopyable_function<future<std::unique_ptr<network_stack>> (const program_options::option_group&)> func)
        : _func(std::move(func)) { }
    future<std::unique_ptr<network_stack>> operator()(const program_options::option_group& opts) { return _func(opts); }
};
void reactor::configure(const reactor_options& opts) {
    _network_stack_ready = opts.network_stack.get_selected_candidate()(*opts.network_stack.get_selected_candidate_opts());

    _handle_sigint = !opts.no_handle_interrupt;
    auto task_quota = opts.task_quota_ms.get_value() * 1ms;
    _task_quota = std::chrono::duration_cast<sched_clock::duration>(task_quota);

    auto blocked_time = opts.blocked_reactor_notify_ms.get_value() * 1ms;
    internal::cpu_stall_detector_config csdc;
    csdc.threshold = blocked_time;
    csdc.stall_detector_reports_per_minute = opts.blocked_reactor_reports_per_minute.get_value();
    csdc.oneline = opts.blocked_reactor_report_format_oneline.get_value();
    _cpu_stall_detector->update_config(csdc);

    _max_task_backlog = opts.max_task_backlog.get_value();
    _max_poll_time = opts.idle_poll_time_us.get_value() * 1us;
    if (opts.poll_mode) {
        _max_poll_time = std::chrono::nanoseconds::max();
    }
    if (opts.overprovisioned && opts.idle_poll_time_us.defaulted() && !opts.poll_mode) {
        _max_poll_time = 0us;
    }
    set_strict_dma(!opts.relaxed_dma);
    if (!opts.poll_aio.get_value() || (opts.poll_aio.defaulted() && opts.overprovisioned)) {
        _aio_eventfd = pollable_fd(file_desc::eventfd(0, 0));
    }
    set_bypass_fsync(opts.unsafe_bypass_fsync.get_value());
    _kernel_page_cache = opts.kernel_page_cache.get_value();
    _force_io_getevents_syscall = opts.force_aio_syscalls.get_value();
    aio_nowait_supported = opts.linux_aio_nowait.get_value();
    _have_aio_fsync = opts.aio_fsync.get_value();
}
pollable_fd
reactor::posix_listen(socket_address sa, listen_options opts) {
    auto specific_protocol = (int)(opts.proto);
    if (sa.is_af_unix()) {
        // no type-safe way to create listen_opts with proto=0
        specific_protocol = 0;
    }
    static auto somaxconn = [] {
        std::optional<int> result;
        std::ifstream ifs("/proc/sys/net/core/somaxconn");
        if (ifs) {
            int val;
            ifs >> val;
            result = val;
        }
        return result;
    }();
    if (somaxconn && *somaxconn < opts.listen_backlog) {
        fmt::print(
            "Warning: /proc/sys/net/core/somaxconn is set to {:d} "
            "which is lower than the backlog parameter {:d} used for listen(), "
            "please change it with `sysctl -w net.core.somaxconn={:d}`\n",
            *somaxconn, opts.listen_backlog, opts.listen_backlog);
    }

    file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, specific_protocol);
    if (opts.reuse_address) {
        fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1);
    }
    if (_reuseport && !sa.is_af_unix())
        fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);

    try {
        fd.bind(sa.u.sa, sa.length());
        fd.listen(opts.listen_backlog);
    } catch (const std::system_error& s) {
        throw std::system_error(s.code(), fmt::format("posix_listen failed for address {}", sa));
    }

    return pollable_fd(std::move(fd));
}
bool
reactor::posix_reuseport_detect() {
    return false; // FIXME: reuseport currently leads to heavy load imbalance. Until we fix that, just
                  // disable it unconditionally.
    try {
        file_desc fd = file_desc::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
        fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
        return true;
    } catch(std::system_error& e) {
        return false;
    }
}
void pollable_fd_state::maybe_no_more_recv() {
    if (shutdown_mask & posix::rcv_shutdown) {
        throw std::system_error(std::error_code(ECONNABORTED, std::system_category()));
    }
}

void pollable_fd_state::maybe_no_more_send() {
    if (shutdown_mask & posix::snd_shutdown) {
        throw std::system_error(std::error_code(ECONNABORTED, std::system_category()));
    }
}

void pollable_fd_state::forget() {
    engine()._backend->forget(*this);
}

void intrusive_ptr_release(pollable_fd_state* fd) {
    if (!--fd->_refs) {
        fd->forget();
    }
}
pollable_fd::pollable_fd(file_desc fd, pollable_fd::speculation speculate)
    : _s(engine()._backend->make_pollable_fd_state(std::move(fd), speculate))
{}

void pollable_fd::shutdown(int how, shutdown_kernel_only kernel_only) {
    if (!kernel_only) {
        // TCP will respond to shutdown() by returning ECONNABORTED on the next IO,
        // but UDP responds by returning EAGAIN. The shutdown_mask tells us to convert
        // EAGAIN to ECONNABORTED in that case.
        _s->shutdown_mask |= posix::shutdown_mask(how);
    }
    engine()._backend->shutdown(*_s, how);
}
pollable_fd
reactor::make_pollable_fd(socket_address sa, int proto) {
    int maybe_nonblock = _backend->do_blocking_io() ? 0 : SOCK_NONBLOCK;
    file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | maybe_nonblock | SOCK_CLOEXEC, proto);
    return pollable_fd(std::move(fd));
}
future<>
reactor::posix_connect(pollable_fd pfd, socket_address sa, socket_address local) {
#ifdef IP_BIND_ADDRESS_NO_PORT
    if (!sa.is_af_unix()) {
        try {
            // Do not reserve an ephemeral port when using bind() with port number 0.
            // connect() will handle it later. The reason for that is that bind() may fail
            // to allocate a port while connect() will succeed: bind() does not
            // know the destination address and has to find a globally unique local port.
            pfd.get_file_desc().setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
        } catch (std::system_error& err) {
            if (err.code() != std::error_code(ENOPROTOOPT, std::system_category())) {
                throw;
            }
        }
    }
#endif
    if (!local.is_wildcard()) {
        // call bind() only if local address is not wildcard
        pfd.get_file_desc().bind(local.u.sa, local.length());
    }
    return pfd.connect(sa).finally([pfd] {});
}
server_socket
reactor::listen(socket_address sa, listen_options opt) {
    return server_socket(_network_stack->listen(sa, opt));
}

future<connected_socket>
reactor::connect(socket_address sa) {
    return _network_stack->connect(sa);
}

future<connected_socket>
reactor::connect(socket_address sa, socket_address local, transport proto) {
    return _network_stack->connect(sa, local, proto);
}
void io_completion::complete_with(ssize_t res) {
    if (res >= 0) {
        complete(res);
        return;
    }

    ++engine()._io_stats.aio_errors;
    try {
        throw_kernel_error(res);
    } catch (...) {
        set_exception(std::current_exception());
    }
}
bool
reactor::flush_pending_aio() {
    for (auto& ioq : _io_queues) {
        ioq.second->poll_io_queue();
    }
    return false;
}

steady_clock_type::time_point reactor::next_pending_aio() const noexcept {
    steady_clock_type::time_point next = steady_clock_type::time_point::max();

    for (auto& ioq : _io_queues) {
        steady_clock_type::time_point n = ioq.second->next_pending_aio();
        if (n < next) {
            next = std::move(n);
        }
    }

    return next;
}
bool
reactor::reap_kernel_completions() {
    return _backend->reap_kernel_completions();
}

const io_priority_class& default_priority_class() {
    static thread_local auto shard_default_class = [] {
        return io_priority_class::register_one("default", 1);
    }();
    return shard_default_class;
}
namespace internal {

size_t sanitize_iovecs(std::vector<iovec>& iov, size_t disk_alignment) noexcept {
    if (iov.size() > IOV_MAX) {
        iov.resize(IOV_MAX);
    }
    auto length = iovec_len(iov);
    while (auto rest = length & (disk_alignment - 1)) {
        if (iov.back().iov_len <= rest) {
            length -= iov.back().iov_len;
            iov.pop_back();
        } else {
            iov.back().iov_len -= rest;
            length -= rest;
        }
    }
    return length;
}

}
future<file>
reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_options options) noexcept {
    return do_with(static_cast<int>(flags), std::move(options), [this, nameref] (auto& open_flags, file_open_options& options) {
        sstring name(nameref);
        return _thread_pool->submit<syscall_result<int>>([this, name, &open_flags, &options, strict_o_direct = _strict_o_direct, bypass_fsync = _bypass_fsync] () mutable {
            // We want O_DIRECT, except in three cases:
            //   - tmpfs (which doesn't support it, but works fine anyway)
            //   - strict_o_direct == false (where we forgive it being not supported)
            //   - _kernel_page_cache == true (where we disable it for short-lived test processes)
            // Because open() with O_DIRECT will fail, we open it without O_DIRECT, try
            // to update it to O_DIRECT with fcntl(), and if that fails, see if we
            // can forgive it.
            auto is_tmpfs = [] (int fd) {
                struct ::statfs buf;
                auto r = ::fstatfs(fd, &buf);
                if (r == -1) {
                    return false;
                }
                return buf.f_type == internal::fs_magic::tmpfs;
            };
            open_flags |= O_CLOEXEC;
            if (bypass_fsync) {
                open_flags &= ~O_DSYNC;
            }
            auto mode = static_cast<mode_t>(options.create_permissions);
            int fd = ::open(name.c_str(), open_flags, mode);
            if (fd == -1) {
                return wrap_syscall<int>(fd);
            }
            int o_direct_flag = _kernel_page_cache ? 0 : O_DIRECT;
            int r = ::fcntl(fd, F_SETFL, open_flags | o_direct_flag);
            auto maybe_ret = wrap_syscall<int>(r); // capture errno (should be EINVAL)
            if (r == -1 && strict_o_direct && !is_tmpfs(fd)) {
                ::close(fd);
                return maybe_ret;
            }
            if (fd != -1 && options.extent_allocation_size_hint && !_kernel_page_cache) {
                fsxattr attr = {};
                int r = ::ioctl(fd, XFS_IOC_FSGETXATTR, &attr);
                // xfs delayed allocation is disabled when extent size hints are present.
                // This causes tons of xfs log fsyncs. Given that extent size hints are
                // unneeded when delayed allocation is available (which is the case
                // when not using O_DIRECT), disable them.
                //
                // Ignore error; may be !xfs, and just a hint anyway
                if (r == 0) {
                    attr.fsx_xflags |= XFS_XFLAG_EXTSIZE;
                    attr.fsx_extsize = std::min(options.extent_allocation_size_hint,
                                        file_open_options::max_extent_allocation_size_hint);
                    // Ignore error; may be !xfs, and just a hint anyway
                    ::ioctl(fd, XFS_IOC_FSSETXATTR, &attr);
                }
            }
            return wrap_syscall<int>(fd);
        }).then([&options, name = std::move(name), &open_flags] (syscall_result<int> sr) {
            sr.throw_fs_exception_if_error("open failed", name);
            return make_file_impl(sr.result, options, open_flags);
        }).then([] (shared_ptr<file_impl> impl) {
            return make_ready_future<file>(std::move(impl));
        });
    });
}
) noexcept
{
1770 // Allocating memory for a sstring can throw, hence the futurize_invoke
1771 return futurize_invoke([pathname
] {
1772 return engine()._thread_pool
->submit
<syscall_result
<int>>([pathname
= sstring(pathname
)] {
1773 return wrap_syscall
<int>(::remove(pathname
.c_str()));
1774 }).then([pathname
= sstring(pathname
)] (syscall_result
<int> sr
) {
1775 sr
.throw_fs_exception_if_error("remove failed", pathname
);
1776 return make_ready_future
<>();
future<>
reactor::rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([old_pathname, new_pathname] {
        return engine()._thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] {
            return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
        }).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
            sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname);
            return make_ready_future<>();
        });
    });
}
future<>
reactor::link_file(std::string_view oldpath, std::string_view newpath) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([oldpath, newpath] {
        return engine()._thread_pool->submit<syscall_result<int>>([oldpath = sstring(oldpath), newpath = sstring(newpath)] {
            return wrap_syscall<int>(::link(oldpath.c_str(), newpath.c_str()));
        }).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
            sr.throw_fs_exception_if_error("link failed", oldpath, newpath);
            return make_ready_future<>();
        });
    });
}
future<>
reactor::chmod(std::string_view name, file_permissions permissions) noexcept {
    auto mode = static_cast<mode_t>(permissions);
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([name, mode, this] {
        return _thread_pool->submit<syscall_result<int>>([name = sstring(name), mode] {
            return wrap_syscall<int>(::chmod(name.c_str(), mode));
        }).then([name = sstring(name), mode] (syscall_result<int> sr) {
            if (sr.result == -1) {
                auto reason = format("chmod(0{:o}) failed", mode);
                sr.throw_fs_exception(reason, fs::path(name));
            }
            return make_ready_future<>();
        });
    });
}
directory_entry_type stat_to_entry_type(__mode_t type) {
    if (S_ISDIR(type)) {
        return directory_entry_type::directory;
    }
    if (S_ISBLK(type)) {
        return directory_entry_type::block_device;
    }
    if (S_ISCHR(type)) {
        return directory_entry_type::char_device;
    }
    if (S_ISFIFO(type)) {
        return directory_entry_type::fifo;
    }
    if (S_ISLNK(type)) {
        return directory_entry_type::link;
    }
    if (S_ISSOCK(type)) {
        return directory_entry_type::socket;
    }
    if (S_ISREG(type)) {
        return directory_entry_type::regular;
    }
    return directory_entry_type::unknown;
}
future<std::optional<directory_entry_type>>
reactor::file_type(std::string_view name, follow_symlink follow) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([name, follow, this] {
        return _thread_pool->submit<syscall_result_extra<struct stat>>([name = sstring(name), follow] {
            struct stat st;
            auto stat_syscall = follow ? stat : lstat;
            auto ret = stat_syscall(name.c_str(), &st);
            return wrap_syscall(ret, st);
        }).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
            if (long(sr.result) == -1) {
                if (sr.error != ENOENT && sr.error != ENOTDIR) {
                    sr.throw_fs_exception_if_error("stat failed", name);
                }
                return make_ready_future<std::optional<directory_entry_type>>(std::optional<directory_entry_type>());
            }
            return make_ready_future<std::optional<directory_entry_type>>(std::optional<directory_entry_type>(stat_to_entry_type(sr.extra.st_mode)));
        });
    });
}
future<std::optional<directory_entry_type>>
file_type(std::string_view name, follow_symlink follow) noexcept {
    return engine().file_type(name, follow);
}
static std::chrono::system_clock::time_point
timespec_to_time_point(const timespec& ts) {
    auto d = std::chrono::duration_cast<std::chrono::system_clock::duration>(
            ts.tv_sec * 1s + ts.tv_nsec * 1ns);
    return std::chrono::system_clock::time_point(d);
}
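// Worked example: for ts = {.tv_sec = 1, .tv_nsec = 500'000'000} the sum is
// 1s + 500'000'000ns = 1.5s; duration_cast converts that to the system
// clock's native tick count, so the resulting time_point is 1.5s past the
// epoch.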
future<struct stat>
reactor::fstat(int fd) noexcept {
    return _thread_pool->submit<syscall_result_extra<struct stat>>([fd] {
        struct stat st;
        auto ret = ::fstat(fd, &st);
        return wrap_syscall(ret, st);
    }).then([] (syscall_result_extra<struct stat> ret) {
        ret.throw_if_error();
        return make_ready_future<struct stat>(ret.extra);
    });
}
future<int>
reactor::inotify_add_watch(int fd, std::string_view path, uint32_t flags) {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([path, fd, flags, this] {
        return _thread_pool->submit<syscall_result<int>>([fd, path = sstring(path), flags] {
            auto ret = ::inotify_add_watch(fd, path.c_str(), flags);
            return wrap_syscall(ret);
        }).then([] (syscall_result<int> ret) {
            ret.throw_if_error();
            return make_ready_future<int>(ret.result);
        });
    });
}
future<std::tuple<file_desc, file_desc>>
reactor::make_pipe() {
    return do_with(std::array<int, 2>{}, [this] (auto& pipe) {
        return _thread_pool->submit<syscall_result<int>>([&pipe] {
            return wrap_syscall<int>(::pipe2(pipe.data(), O_NONBLOCK));
        }).then([&pipe] (syscall_result<int> ret) {
            ret.throw_if_error();
            return make_ready_future<std::tuple<file_desc, file_desc>>(file_desc::from_fd(pipe[0]),
                                                                       file_desc::from_fd(pipe[1]));
        });
    });
}
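// Illustrative use (an assumption for exposition, not an API promise made
// here): the tuple future can be unpacked directly, e.g.
//
//   return make_pipe().then_unpack([] (file_desc read_end, file_desc write_end) {
//       // both fds were created O_NONBLOCK and are owned by file_desc
//   });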
future<std::tuple<pid_t, file_desc, file_desc, file_desc>>
reactor::spawn(std::string_view pathname,
               std::vector<sstring> argv,
               std::vector<sstring> env) {
    return when_all_succeed(make_pipe(),
                            make_pipe(),
                            make_pipe()).then_unpack([pathname = sstring(pathname),
                                                      argv = std::move(argv),
                                                      env = std::move(env), this] (std::tuple<file_desc, file_desc> cin_pipe,
                                                                                   std::tuple<file_desc, file_desc> cout_pipe,
                                                                                   std::tuple<file_desc, file_desc> cerr_pipe) mutable {
        return do_with(pid_t{},
                       std::move(cin_pipe),
                       std::move(cout_pipe),
                       std::move(cerr_pipe),
                       std::move(pathname),
                       posix_spawn_file_actions_t{},
                       posix_spawnattr_t{},
                       std::move(argv),
                       std::move(env),
                       [this](auto& child_pid, auto& cin_pipe, auto& cout_pipe, auto& cerr_pipe, auto& pathname, auto& actions, auto& attr, auto& argv, auto& env) {
            static constexpr int pipefd_read_end = 0;
            static constexpr int pipefd_write_end = 1;
            // Allocating memory for spawn {file actions,attributes} objects can throw, hence the futurize_invoke
            return futurize_invoke([&child_pid, &cin_pipe, &cout_pipe, &cerr_pipe, &pathname, &actions, &attr, &argv, &env, this] {
                // the args and envs parameters passed to posix_spawn() should be arrays of
                // pointers, and the last one should be a null pointer.
                std::vector<const char*> argvp;
                std::transform(argv.cbegin(), argv.cend(), std::back_inserter(argvp),
                               [](auto& s) { return s.c_str(); });
                argvp.push_back(nullptr);

                std::vector<const char*> envp;
                std::transform(env.cbegin(), env.cend(), std::back_inserter(envp),
                               [](auto& s) { return s.c_str(); });
                envp.push_back(nullptr);

                int r;
                r = ::posix_spawn_file_actions_init(&actions);
                throw_pthread_error(r);
                // the child process does not write to stdin
                std::get<pipefd_write_end>(cin_pipe).spawn_actions_add_close(&actions);
                // the child process does not read from stdout
                std::get<pipefd_read_end>(cout_pipe).spawn_actions_add_close(&actions);
                // the child process does not read from stderr
                std::get<pipefd_read_end>(cerr_pipe).spawn_actions_add_close(&actions);
                // redirect stdin, stdout and stderr to cin_pipe, cout_pipe and cerr_pipe respectively
                std::get<pipefd_read_end>(cin_pipe).spawn_actions_add_dup2(&actions, STDIN_FILENO);
                std::get<pipefd_write_end>(cout_pipe).spawn_actions_add_dup2(&actions, STDOUT_FILENO);
                std::get<pipefd_write_end>(cerr_pipe).spawn_actions_add_dup2(&actions, STDERR_FILENO);
                // after dup2()'ing the interesting ends of the pipes, close them
                std::get<pipefd_read_end>(cin_pipe).spawn_actions_add_close(&actions);
                std::get<pipefd_write_end>(cout_pipe).spawn_actions_add_close(&actions);
                std::get<pipefd_write_end>(cerr_pipe).spawn_actions_add_close(&actions);
                r = ::posix_spawnattr_init(&attr);
                throw_pthread_error(r);
                // make sure the following signals are not ignored by the child process
                sigset_t default_signals;
                sigemptyset(&default_signals);
                sigaddset(&default_signals, SIGINT);
                sigaddset(&default_signals, SIGTERM);
                r = ::posix_spawnattr_setsigdefault(&attr, &default_signals);
                throw_pthread_error(r);
                // make sure no signals are masked in the child process
                sigset_t mask_signals;
                sigemptyset(&mask_signals);
                r = ::posix_spawnattr_setsigmask(&attr, &mask_signals);
                throw_pthread_error(r);
                r = ::posix_spawnattr_setflags(&attr, POSIX_SPAWN_SETSIGDEF | POSIX_SPAWN_SETSIGMASK);
                throw_pthread_error(r);

                return _thread_pool->submit<syscall_result<int>>([&child_pid, &pathname, &actions, &attr,
                                                                  argv = std::move(argvp),
                                                                  env = std::move(envp)] {
                    return wrap_syscall<int>(::posix_spawn(&child_pid, pathname.c_str(), &actions, &attr,
                                                           const_cast<char* const *>(argv.data()),
                                                           const_cast<char* const *>(env.data())));
                });
            }).finally([&actions, &attr] {
                posix_spawn_file_actions_destroy(&actions);
                posix_spawnattr_destroy(&attr);
            }).then([&child_pid, &cin_pipe, &cout_pipe, &cerr_pipe] (syscall_result<int> ret) {
                throw_pthread_error(ret.result);
                return make_ready_future<std::tuple<pid_t, file_desc, file_desc, file_desc>>(
                        child_pid,
                        std::get<pipefd_write_end>(std::move(cin_pipe)),
                        std::get<pipefd_read_end>(std::move(cout_pipe)),
                        std::get<pipefd_read_end>(std::move(cerr_pipe)));
            });
        });
    });
}
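// Note on pipe-end ownership above: the child's ends are closed via the spawn
// file actions, leaving the parent holding the write end of cin_pipe and the
// read ends of cout_pipe and cerr_pipe, which is exactly what the returned
// tuple exposes alongside the child's pid.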
static auto next_waitpid_timeout(std::chrono::milliseconds this_timeout) {
    static const std::chrono::milliseconds step_timeout(20);
    static const std::chrono::milliseconds max_timeout(1000);
    if (this_timeout >= max_timeout) {
        return max_timeout;
    }
    return this_timeout + step_timeout;
}
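// Worked example: successive wait timeouts grow linearly by step_timeout,
// 0ms -> 20ms -> 40ms -> ..., and saturate at max_timeout (1000ms), so a
// long-lived child is polled at most once per second.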
#ifndef __NR_pidfd_open

# if defined(__alpha__)
#  define __NR_pidfd_open 544
# else
#  define __NR_pidfd_open 434
# endif

#endif
future<int> reactor::waitpid(pid_t pid) {
    return _thread_pool->submit<syscall_result<int>>([pid] {
        return wrap_syscall<int>(syscall(__NR_pidfd_open, pid, O_NONBLOCK));
    }).then([pid, this] (syscall_result<int> pidfd) {
        if (pidfd.result == -1) {
            // pidfd_open() was introduced in linux 5.3, so pidfd.error could be ENOSYS on
            // older kernels. But it could be another error, like EMFILE or ENFILE. Either
            // way, we should always fall back to waitpid().
            return do_with(int{}, std::chrono::milliseconds(0), [pid, this](int& wstatus,
                                                                            std::chrono::milliseconds& wait_timeout) {
                return repeat_until_value([this, pid, &wstatus, &wait_timeout] {
                    return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
                        return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
                    }).then([&wstatus, &wait_timeout] (syscall_result<pid_t> ret) mutable {
                        if (ret.result == 0) {
                            wait_timeout = next_waitpid_timeout(wait_timeout);
                            return ::seastar::sleep(wait_timeout).then([] {
                                return make_ready_future<std::optional<int>>();
                            });
                        } else if (ret.result > 0) {
                            return make_ready_future<std::optional<int>>(wstatus);
                        } else {
                            ret.throw_if_error();
                            return make_ready_future<std::optional<int>>(-1);
                        }
                    });
                });
            });
        }
        return do_with(pollable_fd(file_desc::from_fd(pidfd.result)), int{}, [pid, this](auto& pidfd, int& wstatus) {
            return pidfd.readable().then([pid, &wstatus, this] {
                return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
                    return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
                });
            }).then([&wstatus] (syscall_result<pid_t> ret) {
                ret.throw_if_error();
                assert(ret.result > 0);
                return make_ready_future<int>(wstatus);
            });
        });
    });
}
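// Summary of the two strategies above: on kernels with pidfd_open() (5.3+) we
// get a pollable fd and simply sleep until the child exits; otherwise we fall
// back to non-blocking ::waitpid(WNOHANG) with the growing sleep interval
// computed by next_waitpid_timeout().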
void reactor::kill(pid_t pid, int sig) {
    auto ret = wrap_syscall<int>(::kill(pid, sig));
    ret.throw_if_error();
}
future<stat_data>
reactor::file_stat(std::string_view pathname, follow_symlink follow) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([pathname, follow, this] {
        return _thread_pool->submit<syscall_result_extra<struct stat>>([pathname = sstring(pathname), follow] {
            struct stat st;
            auto stat_syscall = follow ? stat : lstat;
            auto ret = stat_syscall(pathname.c_str(), &st);
            return wrap_syscall(ret, st);
        }).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
            sr.throw_fs_exception_if_error("stat failed", pathname);
            struct stat& st = sr.extra;
            stat_data sd;
            sd.device_id = st.st_dev;
            sd.inode_number = st.st_ino;
            sd.mode = st.st_mode;
            sd.type = stat_to_entry_type(st.st_mode);
            sd.number_of_links = st.st_nlink;
            sd.uid = st.st_uid;
            sd.gid = st.st_gid;
            sd.rdev = st.st_rdev;
            sd.size = st.st_size;
            sd.block_size = st.st_blksize;
            sd.allocated_size = st.st_blocks * 512UL;
            sd.time_accessed = timespec_to_time_point(st.st_atim);
            sd.time_modified = timespec_to_time_point(st.st_mtim);
            sd.time_changed = timespec_to_time_point(st.st_ctim);
            return make_ready_future<stat_data>(std::move(sd));
        });
    });
}
future<uint64_t>
reactor::file_size(std::string_view pathname) noexcept {
    return file_stat(pathname, follow_symlink::yes).then([] (stat_data sd) {
        return make_ready_future<uint64_t>(sd.size);
    });
}
future<bool>
reactor::file_accessible(std::string_view pathname, access_flags flags) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([pathname, flags, this] {
        return _thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname), flags] {
            auto aflags = std::underlying_type_t<access_flags>(flags);
            auto ret = ::access(pathname.c_str(), aflags);
            return wrap_syscall(ret);
        }).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
            if (sr.result < 0) {
                if ((sr.error == ENOENT && flags == access_flags::exists) ||
                    (sr.error == EACCES && flags != access_flags::exists)) {
                    return make_ready_future<bool>(false);
                }
                sr.throw_fs_exception("access failed", fs::path(pathname));
            }
            return make_ready_future<bool>(true);
        });
    });
}
future<fs_type>
reactor::file_system_at(std::string_view pathname) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([pathname, this] {
        return _thread_pool->submit<syscall_result_extra<struct statfs>>([pathname = sstring(pathname)] {
            struct statfs st;
            auto ret = statfs(pathname.c_str(), &st);
            return wrap_syscall(ret, st);
        }).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
            static std::unordered_map<long int, fs_type> type_mapper = {
                { internal::fs_magic::xfs, fs_type::xfs },
                { internal::fs_magic::ext2, fs_type::ext2 },
                { internal::fs_magic::ext3, fs_type::ext3 },
                { internal::fs_magic::ext4, fs_type::ext4 },
                { internal::fs_magic::btrfs, fs_type::btrfs },
                { internal::fs_magic::hfs, fs_type::hfs },
                { internal::fs_magic::tmpfs, fs_type::tmpfs },
            };
            sr.throw_fs_exception_if_error("statfs failed", pathname);

            fs_type ret = fs_type::other;
            if (type_mapper.count(sr.extra.f_type) != 0) {
                ret = type_mapper.at(sr.extra.f_type);
            }
            return make_ready_future<fs_type>(ret);
        });
    });
}
future<struct statfs>
reactor::fstatfs(int fd) noexcept {
    return _thread_pool->submit<syscall_result_extra<struct statfs>>([fd] {
        struct statfs st;
        auto ret = ::fstatfs(fd, &st);
        return wrap_syscall(ret, st);
    }).then([] (syscall_result_extra<struct statfs> sr) {
        sr.throw_if_error();
        struct statfs st = sr.extra;
        return make_ready_future<struct statfs>(std::move(st));
    });
}
future<struct statvfs>
reactor::statvfs(std::string_view pathname) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([pathname, this] {
        return _thread_pool->submit<syscall_result_extra<struct statvfs>>([pathname = sstring(pathname)] {
            struct statvfs st;
            auto ret = ::statvfs(pathname.c_str(), &st);
            return wrap_syscall(ret, st);
        }).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
            sr.throw_fs_exception_if_error("statvfs failed", pathname);
            struct statvfs st = sr.extra;
            return make_ready_future<struct statvfs>(std::move(st));
        });
    });
}
future<file>
reactor::open_directory(std::string_view name) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([name, this] {
        auto oflags = O_DIRECTORY | O_CLOEXEC | O_RDONLY;
        return _thread_pool->submit<syscall_result<int>>([name = sstring(name), oflags] {
            return wrap_syscall<int>(::open(name.c_str(), oflags));
        }).then([name = sstring(name), oflags] (syscall_result<int> sr) {
            sr.throw_fs_exception_if_error("open failed", name);
            return make_file_impl(sr.result, file_open_options(), oflags);
        }).then([] (shared_ptr<file_impl> file_impl) {
            return make_ready_future<file>(std::move(file_impl));
        });
    });
}
future<>
reactor::make_directory(std::string_view name, file_permissions permissions) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([name, permissions, this] {
        return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
            auto mode = static_cast<mode_t>(permissions);
            return wrap_syscall<int>(::mkdir(name.c_str(), mode));
        }).then([name = sstring(name)] (syscall_result<int> sr) {
            sr.throw_fs_exception_if_error("mkdir failed", name);
        });
    });
}
future<>
reactor::touch_directory(std::string_view name, file_permissions permissions) noexcept {
    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([name, permissions] {
        return engine()._thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
            auto mode = static_cast<mode_t>(permissions);
            return wrap_syscall<int>(::mkdir(name.c_str(), mode));
        }).then([name = sstring(name)] (syscall_result<int> sr) {
            if (sr.result == -1 && sr.error != EEXIST) {
                sr.throw_fs_exception("mkdir failed", fs::path(name));
            }
            return make_ready_future<>();
        });
    });
}
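// Contrast with make_directory() above: touch_directory() treats EEXIST as
// success, so it is safe to call repeatedly or concurrently on the same path.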
future<>
reactor::fdatasync(int fd) noexcept {
    ++_fsyncs;
    if (_bypass_fsync) {
        return make_ready_future<>();
    }
    if (_have_aio_fsync) {
        // Does not go through the I/O queue, but has to be deleted
        struct fsync_io_desc final : public io_completion {
            promise<> _pr;
        public:
            virtual void complete(size_t res) noexcept override {
                _pr.set_value();
                delete this;
            }

            virtual void set_exception(std::exception_ptr eptr) noexcept override {
                _pr.set_exception(std::move(eptr));
                delete this;
            }

            future<> get_future() {
                return _pr.get_future();
            }
        };

        return futurize_invoke([this, fd] {
            auto desc = new fsync_io_desc;
            auto fut = desc->get_future();
            auto req = internal::io_request::make_fdatasync(fd);
            _io_sink.submit(desc, std::move(req));
            return fut;
        });
    }
    return _thread_pool->submit<syscall_result<int>>([fd] {
        return wrap_syscall<int>(::fdatasync(fd));
    }).then([] (syscall_result<int> sr) {
        sr.throw_if_error();
        return make_ready_future<>();
    });
}
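// fdatasync() thus has three paths: a no-op when fsync bypass is configured
// (_bypass_fsync), a linux-aio fsync submitted through the I/O sink when the
// kernel supports it (_have_aio_fsync), and a thread-pool fallback around
// ::fdatasync() otherwise.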
// Note: terminate if arm_highres_timer throws
// `when` should always be valid
void reactor::enable_timer(steady_clock_type::time_point when) noexcept
{
#ifndef HAVE_OSV
    itimerspec its;
    its.it_interval = {};
    its.it_value = to_timespec(when);
    _backend->arm_highres_timer(its);
#else
    using ns = std::chrono::nanoseconds;
    WITH_LOCK(_timer_mutex) {
        _timer_due = std::chrono::duration_cast<ns>(when.time_since_epoch()).count();
        _timer_cond.wake_one();
    }
#endif
}
void reactor::add_timer(timer<steady_clock_type>* tmr) noexcept {
    if (queue_timer(tmr)) {
        enable_timer(_timers.get_next_timeout());
    }
}

bool reactor::queue_timer(timer<steady_clock_type>* tmr) noexcept {
    return _timers.insert(*tmr);
}

void reactor::del_timer(timer<steady_clock_type>* tmr) noexcept {
    if (tmr->_expired) {
        _expired_timers.erase(_expired_timers.iterator_to(*tmr));
        tmr->_expired = false;
    } else {
        _timers.remove(*tmr);
    }
}

void reactor::add_timer(timer<lowres_clock>* tmr) noexcept {
    if (queue_timer(tmr)) {
        _lowres_next_timeout = _lowres_timers.get_next_timeout();
    }
}

bool reactor::queue_timer(timer<lowres_clock>* tmr) noexcept {
    return _lowres_timers.insert(*tmr);
}

void reactor::del_timer(timer<lowres_clock>* tmr) noexcept {
    if (tmr->_expired) {
        _expired_lowres_timers.erase(_expired_lowres_timers.iterator_to(*tmr));
        tmr->_expired = false;
    } else {
        _lowres_timers.remove(*tmr);
    }
}

void reactor::add_timer(timer<manual_clock>* tmr) noexcept {
    queue_timer(tmr);
}

bool reactor::queue_timer(timer<manual_clock>* tmr) noexcept {
    return _manual_timers.insert(*tmr);
}

void reactor::del_timer(timer<manual_clock>* tmr) noexcept {
    if (tmr->_expired) {
        _expired_manual_timers.erase(_expired_manual_timers.iterator_to(*tmr));
        tmr->_expired = false;
    } else {
        _manual_timers.remove(*tmr);
    }
}
void reactor::at_exit(noncopyable_function<future<> ()> func) {
    assert(!_stopping);
    _exit_funcs.push_back(std::move(func));
}
future<> reactor::run_exit_tasks() {
    _stop_requested.broadcast();
    _stopping = true;
    stop_aio_eventfd_loop();
    return do_for_each(_exit_funcs.rbegin(), _exit_funcs.rend(), [] (auto& func) {
        return func();
    });
}
void reactor::stop() {
    assert(_id == 0);
    _smp->cleanup_cpu();
    if (!_stopping) {
        // Run exit tasks locally and then stop all other engines
        // in the background and wait on semaphore for all to complete.
        // Finally, set _stopped on cpu 0.
        (void)run_exit_tasks().then([this] {
            return do_with(semaphore(0), [this] (semaphore& sem) {
                // Stop other cpus asynchronously, signal when done.
                (void)smp::invoke_on_others(0, [] {
                    engine()._smp->cleanup_cpu();
                    return engine().run_exit_tasks().then([] {
                        engine()._stopped = true;
                    });
                }).then([&sem] {
                    sem.signal();
                });
                return sem.wait().then([this] {
                    _stopped = true;
                });
            });
        });
    }
}
void reactor::exit(int ret) {
    // Run stop() asynchronously on cpu 0.
    (void)smp::submit_to(0, [this, ret] { _return = ret; stop(); });
}
uint64_t
reactor::pending_task_count() const {
    uint64_t ret = 0;
    for (auto&& tq : _task_queues) {
        ret += tq->_q.size();
    }
    return ret;
}
uint64_t
reactor::tasks_processed() const {
    return _global_tasks_processed;
}
void reactor::register_metrics() {
    namespace sm = seastar::metrics;

    _metric_groups.add_group("reactor", {
        sm::make_gauge("tasks_pending", std::bind(&reactor::pending_task_count, this), sm::description("Number of pending tasks in the queue")),
        // total_operations value:DERIVE:0:U
        sm::make_counter("tasks_processed", std::bind(&reactor::tasks_processed, this), sm::description("Total tasks processed")),
        sm::make_counter("polls", _polls, sm::description("Number of times pollers were executed")),
        sm::make_gauge("timers_pending", std::bind(&decltype(_timers)::size, &_timers), sm::description("Number of tasks in the timer-pending queue")),
        sm::make_gauge("utilization", [this] { return (1 - _load) * 100; }, sm::description("CPU utilization")),
        sm::make_counter("cpu_busy_ms", [this] () -> int64_t { return total_busy_time() / 1ms; },
                sm::description("Total cpu busy time in milliseconds")),
        sm::make_counter("cpu_steal_time_ms", [this] () -> int64_t { return total_steal_time() / 1ms; },
                sm::description("Total steal time, the time in which some other process was running while Seastar was not trying to run (not sleeping)."
                                " Because this is in userspace, some time that could legitimately be thought of as steal time is not accounted as such."
                                " For example, if we are sleeping and can wake up but the kernel hasn't woken us up yet.")),
        // total_operations value:DERIVE:0:U
        sm::make_counter("aio_reads", _io_stats.aio_reads, sm::description("Total aio-reads operations")),
        sm::make_total_bytes("aio_bytes_read", _io_stats.aio_read_bytes, sm::description("Total aio-reads bytes")),
        // total_operations value:DERIVE:0:U
        sm::make_counter("aio_writes", _io_stats.aio_writes, sm::description("Total aio-writes operations")),
        sm::make_total_bytes("aio_bytes_write", _io_stats.aio_write_bytes, sm::description("Total aio-writes bytes")),
        sm::make_counter("aio_outsizes", _io_stats.aio_outsizes, sm::description("Total number of aio operations that exceed IO limit")),
        sm::make_counter("aio_errors", _io_stats.aio_errors, sm::description("Total aio errors")),
        // total_operations value:DERIVE:0:U
        sm::make_counter("fsyncs", _fsyncs, sm::description("Total number of fsync operations")),
        // total_operations value:DERIVE:0:U
        sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()),
                sm::description("Total number of io-threaded-fallbacks operations")),
    });

    _metric_groups.add_group("memory", {
        sm::make_counter("malloc_operations", [] { return memory::stats().mallocs(); },
                sm::description("Total number of malloc operations")),
        sm::make_counter("free_operations", [] { return memory::stats().frees(); }, sm::description("Total number of free operations")),
        sm::make_counter("cross_cpu_free_operations", [] { return memory::stats().cross_cpu_frees(); }, sm::description("Total number of cross cpu free")),
        sm::make_gauge("malloc_live_objects", [] { return memory::stats().live_objects(); }, sm::description("Number of live objects")),
        sm::make_current_bytes("free_memory", [] { return memory::stats().free_memory(); }, sm::description("Free memory size in bytes")),
        sm::make_current_bytes("total_memory", [] { return memory::stats().total_memory(); }, sm::description("Total memory size in bytes")),
        sm::make_current_bytes("allocated_memory", [] { return memory::stats().allocated_memory(); }, sm::description("Allocated memory size in bytes")),
        sm::make_counter("reclaims_operations", [] { return memory::stats().reclaims(); }, sm::description("Total reclaims operations")),
        sm::make_counter("malloc_failed", [] { return memory::stats().failed_allocations(); }, sm::description("Total count of failed memory allocations")),
    });

    _metric_groups.add_group("reactor", {
        sm::make_counter("logging_failures", [] { return logging_failures; }, sm::description("Total number of logging failures")),
        // total_operations value:DERIVE:0:U
        sm::make_counter("cpp_exceptions", _cxx_exceptions, sm::description("Total number of C++ exceptions")),
        sm::make_counter("abandoned_failed_futures", _abandoned_failed_futures, sm::description("Total number of abandoned failed futures, futures destroyed while still containing an exception")),
    });

    using namespace seastar::metrics;
    _metric_groups.add_group("reactor", {
        make_counter("fstream_reads", _io_stats.fstream_reads,
                description(
                        "Counts reads from disk file streams. A high rate indicates high disk activity."
                        " Contrast with other fstream_read* counters to locate bottlenecks.")),
        make_counter("fstream_read_bytes", _io_stats.fstream_read_bytes,
                description(
                        "Counts bytes read from disk file streams. A high rate indicates high disk activity."
                        " Divide by fstream_reads to determine average read size.")),
        make_counter("fstream_reads_blocked", _io_stats.fstream_reads_blocked,
                description(
                        "Counts the number of times a disk read could not be satisfied from read-ahead buffers, and had to block."
                        " Indicates short streams, or incorrect read ahead configuration.")),
        make_counter("fstream_read_bytes_blocked", _io_stats.fstream_read_bytes_blocked,
                description(
                        "Counts the number of bytes read from disk that could not be satisfied from read-ahead buffers, and had to block."
                        " Indicates short streams, or incorrect read ahead configuration.")),
        make_counter("fstream_reads_aheads_discarded", _io_stats.fstream_read_aheads_discarded,
                description(
                        "Counts the number of times a buffer that was read ahead of time and was discarded because it was not needed, wasting disk bandwidth."
                        " Indicates over-eager read ahead configuration.")),
        make_counter("fstream_reads_ahead_bytes_discarded", _io_stats.fstream_read_ahead_discarded_bytes,
                description(
                        "Counts the number of buffered bytes that were read ahead of time and were discarded because they were not needed, wasting disk bandwidth."
                        " Indicates over-eager read ahead configuration.")),
    });
}
void reactor::run_tasks(task_queue& tq) {
    // Make sure new tasks will inherit our scheduling group
    *internal::current_scheduling_group_ptr() = scheduling_group(tq._id);
    auto& tasks = tq._q;
    while (!tasks.empty()) {
        auto tsk = tasks.front();
        tasks.pop_front();
        STAP_PROBE(seastar, reactor_run_tasks_single_start);
        internal::task_histogram_add_task(*tsk);
        _current_task = tsk;
        tsk->run_and_dispose();
        _current_task = nullptr;
        STAP_PROBE(seastar, reactor_run_tasks_single_end);
        ++tq._tasks_processed;
        ++_global_tasks_processed;
        // check at end of loop, to allow at least one task to run
        if (need_preempt()) {
            if (tasks.size() <= _max_task_backlog) {
                break;
            } else {
                // While need_preempt() is set, task execution is inefficient due to
                // need_preempt() checks breaking out of loops and .then() calls. See
                // the discussion in the issue tracker.
                reset_preemption_monitor();
            }
        }
    }
}
#ifdef SEASTAR_SHUFFLE_TASK_QUEUE
void shuffle(task*& t, circular_buffer<task*>& q) {
    static thread_local std::mt19937 gen = std::mt19937(std::default_random_engine()());
    std::uniform_int_distribution<size_t> tasks_dist{0, q.size() - 1};
    auto& to_swap = q[tasks_dist(gen)];
    std::swap(to_swap, t);
}
#else
void shuffle(task*&, circular_buffer<task*>&) {
}
#endif
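// SEASTAR_SHUFFLE_TASK_QUEUE appears to be a debugging aid: randomizing where
// a newly queued task lands flushes out code that silently relies on task
// execution order. Regular builds compile the no-op overload below it.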
void reactor::force_poll() {
    request_preemption();
}
bool
reactor::flush_tcp_batches() {
    bool work = !_flush_batching.empty();
    while (!_flush_batching.empty()) {
        auto& os = _flush_batching.front();
        _flush_batching.pop_front();
        os.poll_flush();
    }
    return work;
}
bool
reactor::do_expire_lowres_timers() noexcept {
    auto now = lowres_clock::now();
    if (now >= _lowres_next_timeout) {
        complete_timers(_lowres_timers, _expired_lowres_timers, [this] () noexcept {
            if (!_lowres_timers.empty()) {
                _lowres_next_timeout = _lowres_timers.get_next_timeout();
            } else {
                _lowres_next_timeout = lowres_clock::time_point::max();
            }
        });
        return true;
    }
    return false;
}
void
reactor::expire_manual_timers() noexcept {
    complete_timers(_manual_timers, _expired_manual_timers, [] () noexcept {});
}
void
manual_clock::expire_timers() noexcept {
    local_engine->expire_manual_timers();
}
void
manual_clock::advance(manual_clock::duration d) noexcept {
    _now.fetch_add(d.count());
    if (local_engine) {
        schedule_urgent(make_task(default_scheduling_group(), &manual_clock::expire_timers));
        // Expire timers on all cores in the background.
        (void)smp::invoke_on_all(&manual_clock::expire_timers);
    }
}
bool
reactor::do_check_lowres_timers() const noexcept {
    return lowres_clock::now() > _lowres_next_timeout;
}
class reactor::kernel_submit_work_pollfn final : public simple_pollfn<true> {
    reactor& _r;
public:
    kernel_submit_work_pollfn(reactor& r) : _r(r) {}
    virtual bool poll() override final {
        return _r._backend->kernel_submit_work();
    }
};
class reactor::signal_pollfn final : public reactor::pollfn {
    reactor& _r;
public:
    signal_pollfn(reactor& r) : _r(r) {}
    virtual bool poll() final override {
        return _r._signals.poll_signal();
    }
    virtual bool pure_poll() override final {
        return _r._signals.pure_poll_signal();
    }
    virtual bool try_enter_interrupt_mode() override {
        // Signals will interrupt our epoll_pwait() call, but
        // disable them now to avoid a signal between this point
        // and epoll_pwait()
        sigset_t block_all;
        sigfillset(&block_all);
        ::pthread_sigmask(SIG_SETMASK, &block_all, &_r._active_sigmask);
        if (poll()) {
            // raced already, and lost
            exit_interrupt_mode();
            return false;
        }
        return true;
    }
    virtual void exit_interrupt_mode() override final {
        ::pthread_sigmask(SIG_SETMASK, &_r._active_sigmask, nullptr);
    }
};
class reactor::batch_flush_pollfn final : public simple_pollfn<true> {
    reactor& _r;
public:
    batch_flush_pollfn(reactor& r) : _r(r) {}
    virtual bool poll() final override {
        return _r.flush_tcp_batches();
    }
};
class reactor::reap_kernel_completions_pollfn final : public reactor::pollfn {
    reactor& _r;
public:
    reap_kernel_completions_pollfn(reactor& r) : _r(r) {}
    virtual bool poll() final override {
        return _r.reap_kernel_completions();
    }
    virtual bool pure_poll() override final {
        return poll(); // actually performs work, but triggers no user continuations, so okay
    }
    virtual bool try_enter_interrupt_mode() override {
        return _r._backend->kernel_events_can_sleep();
    }
    virtual void exit_interrupt_mode() override final {
    }
};
class reactor::io_queue_submission_pollfn final : public reactor::pollfn {
    reactor& _r;
    // Wake up the reactor with the highres timer when the io-queue
    // decides to delay dispatching until some time point in the future.
    timer<> _nearest_wakeup { [this] { _armed = false; } };
    bool _armed = false;
public:
    io_queue_submission_pollfn(reactor& r) : _r(r) {}
    virtual bool poll() final override {
        return _r.flush_pending_aio();
    }
    virtual bool pure_poll() override final {
        return poll();
    }
    virtual bool try_enter_interrupt_mode() override {
        auto next = _r.next_pending_aio();
        auto now = steady_clock_type::now();
        if (next <= now) {
            return false;
        }
        _nearest_wakeup.arm(next);
        _armed = true;
        return true;
    }
    virtual void exit_interrupt_mode() override final {
        if (_armed) {
            _nearest_wakeup.cancel();
            _armed = false;
        }
    }
};
// Other cpus can queue items for us to free; and they won't notify
// us about them. But it's okay to ignore those items, freeing them
// doesn't have any side effects.
//
// We'll take care of those items when we wake up for another reason.
class reactor::drain_cross_cpu_freelist_pollfn final : public simple_pollfn<true> {
public:
    virtual bool poll() final override {
        return memory::drain_cross_cpu_freelist();
    }
};
class reactor::lowres_timer_pollfn final : public reactor::pollfn {
    reactor& _r;
    // A highres timer is implemented as a waking signal; so
    // we arm one when we have a lowres timer during sleep, so
    // it can wake us up.
    timer<> _nearest_wakeup { [this] { _armed = false; } };
    bool _armed = false;
public:
    lowres_timer_pollfn(reactor& r) : _r(r) {}
    virtual bool poll() final override {
        return _r.do_expire_lowres_timers();
    }
    virtual bool pure_poll() final override {
        return _r.do_check_lowres_timers();
    }
    virtual bool try_enter_interrupt_mode() override {
        // arm our highres timer so a signal will wake us up
        auto next = _r._lowres_next_timeout;
        if (next == lowres_clock::time_point::max()) {
            // no pending timers
            return true;
        }
        auto now = lowres_clock::now();
        if (next <= now) {
            // already due; stay in poll mode
            return false;
        }
        _nearest_wakeup.arm(next - now);
        _armed = true;
        return true;
    }
    virtual void exit_interrupt_mode() override final {
        if (_armed) {
            _nearest_wakeup.cancel();
            _armed = false;
        }
    }
};
class reactor::smp_pollfn final : public reactor::pollfn {
    reactor& _r;
public:
    smp_pollfn(reactor& r) : _r(r) {}
    virtual bool poll() final override {
        // Avoid short-circuiting with `||` since there are side effects
        // we want to take place (instantiating tasks from the alien queue).
        // Cast to int to silence gcc -Wbitwise-instead-of-logical.
        return (int(smp::poll_queues()) |
                int(_r._alien.poll_queues()));
    }
    virtual bool pure_poll() final override {
        return (smp::pure_poll_queues() ||
                _r._alien.pure_poll_queues());
    }
    virtual bool try_enter_interrupt_mode() override {
        // systemwide_memory_barrier() is very slow if run concurrently,
        // so don't go to sleep if it is running now.
        _r._sleeping.store(true, std::memory_order_relaxed);
        bool barrier_done = try_systemwide_memory_barrier();
        if (!barrier_done) {
            _r._sleeping.store(false, std::memory_order_relaxed);
            return false;
        }
        if (poll()) {
            // raced
            _r._sleeping.store(false, std::memory_order_relaxed);
            return false;
        }
        return true;
    }
    virtual void exit_interrupt_mode() override final {
        _r._sleeping.store(false, std::memory_order_relaxed);
    }
};
class reactor::execution_stage_pollfn final : public reactor::pollfn {
    internal::execution_stage_manager& _esm;
public:
    execution_stage_pollfn() : _esm(internal::execution_stage_manager::get()) { }

    virtual bool poll() override {
        return _esm.flush();
    }
    virtual bool pure_poll() override {
        return poll();
    }
    virtual bool try_enter_interrupt_mode() override {
        // This is a passive poller, so if a previous poll
        // returned false (idle), there's no more work to do.
        return true;
    }
    virtual void exit_interrupt_mode() override { }
};
class reactor::syscall_pollfn final : public reactor::pollfn {
    reactor& _r;
public:
    syscall_pollfn(reactor& r) : _r(r) {}
    virtual bool poll() final override {
        return _r._thread_pool->complete();
    }
    virtual bool pure_poll() override final {
        return poll(); // actually performs work, but triggers no user continuations, so okay
    }
    virtual bool try_enter_interrupt_mode() override {
        _r._thread_pool->enter_interrupt_mode();
        if (poll()) {
            // raced
            _r._thread_pool->exit_interrupt_mode();
            return false;
        }
        return true;
    }
    virtual void exit_interrupt_mode() override final {
        _r._thread_pool->exit_interrupt_mode();
    }
};
void reactor::wakeup() {
    uint64_t one = 1;
    ::write(_notify_eventfd.get(), &one, sizeof(one));
}
void reactor::start_aio_eventfd_loop() {
    if (!_aio_eventfd) {
        return;
    }
    future<> loop_done = repeat([this] {
        return _aio_eventfd->readable().then([this] {
            char garbage[8];
            ::read(_aio_eventfd->get_fd(), garbage, 8); // totally uninteresting
            return _stopping ? stop_iteration::yes : stop_iteration::no;
        });
    });
    // must use make_lw_shared, because at_exit expects a copyable function
    at_exit([loop_done = make_lw_shared(std::move(loop_done))] {
        return std::move(*loop_done);
    });
}
void reactor::stop_aio_eventfd_loop() {
    if (!_aio_eventfd) {
        return;
    }
    uint64_t one = 1;
    ::write(_aio_eventfd->get_fd(), &one, 8);
}
bool
reactor::have_more_tasks() const {
    return _active_task_queues.size() + _activating_task_queues.size();
}
void reactor::insert_active_task_queue(task_queue* tq) {
    tq->_active = true;
    auto& atq = _active_task_queues;
    auto less = task_queue::indirect_compare();
    if (atq.empty() || less(atq.back(), tq)) {
        // Common case: idle->working
        // Common case: CPU intensive task queue going to the back
        atq.push_back(tq);
    } else {
        // Common case: newly activated queue preempting everything else
        atq.push_front(tq);
        // Less common case: newly activated queue behind something already active
        size_t i = 0;
        while (i + 1 != atq.size() && !less(atq[i], atq[i+1])) {
            std::swap(atq[i], atq[i+1]);
            i++;
        }
    }
}
reactor::task_queue* reactor::pop_active_task_queue(sched_clock::time_point now) {
    task_queue* tq = _active_task_queues.front();
    _active_task_queues.pop_front();
    tq->_starvetime += now - tq->_ts;
    return tq;
}
void
reactor::insert_activating_task_queues() {
    // Quadratic, but since we expect the common cases in insert_active_task_queue() to dominate, faster
    for (auto&& tq : _activating_task_queues) {
        insert_active_task_queue(tq);
    }
    _activating_task_queues.clear();
}
void reactor::add_task(task* t) noexcept {
    auto sg = t->group();
    auto* q = _task_queues[sg._id].get();
    bool was_empty = q->_q.empty();
    q->_q.push_back(std::move(t));
    shuffle(q->_q.back(), q->_q);
    if (was_empty) {
        activate(*q);
    }
}
void reactor::add_urgent_task(task* t) noexcept {
    memory::scoped_critical_alloc_section _;
    auto sg = t->group();
    auto* q = _task_queues[sg._id].get();
    bool was_empty = q->_q.empty();
    q->_q.push_front(std::move(t));
    shuffle(q->_q.front(), q->_q);
    if (was_empty) {
        activate(*q);
    }
}
void
reactor::run_some_tasks() {
    if (!have_more_tasks()) {
        return;
    }
    sched_print("run_some_tasks: start");
    reset_preemption_monitor();
    lowres_clock::update();

    sched_clock::time_point t_run_completed = now();
    STAP_PROBE(seastar, reactor_run_tasks_start);
    _cpu_stall_detector->start_task_run(t_run_completed);
    do {
        auto t_run_started = t_run_completed;
        insert_activating_task_queues();
        task_queue* tq = pop_active_task_queue(t_run_started);
        sched_print("running tq {} {}", (void*)tq, tq->_name);
        tq->_current = true;
        _last_vruntime = std::max(tq->_vruntime, _last_vruntime);
        run_tasks(*tq);
        tq->_current = false;
        t_run_completed = now();
        auto delta = t_run_completed - t_run_started;
        account_runtime(*tq, delta);
        sched_print("run complete ({} {}); time consumed {} usec; final vruntime {} empty {}",
                (void*)tq, tq->_name, delta / 1us, tq->_vruntime, tq->_q.empty());
        tq->_ts = t_run_completed;
        if (!tq->_q.empty()) {
            insert_active_task_queue(tq);
        } else {
            tq->_active = false;
        }
    } while (have_more_tasks() && !need_preempt());
    _cpu_stall_detector->end_task_run(t_run_completed);
    STAP_PROBE(seastar, reactor_run_tasks_end);
    *internal::current_scheduling_group_ptr() = default_scheduling_group(); // Prevent inheritance from last group run
    sched_print("run_some_tasks: end");
}
void
reactor::activate(task_queue& tq) {
    if (tq._active) {
        return;
    }
    sched_print("activating {} {}", (void*)&tq, tq._name);
    // If activate() was called, the task queue is likely network-bound or I/O bound, not CPU-bound. As
    // such its vruntime will be low, and it will have a large advantage over other task queues. Limit
    // the advantage so it doesn't dominate scheduling for a long time, in case it _does_ become CPU
    // bound later.
    //
    // FIXME: different scheduling groups have different sensitivity to jitter, take advantage
    if (_last_vruntime > tq._vruntime) {
        sched_print("tq {} {} losing vruntime {} due to sleep", (void*)&tq, tq._name, _last_vruntime - tq._vruntime);
    }
    tq._vruntime = std::max(_last_vruntime, tq._vruntime);
    auto now = reactor::now();
    tq._waittime += now - tq._ts;
    tq._ts = now;
    _activating_task_queues.push_back(&tq);
}
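// Worked example of the clamp above: with _last_vruntime = 100ms and
// tq._vruntime = 10ms, the freshly woken queue would otherwise be entitled to
// run ahead of everyone for 90ms worth of vruntime; std::max() forfeits that
// surplus so it resumes on an equal footing instead of monopolizing the CPU.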
void reactor::service_highres_timer() noexcept {
    complete_timers(_timers, _expired_timers, [this] () noexcept {
        if (!_timers.empty()) {
            enable_timer(_timers.get_next_timeout());
        }
    });
}
int reactor::run() noexcept {
    try {
        return do_run();
    } catch (const std::exception& e) {
        seastar_logger.error(e.what());
        print_with_backtrace("exception running reactor main loop");
        _exit(1);
    }
}
int reactor::do_run() {
#ifndef SEASTAR_ASAN_ENABLED
    // SIGSTKSZ is too small when using asan. We also don't need to
    // handle SIGSEGV ourselves when using asan, so just don't install
    // a signal handler stack.
    auto signal_stack = install_signal_handler_stack();
#else
    (void)install_signal_handler_stack;
#endif

    register_metrics();

    // The order in which we execute the pollers is very important for performance.
    //
    // This is because events that are generated in one poller may feed work into others. If
    // they were reversed, we'd only be able to do that work in the next task quota.
    //
    // One example is the relationship between the smp poller and the I/O submission poller:
    // if the smp poller runs first, requests from remote I/O queues can be dispatched right away.
    //
    // We will run the pollers in the following order:
    //
    // 1. SMP: any remote event arrives before anything else
    // 2. reap kernel events completion: storage related completions may free up space in the I/O
    //    queue
    // 3. I/O queue: must be after reap, to free up events. If new slots are freed may submit I/O
    // 4. kernel submission: for I/O, will submit what was generated from last step.
    // 5. reap kernel events completion: some of the submissions from last step may return immediately.
    //    For example if we are dealing with poll() on a fd that has events.
    poller smp_poller(std::make_unique<smp_pollfn>(*this));

    poller reap_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));
    poller io_queue_submission_poller(std::make_unique<io_queue_submission_pollfn>(*this));
    poller kernel_submit_work_poller(std::make_unique<kernel_submit_work_pollfn>(*this));
    poller final_real_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));

    poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this));
    poller execution_stage_poller(std::make_unique<execution_stage_pollfn>());

    start_aio_eventfd_loop();

    if (_id == 0 && _cfg.auto_handle_sigint_sigterm) {
        if (_handle_sigint) {
            _signals.handle_signal_once(SIGINT, [this] { stop(); });
        }
        _signals.handle_signal_once(SIGTERM, [this] { stop(); });
    }

    // Start initialization in the background.
    // Communicate when done using _start_promise.
    (void)_cpu_started.wait(smp::count).then([this] {
        (void)_network_stack->initialize().then([this] {
            _start_promise.set_value();
        });
    });
    // Wait for network stack in the background and then signal all cpus.
    (void)_network_stack_ready->then([this] (std::unique_ptr<network_stack> stack) {
        _network_stack = std::move(stack);
        return smp::invoke_on_all([] {
            engine()._cpu_started.signal();
        });
    });

    poller syscall_poller(std::make_unique<syscall_pollfn>(*this));

    poller drain_cross_cpu_freelist(std::make_unique<drain_cross_cpu_freelist_pollfn>());

    poller expire_lowres_timers(std::make_unique<lowres_timer_pollfn>(*this));
    poller sig_poller(std::make_unique<signal_pollfn>(*this));

    using namespace std::chrono_literals;
    timer<lowres_clock> load_timer;
    auto last_idle = _total_idle;
    auto idle_start = now(), idle_end = idle_start;
    load_timer.set_callback([this, &last_idle, &idle_start, &idle_end] () mutable {
        _total_idle += idle_end - idle_start;
        auto load = double((_total_idle - last_idle).count()) / double(std::chrono::duration_cast<sched_clock::duration>(1s).count());
        last_idle = _total_idle;
        load = std::min(load, 1.0);
        idle_start = idle_end;
        _loads.push_front(load);
        if (_loads.size() > 5) {
            auto drop = _loads.back();
            _loads.pop_back();
            _load -= (drop / 5);
        }
        _load += (load / 5);
    });
    load_timer.arm_periodic(1s);

    itimerspec its = seastar::posix::to_relative_itimerspec(_task_quota, _task_quota);
    _task_quota_timer.timerfd_settime(0, its);
    auto& task_quote_itimerspec = its;

    struct sigaction sa_block_notifier = {};
    sa_block_notifier.sa_handler = &reactor::block_notifier;
    sa_block_notifier.sa_flags = SA_RESTART;
    auto r = sigaction(internal::cpu_stall_detector::signal_number(), &sa_block_notifier, nullptr);
    assert(r == 0);

    bool idle = false;

    std::function<bool()> check_for_work = [this] () {
        return poll_once() || have_more_tasks();
    };
    std::function<bool()> pure_check_for_work = [this] () {
        return pure_poll_once() || have_more_tasks();
    };
    while (true) {
        run_some_tasks();
        if (_stopped) {
            load_timer.cancel();
            // Final tasks may include sending the last response to cpu 0, so run them
            while (have_more_tasks()) {
                run_some_tasks();
            }
            while (!_at_destroy_tasks->_q.empty()) {
                run_tasks(*_at_destroy_tasks);
            }
            _finished_running_tasks = true;
            _smp->arrive_at_event_loop_end();
            if (_id == 0) {
                _smp->join_all();
            }
            break;
        }

        lowres_clock::update(); // Don't delay expiring lowres timers
        if (check_for_work()) {
            if (idle) {
                _total_idle += idle_end - idle_start;
                account_idle(idle_end - idle_start);
                idle_start = idle_end;
                idle = false;
            }
        } else {
            idle_end = now();
            if (!idle) {
                idle_start = idle_end;
                idle = true;
            }
            bool go_to_sleep = true;
            try {
                // we can't run check_for_work(), because that can run tasks in the context
                // of the idle handler which change its state, without the idle handler expecting
                // it. So run pure_check_for_work() instead.
                auto handler_result = _idle_cpu_handler(pure_check_for_work);
                go_to_sleep = handler_result == idle_cpu_handler_result::no_more_work;
            } catch (...) {
                report_exception("Exception while running idle cpu handler", std::current_exception());
            }
            if (go_to_sleep) {
                internal::cpu_relax();
                if (idle_end - idle_start > _max_poll_time) {
                    // Turn off the task quota timer to avoid spurious wakeups
                    struct itimerspec zero_itimerspec = {};
                    _task_quota_timer.timerfd_settime(0, zero_itimerspec);
                    auto start_sleep = now();
                    _cpu_stall_detector->start_sleep();
                    sleep();
                    _cpu_stall_detector->end_sleep();
                    // We may have slept for a while, so freshen idle_end
                    idle_end = now();
                    _total_sleep += idle_end - start_sleep;
                    _task_quota_timer.timerfd_settime(0, task_quote_itimerspec);
                }
            } else {
                // We previously ran pure_check_for_work(), which might not actually have
                // performed any work; poll for real now.
                check_for_work();
            }
        }
    }
    // To prevent ordering issues from arising, destroy the I/O queue explicitly at this point.
    // This is needed because the reactor is destroyed from the thread_local destructors. If
    // the I/O queue happens to use any other infrastructure that is also kept this way (for
    // instance, collectd), we will not have any way to guarantee who is destroyed first.
    _io_queues.clear();
    return _return;
}

void
reactor::sleep() {
    for (auto i = _pollers.begin(); i != _pollers.end(); ++i) {
        auto ok = (*i)->try_enter_interrupt_mode();
        if (!ok) {
            while (i != _pollers.begin()) {
                (*--i)->exit_interrupt_mode();
            }
            return;
        }
    }

    _backend->wait_and_process_events(&_active_sigmask);

    for (auto i = _pollers.rbegin(); i != _pollers.rend(); ++i) {
        (*i)->exit_interrupt_mode();
    }
}
bool
reactor::poll_once() {
    bool work = false;
    for (auto c : _pollers) {
        work |= c->poll();
    }
    return work;
}
bool
reactor::pure_poll_once() {
    for (auto c : _pollers) {
        if (c->pure_poll()) {
            return true;
        }
    }
    return false;
}
namespace internal {

class poller::registration_task final : public task {
private:
    poller* _p;
public:
    explicit registration_task(poller* p) : _p(p) {}
    virtual void run_and_dispose() noexcept override {
        if (_p) {
            engine().register_poller(_p->_pollfn.get());
            _p->_registration_task = nullptr;
        }
        delete this;
    }
    task* waiting_task() noexcept override { return nullptr; }
    void cancel() {
        _p = nullptr;
    }
    void moved(poller* p) {
        _p = p;
    }
};
class poller::deregistration_task final : public task {
private:
    std::unique_ptr<pollfn> _p;
public:
    explicit deregistration_task(std::unique_ptr<pollfn>&& p) : _p(std::move(p)) {}
    virtual void run_and_dispose() noexcept override {
        engine().unregister_poller(_p.get());
        delete this;
    }
    task* waiting_task() noexcept override { return nullptr; }
};

}
void reactor::register_poller(pollfn* p) {
    _pollers.push_back(p);
}

void reactor::unregister_poller(pollfn* p) {
    _pollers.erase(std::find(_pollers.begin(), _pollers.end(), p));
}

void reactor::replace_poller(pollfn* old, pollfn* neww) {
    std::replace(_pollers.begin(), _pollers.end(), old, neww);
}
namespace internal {

poller::poller(poller&& x) noexcept
        : _pollfn(std::move(x._pollfn)), _registration_task(std::exchange(x._registration_task, nullptr)) {
    if (_pollfn && _registration_task) {
        _registration_task->moved(this);
    }
}

poller&
poller::operator=(poller&& x) noexcept {
    if (this != &x) {
        this->~poller();
        new (this) poller(std::move(x));
    }
    return *this;
}
void
poller::do_register() noexcept {
    // We can't just insert a poller into reactor::_pollers, because we
    // may be running inside a poller ourselves, and so in the middle of
    // iterating reactor::_pollers itself. So we schedule a task to add
    // the poller instead.
    auto task = new registration_task(this);
    engine().add_task(task);
    _registration_task = task;
}
poller::~poller() {
    // We can't just remove the poller from reactor::_pollers, because we
    // may be running inside a poller ourselves, and so in the middle of
    // iterating reactor::_pollers itself. So we schedule a task to remove
    // the poller instead.
    //
    // Since we don't want to call the poller after we exit the destructor,
    // we replace it atomically with another one, and schedule a task to
    // delete the replacement.
    if (_pollfn) {
        if (_registration_task) {
            // not added yet, so don't do it at all.
            _registration_task->cancel();
            delete _registration_task;
        } else if (!engine()._finished_running_tasks) {
            // If _finished_running_tasks, the call to add_task() below will just
            // leak it, since no one will call task::run_and_dispose(). Just leave
            // the poller there, the reactor will never use it.
            auto dummy = make_pollfn([] { return false; });
            auto dummy_p = dummy.get();
            auto task = new deregistration_task(std::move(dummy));
            engine().add_task(task);
            engine().replace_poller(_pollfn.get(), dummy_p);
        }
    }
}

}
syscall_work_queue::syscall_work_queue()
    : _pending()
    , _completed()
    , _start_eventfd(0) {
}
void syscall_work_queue::submit_item(std::unique_ptr<syscall_work_queue::work_item> item) {
    (void)_queue_has_room.wait().then_wrapped([this, item = std::move(item)] (future<> f) mutable {
        // propagate wait failure via work_item
        if (f.failed()) {
            item->set_exception(f.get_exception());
            return;
        }
        _pending.push(item.release());
        _start_eventfd.signal(1);
    });
}
unsigned syscall_work_queue::complete() {
    std::array<work_item*, queue_length> tmp_buf;
    auto end = tmp_buf.data();
    auto nr = _completed.consume_all([&] (work_item* wi) {
        *end++ = wi;
    });
    for (auto p = tmp_buf.data(); p != end; ++p) {
        auto wi = *p;
        wi->complete();
        delete wi;
    }
    _queue_has_room.signal(nr);
    return nr;
}
smp_message_queue::smp_message_queue(reactor* from, reactor* to)
    : _pending(to)
    , _completed(from)
{
}

smp_message_queue::~smp_message_queue()
{
    if (_pending.remote != _completed.remote) {
        _tx.a.~aa();
    }
}

void smp_message_queue::stop() {
    _metrics.clear();
}
void smp_message_queue::move_pending() {
    auto begin = _tx.a.pending_fifo.cbegin();
    auto end = _tx.a.pending_fifo.cend();
    end = _pending.push(begin, end);
    if (begin == end) {
        return;
    }
    auto nr = end - begin;
    _pending.maybe_wakeup();
    _tx.a.pending_fifo.erase(begin, end);
    _current_queue_length += nr;
    _last_snt_batch = nr;
    _sent += nr;
}
bool smp_message_queue::pure_poll_tx() const {
    // can't use read_available(), not available on older boost
    // empty() is not const, so need const_cast.
    return !const_cast<lf_queue&>(_completed).empty();
}
void smp_message_queue::submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<smp_message_queue::work_item> item) {
    // matching signal() in process_completions()
    auto ssg_id = internal::smp_service_group_id(item->ssg);
    auto& sem = get_smp_service_groups_semaphore(ssg_id, t);
    // Future indirectly forwarded to `item`.
    (void)get_units(sem, 1, timeout).then_wrapped([this, item = std::move(item)] (future<smp_service_group_semaphore_units> units_fut) mutable {
        if (units_fut.failed()) {
            item->fail_with(units_fut.get_exception());
            ++_compl;
            return;
        }
        _tx.a.pending_fifo.push_back(item.get());
        // no exceptions from this point
        item.release();
        units_fut.get0().release();
        if (_tx.a.pending_fifo.size() >= batch_size) {
            move_pending();
        }
    });
}
void smp_message_queue::respond(work_item* item) {
    _completed_fifo.push_back(item);
    if (_completed_fifo.size() >= batch_size || engine()._stopped) {
        flush_response_batch();
    }
}
void smp_message_queue::flush_response_batch() {
    if (!_completed_fifo.empty()) {
        auto begin = _completed_fifo.cbegin();
        auto end = _completed_fifo.cend();
        end = _completed.push(begin, end);
        if (begin == end) {
            return;
        }
        _completed.maybe_wakeup();
        _completed_fifo.erase(begin, end);
    }
}
bool smp_message_queue::has_unflushed_responses() const {
    return !_completed_fifo.empty();
}
bool smp_message_queue::pure_poll_rx() const {
    // can't use read_available(), not available on older boost
    // empty() is not const, so need const_cast.
    return !const_cast<lf_queue&>(_pending).empty();
}
void
smp_message_queue::lf_queue::maybe_wakeup() {
    // Called after lf_queue_base::push().
    //
    // This is read-after-write, which wants memory_order_seq_cst,
    // but we insert that barrier using systemwide_memory_barrier()
    // because seq_cst is so expensive.
    //
    // However, we do need a compiler barrier:
    std::atomic_signal_fence(std::memory_order_seq_cst);
    if (remote->_sleeping.load(std::memory_order_relaxed)) {
        // We are free to clear it, because we're sending a signal now
        remote->_sleeping.store(false, std::memory_order_relaxed);
        remote->wakeup();
    }
}
smp_message_queue::lf_queue::~lf_queue() {
    consume_all([] (work_item* ptr) {
        delete ptr;
    });
}
template<size_t PrefetchCnt, typename Func>
size_t smp_message_queue::process_queue(lf_queue& q, Func process) {
    // copy batch to local memory in order to minimize
    // time in which cross-cpu data is accessed
    work_item* items[queue_length + PrefetchCnt];
    work_item* wi;
    if (!q.pop(wi)) {
        return 0;
    }
    // start prefetching first item before popping the rest to overlap memory
    // access with potential cache miss the second pop may cause
    prefetch<2>(wi);
    auto nr = q.pop(items);
    std::fill(std::begin(items) + nr, std::begin(items) + nr + PrefetchCnt, nr ? items[nr - 1] : wi);
    unsigned i = 0;
    do {
        prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + PrefetchCnt);
        process(wi);
        wi = items[i++];
    } while (wi != items[nr]);

    return nr + 1;
}
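// The std::fill() above pads the tail of items[] with a copy of the last
// popped item (or wi when the queue yielded a single element), so the
// prefetch_n() window inside the loop never touches uninitialized pointers.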
size_t smp_message_queue::process_completions(shard_id t) {
    auto nr = process_queue<prefetch_cnt*2>(_completed, [t] (work_item* wi) {
        wi->complete();
        auto ssg_id = internal::smp_service_group_id(wi->ssg);
        get_smp_service_groups_semaphore(ssg_id, t).signal();
        delete wi;
    });
    _current_queue_length -= nr;
    _compl += nr;
    _last_cmpl_batch = nr;

    return nr;
}
void smp_message_queue::flush_request_batch() {
    if (!_tx.a.pending_fifo.empty()) {
        move_pending();
    }
}
size_t smp_message_queue::process_incoming() {
    auto nr = process_queue<prefetch_cnt>(_pending, [] (work_item* wi) {
        wi->process();
    });
    _received += nr;
    _last_rcv_batch = nr;
    return nr;
}
void smp_message_queue::start(unsigned cpuid) {
    _tx.init();
    namespace sm = seastar::metrics;
    char instance[10];
    std::snprintf(instance, sizeof(instance), "%u-%u", this_shard_id(), cpuid);
    _metrics.add_group("smp", {
        // queue_length value:GAUGE:0:U
        // Absolute value of num packets in last tx batch.
        sm::make_queue_length("send_batch_queue_length", _last_snt_batch, sm::description("Current send batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
        sm::make_queue_length("receive_batch_queue_length", _last_rcv_batch, sm::description("Current receive batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
        sm::make_queue_length("complete_batch_queue_length", _last_cmpl_batch, sm::description("Current complete batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
        sm::make_queue_length("send_queue_length", _current_queue_length, sm::description("Current send queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
        // total_operations value:DERIVE:0:U
        sm::make_counter("total_received_messages", _received, sm::description("Total number of received messages"), {sm::shard_label(instance)})(sm::metric_disabled),
        // total_operations value:DERIVE:0:U
        sm::make_counter("total_sent_messages", _sent, sm::description("Total number of sent messages"), {sm::shard_label(instance)})(sm::metric_disabled),
        // total_operations value:DERIVE:0:U
        sm::make_counter("total_completed_messages", _compl, sm::description("Total number of messages completed"), {sm::shard_label(instance)})(sm::metric_disabled),
    });
}
readable_eventfd writeable_eventfd::read_side() {
    return readable_eventfd(_fd.dup());
}

file_desc writeable_eventfd::try_create_eventfd(size_t initial) {
    assert(size_t(int(initial)) == initial);
    return file_desc::eventfd(initial, EFD_CLOEXEC);
}

void writeable_eventfd::signal(size_t count) {
    uint64_t c = count;
    auto r = _fd.write(&c, sizeof(c));
    assert(r == sizeof(c));
}

writeable_eventfd readable_eventfd::write_side() {
    return writeable_eventfd(_fd.get_file_desc().dup());
}

file_desc readable_eventfd::try_create_eventfd(size_t initial) {
    assert(size_t(int(initial)) == initial);
    return file_desc::eventfd(initial, EFD_CLOEXEC | EFD_NONBLOCK);
}

future<size_t> readable_eventfd::wait() {
    return engine().readable(*_fd._s).then([this] {
        uint64_t count;
        int r = ::read(_fd.get_fd(), &count, sizeof(count));
        assert(r == sizeof(count));
        return make_ready_future<size_t>(count);
    });
}
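// eventfd(2) semantics make the size_t round-trip above work: each write()
// adds its 8-byte value to the kernel-side counter, and a read() returns the
// accumulated total and resets it, so several signal() calls may be
// coalesced into a single wait() result.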
void schedule(task* t) noexcept {
    engine().add_task(t);
}

void schedule_urgent(task* t) noexcept {
    engine().add_urgent_task(t);
}

bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) {
    return (a.sin_addr.s_addr == b.sin_addr.s_addr) && (a.sin_port == b.sin_port);
}

static bool kernel_supports_aio_fsync() {
    return internal::kernel_uname().whitelisted({"4.18"});
}
static program_options::selection_value<network_stack_factory> create_network_stacks_option(reactor_options& zis) {
    using value_type = program_options::selection_value<network_stack_factory>;
    value_type::candidates candidates;
    std::vector<std::string> net_stack_names;

    auto deleter = [] (network_stack_factory* p) { delete p; };

    std::string default_stack;
    for (auto reg_func : {register_native_stack, register_posix_stack}) {
        auto s = reg_func();
        if (s.is_default) {
            default_stack = s.name;
        }
        candidates.push_back({s.name, {new network_stack_factory(std::move(s.factory)), deleter}, std::move(s.opts)});
        net_stack_names.emplace_back(s.name);
    }

    return program_options::selection_value<network_stack_factory>(zis, "network-stack", std::move(candidates), default_stack,
            format("select network stack (valid values: {})", format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")));
}

static program_options::selection_value<reactor_backend_selector>::candidates backend_selector_candidates() {
    using value_type = program_options::selection_value<reactor_backend_selector>;
    value_type::candidates candidates;

    auto deleter = [] (reactor_backend_selector* p) { delete p; };

    for (auto&& be : reactor_backend_selector::available()) {
        auto name = be.name();
        candidates.push_back({std::move(name), {new reactor_backend_selector(std::move(be)), deleter}, {}});
    }
    return candidates;
}
reactor_options::reactor_options(program_options::option_group* parent_group)
    : program_options::option_group(parent_group, "Core options")
    , network_stack(create_network_stacks_option(*this))
    , poll_mode(*this, "poll-mode", "poll continuously (100% cpu use)")
    , idle_poll_time_us(*this, "idle-poll-time-us", reactor::calculate_poll_time() / 1us,
            "idle polling time in microseconds (reduce for overprovisioned environments or laptops)")
    , poll_aio(*this, "poll-aio", true,
            "busy-poll for disk I/O (reduces latency and increases throughput)")
    , task_quota_ms(*this, "task-quota-ms", 0.5, "Max time (ms) between polls")
    , io_latency_goal_ms(*this, "io-latency-goal-ms", {}, "Max time (ms) I/O operations must take (1.5 * task-quota-ms if not set)")
    , max_task_backlog(*this, "max-task-backlog", 1000, "Maximum number of task backlog to allow; above this we ignore I/O")
    , blocked_reactor_notify_ms(*this, "blocked-reactor-notify-ms", 25, "threshold in milliseconds over which the reactor is considered blocked if no progress is made")
    , blocked_reactor_reports_per_minute(*this, "blocked-reactor-reports-per-minute", 5, "Maximum number of backtraces reported by stall detector per minute")
    , blocked_reactor_report_format_oneline(*this, "blocked-reactor-report-format-oneline", true, "Print a simplified backtrace on a single line")
    , relaxed_dma(*this, "relaxed-dma", "allow using buffered I/O if DMA is not available (reduces performance)")
    , linux_aio_nowait(*this, "linux-aio-nowait", aio_nowait_supported,
            "use the Linux NOWAIT AIO feature, which reduces reactor stalls due to aio (autodetected)")
    , unsafe_bypass_fsync(*this, "unsafe-bypass-fsync", false, "Bypass fsync(), may result in data loss. Use for testing on consumer drives")
    , kernel_page_cache(*this, "kernel-page-cache", false,
            "Use the kernel page cache. This disables DMA (O_DIRECT)."
            " Useful for short-lived functional tests with a small data set.")
    , overprovisioned(*this, "overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0")
    , abort_on_seastar_bad_alloc(*this, "abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory")
    , force_aio_syscalls(*this, "force-aio-syscalls", false,
            "Force io_getevents(2) to issue a system call, instead of bypassing the kernel when possible."
            " This makes strace output more useful, but slows down the application")
    , dump_memory_diagnostics_on_alloc_failure_kind(*this, "dump-memory-diagnostics-on-alloc-failure-kind", memory::alloc_failure_kind::critical,
            "Dump diagnostics of the seastar allocator state on allocation failure."
            " Accepted values: none, critical (default), all. When set to critical, only allocations marked as critical will trigger diagnostics dump."
            " The diagnostics will be written to the seastar_memory logger, with error level."
            " Note that if the seastar_memory logger is set to debug or trace level, the diagnostics will be logged irrespective of this setting.")
    , reactor_backend(*this, "reactor-backend", backend_selector_candidates(), reactor_backend_selector::default_backend().name(),
            format("Internal reactor implementation ({})", reactor_backend_selector::available()))
    , aio_fsync(*this, "aio-fsync", kernel_supports_aio_fsync(),
            "Use Linux aio for fsync() calls. This reduces latency; requires Linux 4.18 or later.")
    , max_networking_io_control_blocks(*this, "max-networking-io-control-blocks", 10000,
            "Maximum number of I/O control blocks (IOCBs) to allocate per shard. This translates to the number of sockets supported per shard."
            " Requires tuning /proc/sys/fs/aio-max-nr. Only valid for the linux-aio reactor backend (see --reactor-backend).")
#ifdef SEASTAR_HEAPPROF
    , heapprof(*this, "heapprof", "enable seastar heap profiling")
#else
    , heapprof(*this, "heapprof", program_options::unused{})
#endif
    , no_handle_interrupt(*this, "no-handle-interrupt", "ignore SIGINT (for gdb)")
{
}
smp_options::smp_options(program_options::option_group* parent_group)
    : program_options::option_group(parent_group, "SMP options")
    , smp(*this, "smp", {}, "number of threads (default: one per CPU)")
    , cpuset(*this, "cpuset", {}, "CPUs to use (in cpuset(7) format; default: all)")
    , memory(*this, "memory", std::nullopt, "memory to use, in bytes (ex: 4G) (default: all)")
    , reserve_memory(*this, "reserve-memory", {}, "memory reserved to OS (if --memory not specified)")
    , hugepages(*this, "hugepages", {}, "path to accessible hugetlbfs mount (typically /dev/hugepages/something)")
    , lock_memory(*this, "lock-memory", {}, "lock all memory (prevents swapping)")
    , thread_affinity(*this, "thread-affinity", true, "pin threads to their cpus (disable for overprovisioning)")
#ifdef SEASTAR_HAVE_HWLOC
    , num_io_queues(*this, "num-io-queues", {}, "Number of IO queues. Each IO unit will be responsible for a fraction of the IO requests. Defaults to the number of threads")
    , num_io_groups(*this, "num-io-groups", {}, "Number of IO groups. Each IO group will be responsible for a fraction of the IO requests. Defaults to the number of NUMA nodes")
#else
    , num_io_queues(*this, "num-io-queues", program_options::unused{})
    , num_io_groups(*this, "num-io-groups", program_options::unused{})
#endif
    , io_properties_file(*this, "io-properties-file", {}, "path to a YAML file describing the characteristics of the I/O Subsystem")
    , io_properties(*this, "io-properties", {}, "a YAML string describing the characteristics of the I/O Subsystem")
    , mbind(*this, "mbind", true, "enable mbind")
#ifndef SEASTAR_NO_EXCEPTION_HACK
    , enable_glibc_exception_scaling_workaround(*this, "enable-glibc-exception-scaling-workaround", true, "enable workaround for glibc/gcc c++ exception scalability problem")
#else
    , enable_glibc_exception_scaling_workaround(*this, "enable-glibc-exception-scaling-workaround", program_options::unused{})
#endif
#ifdef SEASTAR_HAVE_HWLOC
    , allow_cpus_in_remote_numa_nodes(*this, "allow-cpus-in-remote-numa-nodes", true, "if some CPUs are found not to have any local NUMA nodes, allow assigning them to remote ones")
#else
    , allow_cpus_in_remote_numa_nodes(*this, "allow-cpus-in-remote-numa-nodes", program_options::unused{})
#endif
{
}
thread_local scollectd::impl scollectd_impl;

scollectd::impl& scollectd::get_impl() {
    return scollectd_impl;
}

struct reactor_deleter {
    void operator()(reactor* p) {
        p->~reactor();
        free(p);
    }
};

thread_local std::unique_ptr<reactor, reactor_deleter> reactor_holder;

thread_local smp_message_queue** smp::_qs;
thread_local std::thread::id smp::_tmain;
unsigned smp::count = 0;

void smp::start_all_queues()
{
    for (unsigned c = 0; c < count; c++) {
        if (c != this_shard_id()) {
            _qs[c][this_shard_id()].start(c);
        }
    }
    _alien._qs[this_shard_id()].start();
}

#ifdef SEASTAR_HAVE_DPDK

int dpdk_thread_adaptor(void* f)
{
    (*static_cast<std::function<void ()>*>(f))();
    return 0;
}

#endif

void smp::join_all()
{
#ifdef SEASTAR_HAVE_DPDK
    if (_using_dpdk) {
        rte_eal_mp_wait_lcore();
        return;
    }
#endif
    for (auto&& t : smp::_threads) {
        t.join();
    }
}

void smp::pin(unsigned cpu_id) {
    if (_using_dpdk) {
        // dpdk does its own pinning
        return;
    }
    pin_this_thread(cpu_id);
}

void smp::arrive_at_event_loop_end() {
    if (_all_event_loops_done) {
        _all_event_loops_done->wait();
    }
}
void smp::allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg) {
    assert(!reactor_holder);

    // we cannot just write "local_engine = new reactor" since reactor's constructor
    // uses local_engine
    void *buf;
    int r = posix_memalign(&buf, cache_line_size, sizeof(reactor));
    assert(r == 0);
    local_engine = reinterpret_cast<reactor*>(buf);
    *internal::this_shard_id_ptr() = id;
    new (buf) reactor(this->shared_from_this(), _alien, id, std::move(rbs), cfg);
    reactor_holder.reset(local_engine);
}

void smp::cleanup() noexcept {
    smp::_threads = std::vector<posix_thread>();
    _thread_loops.clear();
}

void smp::cleanup_cpu() {
    size_t cpuid = this_shard_id();

    if (_qs) {
        for (unsigned i = 0; i < smp::count; i++) {
            _qs[i][cpuid].stop();
        }
    }
    if (_alien._qs) {
        _alien._qs[cpuid].stop();
    }
}

void smp::create_thread(std::function<void ()> thread_loop) {
    if (_using_dpdk) {
        _thread_loops.push_back(std::move(thread_loop));
    } else {
        _threads.emplace_back(std::move(thread_loop));
    }
}
// Installs handler for Signal which ensures that Func is invoked only once
// in the whole program and that after it is invoked the default handler is restored.
template<int Signal, void(*Func)()>
void install_oneshot_signal_handler() {
    static bool handled = false;
    static util::spinlock lock;

    struct sigaction sa;
    sa.sa_sigaction = [](int sig, siginfo_t *info, void *p) {
        std::lock_guard<util::spinlock> g(lock);
        if (!handled) {
            handled = true;
            Func();
            signal(sig, SIG_DFL);
        }
    };
    sigfillset(&sa.sa_mask);
    sa.sa_flags = SA_SIGINFO | SA_RESTART;
    if (Signal == SIGSEGV) {
        sa.sa_flags |= SA_ONSTACK;
    }
    auto r = ::sigaction(Signal, &sa, nullptr);
    throw_system_error_on(r == -1);
}
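// SA_ONSTACK matters for SIGSEGV in particular: when the fault is caused by
// stack overflow, the handler can only run if an alternate signal stack was
// registered with sigaltstack(2), since the normal stack has no room left.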
static void reraise_signal(int signo) {
    signal(signo, SIG_DFL);
    pthread_kill(pthread_self(), signo);
}

static void sigsegv_action() noexcept {
    print_with_backtrace("Segmentation fault");
    reraise_signal(SIGSEGV);
}

static void sigabrt_action() noexcept {
    print_with_backtrace("Aborting");
    reraise_signal(SIGABRT);
}

void smp::qs_deleter::operator()(smp_message_queue** qs) const {
    for (unsigned i = 0; i < smp::count; i++) {
        for (unsigned j = 0; j < smp::count; j++) {
            qs[i][j].~smp_message_queue();
        }
        ::operator delete[](qs[i]);
    }
    delete[](qs);
}
class disk_config_params {
private:
    unsigned _num_io_groups = 0;
    std::unordered_map<dev_t, mountpoint_params> _mountpoints;
    std::chrono::duration<double> _latency_goal;

    uint64_t per_io_group(uint64_t qty, unsigned nr_groups) const noexcept {
        return std::max(qty / nr_groups, 1ul);
    }

public:
    unsigned num_io_groups() const noexcept { return _num_io_groups; }

    std::chrono::duration<double> latency_goal() const {
        return _latency_goal;
    }

    double latency_goal_opt(const reactor_options& opts) const {
        return opts.io_latency_goal_ms ?
                opts.io_latency_goal_ms.get_value() :
                opts.task_quota_ms.get_value() * 1.5;
    }
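    // With the defaults above (task-quota-ms = 0.5 and no explicit
    // --io-latency-goal-ms), the goal evaluates to 0.5 * 1.5 = 0.75 ms.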
    void parse_config(const smp_options& smp_opts, const reactor_options& reactor_opts) {
        seastar_logger.debug("smp::count: {}", smp::count);
        _latency_goal = std::chrono::duration_cast<std::chrono::duration<double>>(latency_goal_opt(reactor_opts) * 1ms);
        seastar_logger.debug("latency_goal: {}", latency_goal().count());

        if (smp_opts.num_io_groups) {
            _num_io_groups = smp_opts.num_io_groups.get_value();
            if (!_num_io_groups) {
                throw std::runtime_error("num-io-groups must be greater than zero");
            }
        } else if (smp_opts.num_io_queues) {
            seastar_logger.warn("the --num-io-queues option is deprecated, switch to --num-io-groups instead");
        }
        if (smp_opts.io_properties_file && smp_opts.io_properties) {
            throw std::runtime_error("Both io-properties and io-properties-file specified. Don't know which to trust!");
        }

        std::optional<YAML::Node> doc;
        if (smp_opts.io_properties_file) {
            doc = YAML::LoadFile(smp_opts.io_properties_file.get_value());
        } else if (smp_opts.io_properties) {
            doc = YAML::Load(smp_opts.io_properties.get_value());
        }

        if (doc) {
            if (!doc->IsMap()) {
                throw std::runtime_error("Bogus io-properties (did you mix up --io-properties and --io-properties-file?)");
            }
            for (auto&& section : *doc) {
                auto sec_name = section.first.as<std::string>();
                if (sec_name != "disks") {
                    throw std::runtime_error(fmt::format("While parsing I/O options: section {} currently unsupported.", sec_name));
                }
                auto disks = section.second.as<std::vector<mountpoint_params>>();
                for (auto& d : disks) {
                    struct ::stat buf;
                    auto ret = stat(d.mountpoint.c_str(), &buf);
                    if (ret < 0) {
                        throw std::runtime_error(fmt::format("Couldn't stat {}", d.mountpoint));
                    }

                    auto st_dev = S_ISBLK(buf.st_mode) ? buf.st_rdev : buf.st_dev;
                    if (_mountpoints.count(st_dev)) {
                        throw std::runtime_error(fmt::format("Mountpoint {} already configured", d.mountpoint));
                    }
                    if (_mountpoints.size() >= reactor::max_queues) {
                        throw std::runtime_error(fmt::format("Configured number of queues {} is larger than the maximum {}",
                                _mountpoints.size(), reactor::max_queues));
                    }
                    if (d.read_bytes_rate == 0 || d.write_bytes_rate == 0 ||
                            d.read_req_rate == 0 || d.write_req_rate == 0) {
                        throw std::runtime_error(fmt::format("R/W bytes and req rates must not be zero"));
                    }

                    seastar_logger.debug("dev_id: {} mountpoint: {}", st_dev, d.mountpoint);
                    _mountpoints.emplace(st_dev, d);
                }
            }
        }

        // Placeholder for unconfigured disks.
        mountpoint_params d = {};
        _mountpoints.emplace(0, d);
    }
    struct io_queue::config generate_config(dev_t devid, unsigned nr_groups) const {
        seastar_logger.debug("generate_config dev_id: {}", devid);
        const mountpoint_params& p = _mountpoints.at(devid);
        struct io_queue::config cfg;

        if (p.read_bytes_rate != std::numeric_limits<uint64_t>::max()) {
            cfg.blocks_count_rate = (io_queue::read_request_base_count * (unsigned long)per_io_group(p.read_bytes_rate, nr_groups)) >> io_queue::block_size_shift;
            cfg.disk_blocks_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_bytes_rate) / p.write_bytes_rate;
        }
        if (p.read_req_rate != std::numeric_limits<uint64_t>::max()) {
            cfg.req_count_rate = io_queue::read_request_base_count * (unsigned long)per_io_group(p.read_req_rate, nr_groups);
            cfg.disk_req_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_req_rate) / p.write_req_rate;
        }
        if (p.read_saturation_length != std::numeric_limits<uint64_t>::max()) {
            cfg.disk_read_saturation_length = p.read_saturation_length;
        }
        if (p.write_saturation_length != std::numeric_limits<uint64_t>::max()) {
            cfg.disk_write_saturation_length = p.write_saturation_length;
        }
        cfg.mountpoint = p.mountpoint;
        cfg.duplex = p.duplex;
        cfg.rate_factor = p.rate_factor;
        cfg.rate_limit_duration = latency_goal();
        // Block count limit should not be less than the minimal IO size on the device.
        // On the other hand, even this is not good enough -- in the worst case the
        // scheduler will self-tune to allow a single 64k request, while it would
        // be better to sacrifice some IO latency and allow for larger concurrency.
        cfg.block_count_limit_min = (64 << 10) >> io_queue::block_size_shift;

        return cfg;
    }

    auto device_ids() const {
        return boost::adaptors::keys(_mountpoints);
    }
};
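// Rates in io_queue::config are kept in block units: byte rates are scaled
// down with ">> io_queue::block_size_shift" above, so the 64 KiB
// block_count_limit_min becomes (64 << 10) >> block_size_shift blocks
// (128 blocks if the shift is 9, i.e. 512-byte blocks).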
unsigned smp::adjust_max_networking_aio_io_control_blocks(unsigned network_iocbs)
{
    static unsigned constexpr storage_iocbs = reactor::max_aio;
    static unsigned constexpr preempt_iocbs = 2;

    auto aio_max_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-max-nr");
    auto aio_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-nr");
    auto available_aio = aio_max_nr - aio_nr;
    auto requested_aio_network = network_iocbs * smp::count;
    auto requested_aio_other = (storage_iocbs + preempt_iocbs) * smp::count;
    auto requested_aio = requested_aio_network + requested_aio_other;
    auto network_iocbs_old = network_iocbs;

    if (available_aio < requested_aio) {
        seastar_logger.warn("Requested AIO slots too large, please increase request capacity in /proc/sys/fs/aio-max-nr. available:{} requested:{}", available_aio, requested_aio);
        if (available_aio >= requested_aio_other + smp::count) { // at least one queue for each shard
            network_iocbs = (available_aio - requested_aio_other) / smp::count;
            seastar_logger.warn("max-networking-io-control-blocks adjusted from {} to {}, since AIO slots are unavailable", network_iocbs_old, network_iocbs);
        } else {
            throw std::runtime_error("Could not setup Async I/O: Not enough request capacity in /proc/sys/fs/aio-max-nr. Try increasing that number or reducing the amount of logical CPUs available for your application");
        }
    }

    return network_iocbs;
}
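// Worked example (hypothetical numbers): with 8 shards and the default of
// 10000 network IOCBs per shard, requested_aio is 8 * 10000 plus
// 8 * (storage_iocbs + preempt_iocbs). If the kernel only has, say, 50000
// free slots, each shard's network allowance shrinks to
// (50000 - requested_aio_other) / 8, as computed above.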
void smp::configure(const smp_options& smp_opts, const reactor_options& reactor_opts)
{
#ifndef SEASTAR_NO_EXCEPTION_HACK
    if (smp_opts.enable_glibc_exception_scaling_workaround.get_value()) {
        init_phdr_cache();
    }
#endif

    // Mask most, to prevent threads (esp. dpdk helper threads)
    // from servicing a signal. Individual reactors will unmask signals
    // as they become prepared to handle them.
    //
    // We leave some signals unmasked since we don't handle them ourself.
    sigset_t sigs;
    sigfillset(&sigs);
    for (auto sig : {SIGHUP, SIGQUIT, SIGILL, SIGABRT, SIGFPE, SIGSEGV,
            SIGALRM, SIGCONT, SIGSTOP, SIGTSTP, SIGTTIN, SIGTTOU}) {
        sigdelset(&sigs, sig);
    }
    if (!reactor_opts._auto_handle_sigint_sigterm) {
        sigdelset(&sigs, SIGINT);
        sigdelset(&sigs, SIGTERM);
    }
    pthread_sigmask(SIG_BLOCK, &sigs, nullptr);

#ifndef SEASTAR_ASAN_ENABLED
    // We don't need to handle SIGSEGV when asan is enabled.
    install_oneshot_signal_handler<SIGSEGV, sigsegv_action>();
#else
    (void)sigsegv_action;
#endif
    install_oneshot_signal_handler<SIGABRT, sigabrt_action>();

#ifdef SEASTAR_HAVE_DPDK
    const auto* native_stack = dynamic_cast<const net::native_stack_options*>(reactor_opts.network_stack.get_selected_candidate_opts());
    _using_dpdk = native_stack && native_stack->dpdk_pmd;
#endif

    auto thread_affinity = smp_opts.thread_affinity.get_value();
    if (reactor_opts.overprovisioned
           && smp_opts.thread_affinity.defaulted()) {
        thread_affinity = false;
    }
    if (!thread_affinity && _using_dpdk) {
        fmt::print("warning: --thread-affinity 0 ignored in dpdk mode\n");
    }
    auto mbind = smp_opts.mbind.get_value();
    if (!thread_affinity) {
        mbind = false;
    }
    resource::configuration rc;

    smp::_tmain = std::this_thread::get_id();
    resource::cpuset cpu_set = get_current_cpuset();

    if (smp_opts.cpuset) {
        auto opts_cpuset = smp_opts.cpuset.get_value();
        // CPUs that are not available are those pinned by
        // --cpuset but not present in current task set
        std::set<unsigned int> not_available_cpus;
        std::set_difference(opts_cpuset.begin(), opts_cpuset.end(),
                            cpu_set.begin(), cpu_set.end(),
                            std::inserter(not_available_cpus, not_available_cpus.end()));

        if (!not_available_cpus.empty()) {
            std::ostringstream not_available_cpus_list;
            for (auto cpu_id : not_available_cpus) {
                not_available_cpus_list << " " << cpu_id;
            }
            seastar_logger.error("Bad value for --cpuset:{} not allowed. Shutting down.", not_available_cpus_list.str());
            exit(1);
        }

        cpu_set = opts_cpuset;
    }

    if (smp_opts.smp) {
        smp::count = smp_opts.smp.get_value();
    } else {
        smp::count = cpu_set.size();
    }
    std::vector<reactor*> reactors(smp::count);
    if (smp_opts.memory) {
        rc.total_memory = parse_memory_size(smp_opts.memory.get_value());
#ifdef SEASTAR_HAVE_DPDK
        if (smp_opts.hugepages &&
            !reactor_opts.network_stack.get_selected_candidate_name().compare("native") &&
            _using_dpdk) {
            size_t dpdk_memory = dpdk::eal::mem_size(smp::count);

            if (dpdk_memory >= rc.total_memory) {
                std::cerr << "Can't run with the given amount of memory: ";
                std::cerr << smp_opts.memory.get_value();
                std::cerr << ". Consider giving more." << std::endl;
                exit(1);
            }

            //
            // Subtract the memory we are about to give to DPDK from the total
            // amount of memory we are allowed to use.
            //
            rc.total_memory.value() -= dpdk_memory;
        }
#endif
    }
    if (smp_opts.reserve_memory) {
        rc.reserve_memory = parse_memory_size(smp_opts.reserve_memory.get_value());
    }
    rc.reserve_additional_memory = smp_opts.reserve_additional_memory;

    std::optional<std::string> hugepages_path;
    if (smp_opts.hugepages) {
        hugepages_path = smp_opts.hugepages.get_value();
    }

    auto mlock = false;
    if (smp_opts.lock_memory) {
        mlock = smp_opts.lock_memory.get_value();
    }
    if (mlock) {
        auto extra_flags = 0;
        // Linux will serialize faulting in anonymous memory, and also
        // serialize marking them as locked. This can take many minutes on
        // terabyte class machines, so fault them in the future to spread
        // out the cost. This isn't good since we'll see contention if
        // multiple shards fault in memory at once, but at least that work
        // can proceed in parallel with regular reactor work on other shards.
        extra_flags |= MCL_ONFAULT; // Linux 4.4+

        auto r = mlockall(MCL_CURRENT | MCL_FUTURE | extra_flags);
        if (r) {
            // Don't hard fail for now, it's hard to get the configuration right
            fmt::print("warning: failed to mlockall: {}\n", strerror(errno));
        }
    }
    rc.cpus = smp::count;
    rc.cpu_set = std::move(cpu_set);

    disk_config_params disk_config;
    disk_config.parse_config(smp_opts, reactor_opts);
    for (auto& id : disk_config.device_ids()) {
        rc.devices.push_back(id);
    }
    rc.num_io_groups = disk_config.num_io_groups();

#ifdef SEASTAR_HAVE_HWLOC
    if (smp_opts.allow_cpus_in_remote_numa_nodes.get_value()) {
        rc.assign_orphan_cpus = true;
    }
#endif

    auto resources = resource::allocate(rc);
    logger::set_shard_field_width(std::ceil(std::log10(smp::count)));
    std::vector<resource::cpu> allocations = std::move(resources.cpus);
    if (thread_affinity) {
        smp::pin(allocations[0].cpu_id);
    }
    if (smp_opts.memory_allocator == memory_allocator::seastar) {
        memory::configure(allocations[0].mem, mbind, hugepages_path);
    }

    if (reactor_opts.abort_on_seastar_bad_alloc) {
        memory::enable_abort_on_allocation_failure();
    }

    if (reactor_opts.dump_memory_diagnostics_on_alloc_failure_kind) {
        memory::set_dump_memory_diagnostics_on_alloc_failure_kind(reactor_opts.dump_memory_diagnostics_on_alloc_failure_kind.get_value());
    }

    reactor_config reactor_cfg;
    reactor_cfg.auto_handle_sigint_sigterm = reactor_opts._auto_handle_sigint_sigterm;
    reactor_cfg.max_networking_aio_io_control_blocks = adjust_max_networking_aio_io_control_blocks(reactor_opts.max_networking_io_control_blocks.get_value());
#ifdef SEASTAR_HEAPPROF
    bool heapprof_enabled = reactor_opts.heapprof;
    if (heapprof_enabled) {
        memory::set_heap_profiling_enabled(heapprof_enabled);
    }
#else
    bool heapprof_enabled = false;
#endif

#ifdef SEASTAR_HAVE_DPDK
    if (_using_dpdk) {
        dpdk::eal::cpuset cpus;
        for (auto&& a : allocations) {
            cpus[a.cpu_id] = true;
        }
        dpdk::eal::init(cpus, reactor_opts._argv0, hugepages_path, native_stack ? bool(native_stack->dpdk_pmd) : false);
    }
#endif

    // Better to put it into the smp class, but at smp construction time
    // correct smp::count is not known.
    boost::barrier reactors_registered(smp::count);
    boost::barrier smp_queues_constructed(smp::count);
    // We use shared_ptr since this thread can exit while other threads are still unlocking
    auto inited = std::make_shared<boost::barrier>(smp::count);

    auto ioq_topology = std::move(resources.ioq_topology);

    // ATTN: The ioq_topology value is referenced by the lambdas below, which
    // are then copied to other shards' threads, so each shard has a copy of
    // the ioq_topology on its stack, but (!) still references and uses the
    // value from shard-0. This access is race-free because:
    // 1. The .shard_to_group is not modified
    // 2. The .queues is pre-resize()-d in advance, so the vector itself
    //    doesn't change; existing slots are accessed by owning shards only
    //    without interference
    // 3. The .groups manipulations are guarded by the .lock lock (but it's
    //    also pre-resize()-d in advance)
    auto alloc_io_queues = [&ioq_topology, &disk_config] (shard_id shard) {
        for (auto& topo : ioq_topology) {
            auto& io_info = topo.second;
            auto group_idx = io_info.shard_to_group[shard];
            std::shared_ptr<io_group> group;

            {
                std::lock_guard _(io_info.lock);
                auto& iog = io_info.groups[group_idx];
                if (!iog) {
                    struct io_queue::config qcfg = disk_config.generate_config(topo.first, io_info.groups.size());
                    iog = std::make_shared<io_group>(std::move(qcfg));
                    seastar_logger.debug("allocate {} IO group", group_idx);
                }
                group = iog;
            }

            io_info.queues[shard] = std::make_unique<io_queue>(std::move(group), engine()._io_sink);
            seastar_logger.debug("attached {} queue to {} IO group", shard, group_idx);
        }
    };

    auto assign_io_queues = [&ioq_topology] (shard_id shard) {
        for (auto& topo : ioq_topology) {
            auto queue = std::move(topo.second.queues[shard]);
            assert(queue);
            engine()._io_queues.emplace(topo.first, std::move(queue));

            auto num_io_groups = topo.second.groups.size();
            if (engine()._num_io_groups == 0) {
                engine()._num_io_groups = num_io_groups;
            } else if (engine()._num_io_groups != num_io_groups) {
                throw std::logic_error(format("Number of IO-groups mismatch, {} != {}", engine()._num_io_groups, num_io_groups));
            }
        }
    };

    _all_event_loops_done.emplace(smp::count);

    auto backend_selector = reactor_opts.reactor_backend.get_selected_candidate();
    seastar_logger.info("Reactor backend: {}", backend_selector);

    unsigned i;
    auto smp_tmain = smp::_tmain;
    for (i = 1; i < smp::count; i++) {
        auto allocation = allocations[i];
        create_thread([this, smp_tmain, inited, &reactors_registered, &smp_queues_constructed, &smp_opts, &reactor_opts, &reactors, hugepages_path, i, allocation, assign_io_queues, alloc_io_queues, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] {
          try {
            // initialize thread_locals that are equal across all reactor threads of this smp instance
            smp::_tmain = smp_tmain;
            auto thread_name = seastar::format("reactor-{}", i);
            pthread_setname_np(pthread_self(), thread_name.c_str());
            if (thread_affinity) {
                smp::pin(allocation.cpu_id);
            }
            if (smp_opts.memory_allocator == memory_allocator::seastar) {
                memory::configure(allocation.mem, mbind, hugepages_path);
            }
            if (heapprof_enabled) {
                memory::set_heap_profiling_enabled(heapprof_enabled);
            }
            sigset_t mask;
            sigfillset(&mask);
            for (auto sig : { SIGSEGV }) {
                sigdelset(&mask, sig);
            }
            auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
            throw_pthread_error(r);
            init_default_smp_service_group(i);
            lowres_clock::update();
            allocate_reactor(i, backend_selector, reactor_cfg);
            reactors[i] = &engine();
            alloc_io_queues(i);
            reactors_registered.wait();
            smp_queues_constructed.wait();
            // _qs_owner is only initialized here
            _qs = _qs_owner.get();
            start_all_queues();
            assign_io_queues(i);
            inited->wait();
            engine().configure(reactor_opts);
            engine().run();
          } catch (const std::exception& e) {
            seastar_logger.error(e.what());
            _exit(1);
          }
        });
    }

    init_default_smp_service_group(0);
    lowres_clock::update();
    try {
        allocate_reactor(0, backend_selector, reactor_cfg);
    } catch (const std::exception& e) {
        seastar_logger.error(e.what());
        _exit(1);
    }

    reactors[0] = &engine();
    alloc_io_queues(0);

#ifdef SEASTAR_HAVE_DPDK
    if (_using_dpdk) {
        auto it = _thread_loops.begin();
        RTE_LCORE_FOREACH_SLAVE(i) {
            rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&*(it++)), i);
        }
    }
#endif

    reactors_registered.wait();
    _qs_owner = decltype(smp::_qs_owner){new smp_message_queue*[smp::count], qs_deleter{}};
    _qs = _qs_owner.get();
    for (unsigned i = 0; i < smp::count; i++) {
        smp::_qs_owner[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
        for (unsigned j = 0; j < smp::count; ++j) {
            new (&smp::_qs_owner[i][j]) smp_message_queue(reactors[j], reactors[i]);
        }
    }
    _alien._qs = alien::instance::create_qs(reactors);
    smp_queues_constructed.wait();
    start_all_queues();
    assign_io_queues(0);
    inited->wait();

    engine().configure(reactor_opts);
}
bool smp::poll_queues() {
    size_t got = 0;
    for (unsigned i = 0; i < count; i++) {
        if (this_shard_id() != i) {
            auto& rxq = _qs[this_shard_id()][i];
            rxq.flush_response_batch();
            got += rxq.has_unflushed_responses();
            got += rxq.process_incoming();
            auto& txq = _qs[i][this_shard_id()];
            txq.flush_request_batch();
            got += txq.process_completions(i);
        }
    }
    return got != 0;
}
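// Queue indexing convention: _qs[to][from] carries messages from shard
// "from" to shard "to" (the queues are constructed above as
// smp_message_queue(reactors[j], reactors[i]) for _qs[i][j]). That is why we
// receive from _qs[this_shard_id()][i] but flush requests and reap
// completions via _qs[i][this_shard_id()].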
bool smp::pure_poll_queues() {
    for (unsigned i = 0; i < count; i++) {
        if (this_shard_id() != i) {
            auto& rxq = _qs[this_shard_id()][i];
            rxq.flush_response_batch();
            auto& txq = _qs[i][this_shard_id()];
            txq.flush_request_batch();
            if (rxq.pure_poll_rx() || txq.pure_poll_tx() || rxq.has_unflushed_responses()) {
                return true;
            }
        }
    }
    return false;
}

__thread reactor* local_engine;

void report_exception(std::string_view message, std::exception_ptr eptr) noexcept {
    seastar_logger.error("{}: {}", message, eptr);
}
future<> check_direct_io_support(std::string_view path) noexcept {
    struct w {
        sstring path;
        open_flags flags;
        std::function<future<>()> cleanup;

        static w parse(sstring path, std::optional<directory_entry_type> type) {
            if (!type) {
                throw std::invalid_argument(format("Could not open file at {}. Make sure it exists", path));
            }

            if (type == directory_entry_type::directory) {
                auto fpath = path + "/.o_direct_test";
                return w{fpath, open_flags::wo | open_flags::create | open_flags::truncate, [fpath] { return remove_file(fpath); }};
            } else if ((type == directory_entry_type::regular) || (type == directory_entry_type::link)) {
                return w{path, open_flags::ro, [] { return make_ready_future<>(); }};
            } else {
                throw std::invalid_argument(format("{} neither a directory nor file. Can't be opened with O_DIRECT", path));
            }
        }
    };

    // Allocating memory for a sstring can throw, hence the futurize_invoke
    return futurize_invoke([path] {
        return engine().file_type(path).then([path = sstring(path)] (auto type) {
            auto w = w::parse(path, type);
            return open_file_dma(w.path, w.flags).then_wrapped([path = w.path, cleanup = std::move(w.cleanup)] (future<file> f) {
                try {
                    auto fd = f.get0();
                    return cleanup().finally([fd = std::move(fd)] () mutable {
                        return fd.close();
                    });
                } catch (std::system_error& e) {
                    if (e.code() == std::error_code(EINVAL, std::system_category())) {
                        report_exception(format("Could not open file at {}. Does your filesystem support O_DIRECT?", path), std::current_exception());
                    }
                    throw;
                }
            });
        });
    });
}
server_socket listen(socket_address sa) {
    return engine().listen(sa);
}

server_socket listen(socket_address sa, listen_options opts) {
    return engine().listen(sa, opts);
}

future<connected_socket> connect(socket_address sa) {
    return engine().connect(sa);
}

future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) {
    return engine().connect(sa, local, proto);
}

socket make_socket() {
    return engine().net().socket();
}

net::udp_channel make_udp_channel() {
    return engine().net().make_udp_channel();
}

net::udp_channel make_udp_channel(const socket_address& local) {
    return engine().net().make_udp_channel(local);
}

void reactor::add_high_priority_task(task* t) noexcept {
    add_urgent_task(t);
    // break .then() chains
    request_preemption();
}

void set_idle_cpu_handler(idle_cpu_handler&& handler) {
    engine().set_idle_cpu_handler(std::move(handler));
}

namespace experimental {

future<std::tuple<file_desc, file_desc>> make_pipe() {
    return engine().make_pipe();
}

future<process> spawn_process(const std::filesystem::path& pathname,
                              spawn_parameters params) {
    return process::spawn(pathname, std::move(params));
}

future<process> spawn_process(const std::filesystem::path& pathname) {
    return process::spawn(pathname);
}

}

static bool virtualized() {
    return fs::exists("/sys/hypervisor/type");
}

std::chrono::nanoseconds
reactor::calculate_poll_time() {
    // In a non-virtualized environment, select a poll time
    // that is competitive with halt/unhalt.
    //
    // In a virtualized environment, IPIs are slow and dominate
    // sleep/wake (mprotect/tgkill), so increase the poll time so
    // we don't sleep in a request/reply workload
    return virtualized() ? 2000us : 200us;
}

future<> yield() noexcept {
    memory::scoped_critical_alloc_section _;
    auto tsk = make_task([] {});
    schedule(tsk);
    return tsk->get_future();
}

future<> check_for_io_immediately() noexcept {
    memory::scoped_critical_alloc_section _;
    engine().force_poll();
    auto tsk = make_task(default_scheduling_group(), [] {});
    schedule_urgent(tsk);
    return tsk->get_future();
}

future<> later() noexcept {
    return check_for_io_immediately();
}

void add_to_flush_poller(output_stream<char>& os) noexcept {
    engine()._flush_batching.push_back(os);
}

steady_clock_type::duration reactor::total_idle_time() {
    return _total_idle;
}

steady_clock_type::duration reactor::total_busy_time() {
    return now() - _start_time - _total_idle;
}
std::chrono::nanoseconds reactor::total_steal_time() {
    // Steal time: this mimics the concept some hypervisors have about steal time.
    // That is the time in which a VM has something to run, but is not running because some other
    // process (another VM or the hypervisor itself) is in control.
    //
    // For us, we notice that during the time in which we were not sleeping (either running or busy
    // polling while idle), we should be accumulating thread runtime. If we are not, that's because
    // someone stole it from us.
    //
    // Because this is totally in userspace we can miss some events. For instance, if the seastar
    // process is ready to run but the kernel hasn't scheduled us yet, that would technically be
    // steal time but we have no way to account for it.
    //
    // But what we have here should be good enough and at least has a well defined meaning.
    return std::chrono::duration_cast<std::chrono::nanoseconds>(now() - _start_time - _total_sleep) -
           std::chrono::duration_cast<std::chrono::nanoseconds>(thread_cputime_clock::now().time_since_epoch());
}
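// In other words: steal_time = (wall-clock time spent awake) - (thread CPU
// time), where "awake" is total run time minus the time we knowingly slept.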
static std::atomic<unsigned long> s_used_scheduling_group_ids_bitmap{3}; // 0=main, 1=atexit
static std::atomic<unsigned long> s_next_scheduling_group_specific_key{0};

static
int
allocate_scheduling_group_id() noexcept {
    static_assert(max_scheduling_groups() <= std::numeric_limits<unsigned long>::digits, "more scheduling groups than available bits");
    auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed);
    auto nb = b;
    unsigned i = 0;
    do {
        if (__builtin_popcountl(b) == max_scheduling_groups()) {
            return -1;
        }
        i = count_trailing_zeros(~b);
        nb = b | (1ul << i);
    } while (!s_used_scheduling_group_ids_bitmap.compare_exchange_weak(b, nb, std::memory_order_relaxed));
    return i;
}

static
unsigned long
allocate_scheduling_group_specific_key() noexcept {
    return s_next_scheduling_group_specific_key.fetch_add(1, std::memory_order_relaxed);
}

static
void
deallocate_scheduling_group_id(unsigned id) noexcept {
    s_used_scheduling_group_ids_bitmap.fetch_and(~(1ul << id), std::memory_order_relaxed);
}
void
reactor::allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key) {
    auto& sg_data = _scheduling_group_specific_data;
    auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
    this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key.id()+1));
    this_sg.specific_vals[key.id()] =
        aligned_alloc(sg_data.scheduling_group_key_configs[key.id()].alignment,
                sg_data.scheduling_group_key_configs[key.id()].allocation_size);
    if (!this_sg.specific_vals[key.id()]) {
        std::abort();
    }
    if (sg_data.scheduling_group_key_configs[key.id()].constructor) {
        sg_data.scheduling_group_key_configs[key.id()].constructor(this_sg.specific_vals[key.id()]);
    }
}

future<>
reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, float shares) {
    auto& sg_data = _scheduling_group_specific_data;
    auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
    this_sg.queue_is_initialized = true;
    _task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1));
    _task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shares);
    unsigned long num_keys = s_next_scheduling_group_specific_key.load(std::memory_order_relaxed);

    return with_scheduling_group(sg, [this, num_keys, sg] () {
        for (unsigned long key_id = 0; key_id < num_keys; key_id++) {
            allocate_scheduling_group_specific_data(sg, scheduling_group_key(key_id));
        }
    });
}

future<>
reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
    auto& sg_data = _scheduling_group_specific_data;
    sg_data.scheduling_group_key_configs.resize(std::max<size_t>(sg_data.scheduling_group_key_configs.size(), key.id() + 1));
    sg_data.scheduling_group_key_configs[key.id()] = cfg;
    return parallel_for_each(_task_queues, [this, cfg, key] (std::unique_ptr<task_queue>& tq) {
        if (tq) {
            scheduling_group sg = scheduling_group(tq->_id);
            return with_scheduling_group(sg, [this, key, sg] () {
                allocate_scheduling_group_specific_data(sg, key);
            });
        }
        return make_ready_future();
    });
}
future<>
reactor::destroy_scheduling_group(scheduling_group sg) noexcept {
    if (sg._id >= max_scheduling_groups()) {
        on_fatal_internal_error(seastar_logger, format("Invalid scheduling_group {}", sg._id));
    }
    return with_scheduling_group(sg, [this, sg] () {
        auto& sg_data = _scheduling_group_specific_data;
        auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
        for (unsigned long key_id = 0; key_id < sg_data.scheduling_group_key_configs.size(); key_id++) {
            void* val = this_sg.specific_vals[key_id];
            if (val) {
                if (sg_data.scheduling_group_key_configs[key_id].destructor) {
                    sg_data.scheduling_group_key_configs[key_id].destructor(val);
                }
                free(val);
                this_sg.specific_vals[key_id] = nullptr;
            }
        }
    }).then( [this, sg] () {
        auto& sg_data = _scheduling_group_specific_data;
        auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
        this_sg.queue_is_initialized = false;
        _task_queues[sg._id].reset();
    });
}

void
internal::no_such_scheduling_group(scheduling_group sg) {
    throw std::invalid_argument(format("The scheduling group does not exist ({})", internal::scheduling_group_index(sg)));
}

const sstring&
scheduling_group::name() const noexcept {
    return engine()._task_queues[_id]->_name;
}
void
scheduling_group::set_shares(float shares) noexcept {
    engine()._task_queues[_id]->set_shares(shares);
}

future<scheduling_group>
create_scheduling_group(sstring name, float shares) noexcept {
    auto aid = allocate_scheduling_group_id();
    if (aid < 0) {
        return make_exception_future<scheduling_group>(std::runtime_error(fmt::format("Scheduling group limit exceeded while creating {}", name)));
    }
    auto id = static_cast<unsigned>(aid);
    assert(id < max_scheduling_groups());
    auto sg = scheduling_group(id);
    return smp::invoke_on_all([sg, name, shares] {
        return engine().init_scheduling_group(sg, name, shares);
    }).then([sg] {
        return make_ready_future<scheduling_group>(sg);
    });
}

future<scheduling_group_key>
scheduling_group_key_create(scheduling_group_key_config cfg) noexcept {
    scheduling_group_key key = allocate_scheduling_group_specific_key();
    return smp::invoke_on_all([key, cfg] {
        return engine().init_new_scheduling_group_key(key, cfg);
    }).then([key] {
        return make_ready_future<scheduling_group_key>(key);
    });
}

future<>
rename_priority_class(io_priority_class pc, sstring new_name) {
    return pc.rename(std::move(new_name));
}

future<>
destroy_scheduling_group(scheduling_group sg) noexcept {
    if (sg == default_scheduling_group()) {
        return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to destroy the default scheduling group"));
    }
    if (sg == current_scheduling_group()) {
        return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to destroy the current scheduling group"));
    }
    return smp::invoke_on_all([sg] {
        return engine().destroy_scheduling_group(sg);
    }).then([sg] {
        deallocate_scheduling_group_id(sg._id);
    });
}

future<>
rename_scheduling_group(scheduling_group sg, sstring new_name) noexcept {
    if (sg == default_scheduling_group()) {
        return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to rename the default scheduling group"));
    }
    return smp::invoke_on_all([sg, new_name] {
        engine()._task_queues[sg._id]->rename(new_name);
    });
}
namespace internal {

sched_clock::duration
timeval_to_duration(::timeval tv) {
    return std::chrono::seconds(tv.tv_sec) + std::chrono::microseconds(tv.tv_usec);
}

class reactor_stall_sampler : public reactor::pollfn {
    sched_clock::time_point _run_start;
    ::rusage _run_start_rusage;
    uint64_t _kernel_stalls = 0;
    sched_clock::duration _nonsleep_cpu_time = {};
    sched_clock::duration _nonsleep_wall_time = {};
private:
    static ::rusage get_rusage() {
        struct ::rusage ru;
        ::getrusage(RUSAGE_THREAD, &ru);
        return ru;
    }
    static sched_clock::duration cpu_time(const ::rusage& ru) {
        return timeval_to_duration(ru.ru_stime) + timeval_to_duration(ru.ru_utime);
    }
    void mark_run_start() {
        _run_start = reactor::now();
        _run_start_rusage = get_rusage();
    }
    void mark_run_end() {
        auto start_nvcsw = _run_start_rusage.ru_nvcsw;
        auto start_cpu_time = cpu_time(_run_start_rusage);
        auto start_time = _run_start;
        _run_start = reactor::now();
        _run_start_rusage = get_rusage();
        _kernel_stalls += _run_start_rusage.ru_nvcsw - start_nvcsw;
        _nonsleep_cpu_time += cpu_time(_run_start_rusage) - start_cpu_time;
        _nonsleep_wall_time += _run_start - start_time;
    }
public:
    reactor_stall_sampler() { mark_run_start(); }
    virtual bool poll() override { return false; }
    virtual bool pure_poll() override { return false; }
    virtual bool try_enter_interrupt_mode() override {
        // try_enter_interrupt_mode marks the end of a reactor run that should be context-switch free
        mark_run_end();
        return true;
    }
    virtual void exit_interrupt_mode() override {
        // start a reactor run that should be context switch free
        mark_run_start();
    }
    stall_report report() const {
        stall_report r;
        // mark_run_end() with an immediate mark_run_start() is logically a no-op,
        // but each one of them has an effect, so they can't be marked const
        const_cast<reactor_stall_sampler*>(this)->mark_run_end();
        r.kernel_stalls = _kernel_stalls;
        r.run_wall_time = _nonsleep_wall_time;
        r.stall_time = _nonsleep_wall_time - _nonsleep_cpu_time;
        const_cast<reactor_stall_sampler*>(this)->mark_run_start();
        return r;
    }
};
future<stall_report>
report_reactor_stalls(noncopyable_function<future<> ()> uut) {
    auto reporter = std::make_unique<reactor_stall_sampler>();
    auto p_reporter = reporter.get();
    auto poller = reactor::poller(std::move(reporter));
    return uut().then([poller = std::move(poller), p_reporter] () mutable {
        return p_reporter->report();
    });
}
& operator<<(std::ostream
& os
, const stall_report
& sr
) {
4831 auto to_ms
= [] (sched_clock::duration d
) -> float {
4832 return std::chrono::duration
<float>(d
) / 1ms
;
4834 return os
<< format("{} stalls, {} ms stall time, {} ms run time", sr
.kernel_stalls
, to_ms(sr
.stall_time
), to_ms(sr
.run_wall_time
));
4837 size_t scheduling_group_count() {
4838 auto b
= s_used_scheduling_group_ids_bitmap
.load(std::memory_order_relaxed
);
4839 return __builtin_popcountl(b
);
4844 #ifdef SEASTAR_TASK_BACKTRACE
4846 void task::make_backtrace() noexcept
{
4847 memory::disable_backtrace_temporarily dbt
;
4849 _bt
= make_lw_shared
<simple_backtrace
>(current_backtrace_tasklocal());