ceph/src/seastar/src/core/reactor.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2014 Cloudius Systems
20 */
21
22 #define __user /* empty */ // for xfs includes, below
23
24 #include <cinttypes>
25 #include <spawn.h>
26 #include <sys/syscall.h>
27 #include <sys/vfs.h>
28 #include <sys/statfs.h>
29 #include <sys/time.h>
30 #include <sys/resource.h>
31 #include <sys/inotify.h>
32 #include <sys/wait.h>
33 #include <fmt/ranges.h>
34 #include <seastar/core/task.hh>
35 #include <seastar/core/reactor.hh>
36 #include <seastar/core/memory.hh>
37 #include <seastar/core/posix.hh>
38 #include <seastar/core/sleep.hh>
39 #include <seastar/net/packet.hh>
40 #include <seastar/net/stack.hh>
41 #include <seastar/net/posix-stack.hh>
42 #include <seastar/net/native-stack.hh>
43 #include <seastar/core/resource.hh>
44 #include <seastar/core/print.hh>
45 #include "core/scollectd-impl.hh"
46 #include <seastar/util/conversions.hh>
47 #include <seastar/util/process.hh>
48 #include <seastar/core/loop.hh>
49 #include <seastar/core/when_all.hh>
50 #include <seastar/core/with_scheduling_group.hh>
51 #include <seastar/core/thread.hh>
52 #include <seastar/core/make_task.hh>
53 #include <seastar/core/systemwide_memory_barrier.hh>
54 #include <seastar/core/report_exception.hh>
55 #include <seastar/core/stall_sampler.hh>
56 #include <seastar/core/thread_cputime_clock.hh>
57 #include <seastar/core/abort_on_ebadf.hh>
58 #include <seastar/core/io_queue.hh>
59 #include <seastar/core/internal/buffer_allocator.hh>
60 #include <seastar/core/internal/io_desc.hh>
61 #include <seastar/core/internal/uname.hh>
62 #include <seastar/core/scheduling_specific.hh>
63 #include <seastar/core/smp_options.hh>
64 #include <seastar/util/log.hh>
65 #include <seastar/util/read_first_line.hh>
66 #include "core/file-impl.hh"
67 #include "core/reactor_backend.hh"
68 #include "core/syscall_result.hh"
69 #include "core/thread_pool.hh"
70 #include "syscall_work_queue.hh"
71 #include "cgroup.hh"
72 #include <cassert>
73 #include <cmath>
74 #include <unistd.h>
75 #include <fcntl.h>
76 #include <sys/eventfd.h>
77 #include <sys/poll.h>
78 #include <boost/lexical_cast.hpp>
79 #include <boost/thread/barrier.hpp>
80 #include <boost/algorithm/string/classification.hpp>
81 #include <boost/algorithm/string/split.hpp>
82 #include <boost/iterator/counting_iterator.hpp>
83 #include <boost/range/numeric.hpp>
84 #include <boost/range/algorithm/sort.hpp>
85 #include <boost/range/algorithm/remove_if.hpp>
86 #include <boost/range/algorithm/find_if.hpp>
87 #include <boost/algorithm/clamp.hpp>
88 #include <boost/range/adaptor/transformed.hpp>
89 #include <boost/range/adaptor/map.hpp>
90 #include <boost/version.hpp>
91 #include <atomic>
92 #include <dirent.h>
93 #include <linux/types.h> // for xfs, below
94 #include <sys/ioctl.h>
95 #include <linux/perf_event.h>
96 #include <xfs/linux.h>
97 #define min min /* prevent xfs.h from defining min() as a macro */
98 #include <xfs/xfs.h>
99 #undef min
100 #ifdef SEASTAR_HAVE_DPDK
101 #include <seastar/core/dpdk_rte.hh>
102 #include <rte_lcore.h>
103 #include <rte_launch.h>
104 #endif
105 #include <seastar/core/prefetch.hh>
106 #include <exception>
107 #include <regex>
108 #include <fstream>
109 #ifdef __GNUC__
110 #include <iostream>
111 #include <system_error>
112 #include <cxxabi.h>
113 #endif
114
115 #ifdef SEASTAR_SHUFFLE_TASK_QUEUE
116 #include <random>
117 #endif
118
119 #include <sys/mman.h>
120 #include <sys/utsname.h>
121 #include <linux/falloc.h>
122 #include <seastar/util/backtrace.hh>
123 #include <seastar/util/spinlock.hh>
124 #include <seastar/util/print_safe.hh>
125 #include <sys/sdt.h>
126
127 #ifdef HAVE_OSV
128 #include <osv/newpoll.hh>
129 #endif
130
131 #if defined(__x86_64__) || defined(__i386__)
132 #include <xmmintrin.h>
133 #endif
134
135 #include <seastar/util/defer.hh>
136 #include <seastar/core/alien.hh>
137 #include <seastar/core/internal/stall_detector.hh>
138 #include <seastar/core/metrics.hh>
139 #include <seastar/core/execution_stage.hh>
140 #include <seastar/core/exception_hacks.hh>
141 #include <seastar/util/memory_diagnostics.hh>
142 #include <seastar/util/internal/iovec_utils.hh>
143 #include <seastar/util/internal/magic.hh>
144
145 #include <yaml-cpp/yaml.h>
146
147 #ifdef SEASTAR_TASK_HISTOGRAM
148 #include <typeinfo>
149 #endif
150
151 namespace seastar {
152
153 static_assert(posix::shutdown_mask(SHUT_RD) == posix::rcv_shutdown);
154 static_assert(posix::shutdown_mask(SHUT_WR) == posix::snd_shutdown);
155 static_assert(posix::shutdown_mask(SHUT_RDWR) == (posix::snd_shutdown | posix::rcv_shutdown));
156
157 struct mountpoint_params {
158 std::string mountpoint = "none";
159 uint64_t read_bytes_rate = std::numeric_limits<uint64_t>::max();
160 uint64_t write_bytes_rate = std::numeric_limits<uint64_t>::max();
161 uint64_t read_req_rate = std::numeric_limits<uint64_t>::max();
162 uint64_t write_req_rate = std::numeric_limits<uint64_t>::max();
163 uint64_t read_saturation_length = std::numeric_limits<uint64_t>::max();
164 uint64_t write_saturation_length = std::numeric_limits<uint64_t>::max();
165 bool duplex = false;
166 float rate_factor = 1.0;
167 };
168
169 }
170
171 namespace YAML {
172 template<>
173 struct convert<seastar::mountpoint_params> {
174 static bool decode(const Node& node, seastar::mountpoint_params& mp) {
175 using namespace seastar;
176 mp.mountpoint = node["mountpoint"].as<std::string>().c_str();
177 mp.read_bytes_rate = parse_memory_size(node["read_bandwidth"].as<std::string>());
178 mp.read_req_rate = parse_memory_size(node["read_iops"].as<std::string>());
179 mp.write_bytes_rate = parse_memory_size(node["write_bandwidth"].as<std::string>());
180 mp.write_req_rate = parse_memory_size(node["write_iops"].as<std::string>());
181 if (node["read_saturation_length"]) {
182 mp.read_saturation_length = parse_memory_size(node["read_saturation_length"].as<std::string>());
183 }
184 if (node["write_saturation_length"]) {
185 mp.write_saturation_length = parse_memory_size(node["write_saturation_length"].as<std::string>());
186 }
187 if (node["duplex"]) {
188 mp.duplex = node["duplex"].as<bool>();
189 }
190 if (node["rate_factor"]) {
191 mp.rate_factor = node["rate_factor"].as<float>();
192 }
193 return true;
194 }
195 };
196 }
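// Illustrative only (not part of the upstream file): a sketch of one mountpoint
// entry this YAML converter can decode, using the keys handled above. Bandwidth and
// IOPS values go through parse_memory_size(), which is assumed here to accept plain
// byte counts and the usual size suffixes; the surrounding file layout that groups
// these entries is defined elsewhere.
//
//   - mountpoint: /var/lib/data
//     read_bandwidth: 500M
//     read_iops: 100k
//     write_bandwidth: 300M
//     write_iops: 50k
//     read_saturation_length: 128k
//     write_saturation_length: 64k
//     duplex: true
//     rate_factor: 1.0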
197
198 namespace seastar {
199
200 seastar::logger seastar_logger("seastar");
201 seastar::logger sched_logger("scheduler");
202
203 shard_id reactor::cpu_id() const {
204 assert(_id == this_shard_id());
205 return _id;
206 }
207
208 io_priority_class
209 reactor::register_one_priority_class(sstring name, uint32_t shares) {
210 return io_priority_class::register_one(std::move(name), shares);
211 }
212
213 future<>
214 reactor::update_shares_for_class(io_priority_class pc, uint32_t shares) {
215 return pc.update_shares(shares);
216 }
217
218 future<>
219 reactor::rename_priority_class(io_priority_class pc, sstring new_name) noexcept {
220 return pc.rename(std::move(new_name));
221 }
222
223 void reactor::update_shares_for_queues(io_priority_class pc, uint32_t shares) {
224 for (auto&& q : _io_queues) {
225 q.second->update_shares_for_class(pc, shares);
226 }
227 }
228
229 future<> reactor::update_bandwidth_for_queues(io_priority_class pc, uint64_t bandwidth) {
230 return smp::invoke_on_all([pc, bandwidth = bandwidth / _num_io_groups] {
231 return parallel_for_each(engine()._io_queues, [pc, bandwidth] (auto& queue) {
232 return queue.second->update_bandwidth_for_class(pc, bandwidth);
233 });
234 });
235 }
236
237 void reactor::rename_queues(io_priority_class pc, sstring new_name) {
238 for (auto&& queue : _io_queues) {
239 queue.second->rename_priority_class(pc, new_name);
240 }
241 }
242
243 future<std::tuple<pollable_fd, socket_address>>
244 reactor::do_accept(pollable_fd_state& listenfd) {
245 return readable_or_writeable(listenfd).then([this, &listenfd] () mutable {
246 socket_address sa;
247 listenfd.maybe_no_more_recv();
248 auto maybe_fd = listenfd.fd.try_accept(sa, SOCK_NONBLOCK | SOCK_CLOEXEC);
249 if (!maybe_fd) {
250 // We speculated that we would have another connection, but got a false
251 // positive. Try again without speculation.
252 return do_accept(listenfd);
253 }
254 // Speculate that there is another connection on this listening socket, to avoid
255 // a task-quota delay. Usually this will fail, but accept is a rare-enough operation
256 // that it is worth the false positive in order to withstand a connection storm
257 // without having to accept at a rate of 1 per task quota.
258 listenfd.speculate_epoll(EPOLLIN);
259 pollable_fd pfd(std::move(*maybe_fd), pollable_fd::speculation(EPOLLOUT));
260 return make_ready_future<std::tuple<pollable_fd, socket_address>>(std::make_tuple(std::move(pfd), std::move(sa)));
261 });
262 }
263
264 future<> reactor::do_connect(pollable_fd_state& pfd, socket_address& sa) {
265 pfd.fd.connect(sa.u.sa, sa.length());
266 return pfd.writeable().then([&pfd]() mutable {
267 auto err = pfd.fd.getsockopt<int>(SOL_SOCKET, SO_ERROR);
268 if (err != 0) {
269 throw std::system_error(err, std::system_category());
270 }
271 return make_ready_future<>();
272 });
273 }
274
275 future<size_t>
276 reactor::do_read(pollable_fd_state& fd, void* buffer, size_t len) {
277 return readable(fd).then([this, &fd, buffer, len] () mutable {
278 auto r = fd.fd.read(buffer, len);
279 if (!r) {
280 return do_read(fd, buffer, len);
281 }
282 if (size_t(*r) == len) {
283 fd.speculate_epoll(EPOLLIN);
284 }
285 return make_ready_future<size_t>(*r);
286 });
287 }
288
289 future<temporary_buffer<char>>
290 reactor::do_read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) {
291 return fd.readable().then([this, &fd, ba] {
292 auto buffer = ba->allocate_buffer();
293 auto r = fd.fd.read(buffer.get_write(), buffer.size());
294 if (!r) {
295 // Speculation failure, try again with real polling this time
296 // Note we release the buffer and will reallocate it when poll
297 // completes.
298 return do_read_some(fd, ba);
299 }
300 if (size_t(*r) == buffer.size()) {
301 fd.speculate_epoll(EPOLLIN);
302 }
303 buffer.trim(*r);
304 return make_ready_future<temporary_buffer<char>>(std::move(buffer));
305 });
306 }
307
308 future<size_t>
309 reactor::do_recvmsg(pollable_fd_state& fd, const std::vector<iovec>& iov) {
310 return readable(fd).then([this, &fd, iov = iov] () mutable {
311 ::msghdr mh = {};
312 mh.msg_iov = &iov[0];
313 mh.msg_iovlen = iov.size();
314 auto r = fd.fd.recvmsg(&mh, 0);
315 if (!r) {
316 return do_recvmsg(fd, iov);
317 }
318 if (size_t(*r) == internal::iovec_len(iov)) {
319 fd.speculate_epoll(EPOLLIN);
320 }
321 return make_ready_future<size_t>(*r);
322 });
323 }
324
325 future<size_t>
326 reactor::do_send(pollable_fd_state& fd, const void* buffer, size_t len) {
327 return writeable(fd).then([this, &fd, buffer, len] () mutable {
328 auto r = fd.fd.send(buffer, len, MSG_NOSIGNAL);
329 if (!r) {
330 return do_send(fd, buffer, len);
331 }
332 if (size_t(*r) == len) {
333 fd.speculate_epoll(EPOLLOUT);
334 }
335 return make_ready_future<size_t>(*r);
336 });
337 }
338
339 future<size_t>
340 reactor::do_sendmsg(pollable_fd_state& fd, net::packet& p) {
341 return writeable(fd).then([this, &fd, &p] () mutable {
342 static_assert(offsetof(iovec, iov_base) == offsetof(net::fragment, base) &&
343 sizeof(iovec::iov_base) == sizeof(net::fragment::base) &&
344 offsetof(iovec, iov_len) == offsetof(net::fragment, size) &&
345 sizeof(iovec::iov_len) == sizeof(net::fragment::size) &&
346 alignof(iovec) == alignof(net::fragment) &&
347 sizeof(iovec) == sizeof(net::fragment)
348 , "net::fragment and iovec should be equivalent");
349
350 iovec* iov = reinterpret_cast<iovec*>(p.fragment_array());
351 msghdr mh = {};
352 mh.msg_iov = iov;
353 mh.msg_iovlen = std::min<size_t>(p.nr_frags(), IOV_MAX);
354 auto r = fd.fd.sendmsg(&mh, MSG_NOSIGNAL);
355 if (!r) {
356 return do_sendmsg(fd, p);
357 }
358 if (size_t(*r) == p.len()) {
359 fd.speculate_epoll(EPOLLOUT);
360 }
361 return make_ready_future<size_t>(*r);
362 });
363 }
364
365 future<>
366 reactor::send_all_part(pollable_fd_state& fd, const void* buffer, size_t len, size_t completed) {
367 if (completed == len) {
368 return make_ready_future<>();
369 } else {
370 return _backend->send(fd, static_cast<const char*>(buffer) + completed, len - completed).then(
371 [&fd, buffer, len, completed, this] (size_t part) mutable {
372 return send_all_part(fd, buffer, len, completed + part);
373 });
374 }
375 }
376
377
378 future<temporary_buffer<char>>
379 reactor::do_recv_some(pollable_fd_state& fd, internal::buffer_allocator* ba) {
380 return fd.readable().then([this, &fd, ba] {
381 auto buffer = ba->allocate_buffer();
382 auto r = fd.fd.recv(buffer.get_write(), buffer.size(), MSG_DONTWAIT);
383 if (!r) {
384 return do_recv_some(fd, ba);
385 }
386 if (size_t(*r) == buffer.size()) {
387 fd.speculate_epoll(EPOLLIN);
388 }
389 buffer.trim(*r);
390 return make_ready_future<temporary_buffer<char>>(std::move(buffer));
391 });
392 }
393
394 future<>
395 reactor::send_all(pollable_fd_state& fd, const void* buffer, size_t len) {
396 assert(len);
397 return send_all_part(fd, buffer, len, 0);
398 }
399
400 future<size_t> pollable_fd_state::read_some(char* buffer, size_t size) {
401 return engine()._backend->read(*this, buffer, size);
402 }
403
404 future<size_t> pollable_fd_state::read_some(uint8_t* buffer, size_t size) {
405 return engine()._backend->read(*this, buffer, size);
406 }
407
408 future<size_t> pollable_fd_state::read_some(const std::vector<iovec>& iov) {
409 return engine()._backend->recvmsg(*this, iov);
410 }
411
412 future<temporary_buffer<char>> pollable_fd_state::read_some(internal::buffer_allocator* ba) {
413 return engine()._backend->read_some(*this, ba);
414 }
415
416 future<size_t> pollable_fd_state::write_some(net::packet& p) {
417 return engine()._backend->sendmsg(*this, p);
418 }
419
420 future<> pollable_fd_state::write_all(const char* buffer, size_t size) {
421 return engine().send_all(*this, buffer, size);
422 }
423
424 future<> pollable_fd_state::write_all(const uint8_t* buffer, size_t size) {
425 return engine().send_all(*this, buffer, size);
426 }
427
428 future<> pollable_fd_state::write_all(net::packet& p) {
429 return write_some(p).then([this, &p] (size_t size) {
430 if (p.len() == size) {
431 return make_ready_future<>();
432 }
433 p.trim_front(size);
434 return write_all(p);
435 });
436 }
437
438 future<> pollable_fd_state::readable() {
439 return engine().readable(*this);
440 }
441
442 future<> pollable_fd_state::writeable() {
443 return engine().writeable(*this);
444 }
445
446 future<> pollable_fd_state::poll_rdhup() {
447 return engine().poll_rdhup(*this);
448 }
449
450 future<> pollable_fd_state::readable_or_writeable() {
451 return engine().readable_or_writeable(*this);
452 }
453
454 future<std::tuple<pollable_fd, socket_address>> pollable_fd_state::accept() {
455 return engine()._backend->accept(*this);
456 }
457
458 future<> pollable_fd_state::connect(socket_address& sa) {
459 return engine()._backend->connect(*this, sa);
460 }
461
462 future<temporary_buffer<char>> pollable_fd_state::recv_some(internal::buffer_allocator* ba) {
463 maybe_no_more_recv();
464 return engine()._backend->recv_some(*this, ba);
465 }
466
467 future<size_t> pollable_fd_state::recvmsg(struct msghdr *msg) {
468 maybe_no_more_recv();
469 return engine().readable(*this).then([this, msg] {
470 auto r = fd.recvmsg(msg, 0);
471 if (!r) {
472 return recvmsg(msg);
473 }
474 // We always speculate here to optimize for throughput in a workload
475 // with multiple outstanding requests. This way the caller can consume
476 // all messages without resorting to epoll. However this adds extra
477 // recvmsg() call when we hit the empty queue condition, so it may
478 // hurt request-response workload in which the queue is empty when we
479 // initially enter recvmsg(). If that turns out to be a problem, we can
480 // improve speculation by using recvmmsg().
481 speculate_epoll(EPOLLIN);
482 return make_ready_future<size_t>(*r);
483 });
484 }
485
486 future<size_t> pollable_fd_state::sendmsg(struct msghdr* msg) {
487 maybe_no_more_send();
488 return engine().writeable(*this).then([this, msg] () mutable {
489 auto r = fd.sendmsg(msg, 0);
490 if (!r) {
491 return sendmsg(msg);
492 }
493 // For UDP this will always speculate. We can't know if there's room
494 // or not, but most of the time there should be so the cost of mis-
495 // speculation is amortized.
496 if (size_t(*r) == internal::iovec_len(msg->msg_iov, msg->msg_iovlen)) {
497 speculate_epoll(EPOLLOUT);
498 }
499 return make_ready_future<size_t>(*r);
500 });
501 }
502
503 future<size_t> pollable_fd_state::sendto(socket_address addr, const void* buf, size_t len) {
504 maybe_no_more_send();
505 return engine().writeable(*this).then([this, buf, len, addr] () mutable {
506 auto r = fd.sendto(addr, buf, len, 0);
507 if (!r) {
508 return sendto(std::move(addr), buf, len);
509 }
510 // See the comment about speculation in sendmsg().
511 if (size_t(*r) == len) {
512 speculate_epoll(EPOLLOUT);
513 }
514 return make_ready_future<size_t>(*r);
515 });
516 }
517
518 namespace internal {
519
520 void set_need_preempt_var(const preemption_monitor* np) {
521 get_need_preempt_var() = np;
522 }
523
524 #ifdef SEASTAR_TASK_HISTOGRAM
525
526 class task_histogram {
527 static constexpr unsigned max_countdown = 1'000'000;
528 std::unordered_map<std::type_index, uint64_t> _histogram;
529 unsigned _countdown_to_print = max_countdown;
530 public:
531 void add(const task& t) {
532 ++_histogram[std::type_index(typeid(t))];
533 if (!--_countdown_to_print) {
534 print();
535 _countdown_to_print = max_countdown;
536 _histogram.clear();
537 }
538 }
539 void print() const {
540 seastar::fmt::print("task histogram, {:d} task types {:d} tasks\n", _histogram.size(), max_countdown - _countdown_to_print);
541 for (auto&& type_count : _histogram) {
542 auto&& type = type_count.first;
543 auto&& count = type_count.second;
544 seastar::fmt::print(" {:10d} {}\n", count, type.name());
545 }
546 }
547 };
548
549 thread_local task_histogram this_thread_task_histogram;
550
551 #endif
552
553 void task_histogram_add_task(const task& t) {
554 #ifdef SEASTAR_TASK_HISTOGRAM
555 this_thread_task_histogram.add(t);
556 #endif
557 }
558
559 }
560
561 using namespace std::chrono_literals;
562 namespace fs = std::filesystem;
563
564 using namespace net;
565
566 using namespace internal::linux_abi;
567
568 std::atomic<manual_clock::rep> manual_clock::_now;
569
570 constexpr unsigned reactor::max_queues;
571 constexpr unsigned reactor::max_aio_per_queue;
572
573 // Base kernel version where this works; some filesystems were only fixed later, so
574 // this value is mixed in with filesystem-provided values later.
575 bool aio_nowait_supported = internal::kernel_uname().whitelisted({"4.13"});
576
577 static bool sched_debug() {
578 return false;
579 }
580
581 template <typename... Args>
582 void
583 sched_print(const char* fmt, Args&&... args) {
584 if (sched_debug()) {
585 sched_logger.trace(fmt, std::forward<Args>(args)...);
586 }
587 }
588
589 static std::atomic<bool> abort_on_ebadf = { false };
590
591 void set_abort_on_ebadf(bool do_abort) {
592 abort_on_ebadf.store(do_abort);
593 }
594
595 bool is_abort_on_ebadf_enabled() {
596 return abort_on_ebadf.load();
597 }
598
599 timespec to_timespec(steady_clock_type::time_point t) {
600 using ns = std::chrono::nanoseconds;
601 auto n = std::chrono::duration_cast<ns>(t.time_since_epoch()).count();
602 return { n / 1'000'000'000, n % 1'000'000'000 };
603 }
604
605 void lowres_clock::update() noexcept {
606 lowres_clock::_now = lowres_clock::time_point(std::chrono::steady_clock::now().time_since_epoch());
607 lowres_system_clock::_now = lowres_system_clock::time_point(std::chrono::system_clock::now().time_since_epoch());
608 }
609
610 template <typename Clock>
611 inline
612 timer<Clock>::~timer() {
613 if (_queued) {
614 engine().del_timer(this);
615 }
616 }
617
618 template <typename Clock>
619 inline
620 void timer<Clock>::arm(time_point until, std::optional<duration> period) noexcept {
621 arm_state(until, period);
622 engine().add_timer(this);
623 }
624
625 template <typename Clock>
626 inline
627 void timer<Clock>::readd_periodic() noexcept {
628 arm_state(Clock::now() + _period.value(), {_period.value()});
629 engine().queue_timer(this);
630 }
631
632 template <typename Clock>
633 inline
634 bool timer<Clock>::cancel() noexcept {
635 if (!_armed) {
636 return false;
637 }
638 _armed = false;
639 if (_queued) {
640 engine().del_timer(this);
641 _queued = false;
642 }
643 return true;
644 }
645
646 template class timer<steady_clock_type>;
647 template class timer<lowres_clock>;
648 template class timer<manual_clock>;
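// Illustrative usage sketch (assumes the timer API declared in the timer header,
// e.g. set_callback(), which is not shown in this file):
//
//   seastar::timer<seastar::lowres_clock> t;
//   t.set_callback([] { seastar_logger.info("tick"); });
//   t.arm(seastar::lowres_clock::now() + std::chrono::seconds(1)); // one-shot; goes through engine().add_timer()
//   // A periodic timer is re-queued via readd_periodic() after each expiry.
//   t.cancel(); // removes the timer from the engine if it is still queued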
649
650 reactor::signals::signals() : _pending_signals(0) {
651 }
652
653 reactor::signals::~signals() {
654 sigset_t mask;
655 sigfillset(&mask);
656 ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
657 }
658
659 reactor::signals::signal_handler::signal_handler(int signo, noncopyable_function<void ()>&& handler)
660 : _handler(std::move(handler)) {
661 }
662
663 void
664 reactor::signals::handle_signal(int signo, noncopyable_function<void ()>&& handler) {
665 _signal_handlers.emplace(std::piecewise_construct,
666 std::make_tuple(signo), std::make_tuple(signo, std::move(handler)));
667
668 struct sigaction sa;
669 sa.sa_sigaction = [](int sig, siginfo_t *info, void *p) {
670 engine()._backend->signal_received(sig, info, p);
671 };
672 sa.sa_mask = make_empty_sigset_mask();
673 sa.sa_flags = SA_SIGINFO | SA_RESTART;
674 auto r = ::sigaction(signo, &sa, nullptr);
675 throw_system_error_on(r == -1);
676 auto mask = make_sigset_mask(signo);
677 r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
678 throw_pthread_error(r);
679 }
680
681 void
682 reactor::signals::handle_signal_once(int signo, noncopyable_function<void ()>&& handler) {
683 return handle_signal(signo, [fired = false, handler = std::move(handler)] () mutable {
684 if (!fired) {
685 fired = true;
686 handler();
687 }
688 });
689 }
690
691 bool reactor::signals::poll_signal() {
692 auto signals = _pending_signals.load(std::memory_order_relaxed);
693 if (signals) {
694 _pending_signals.fetch_and(~signals, std::memory_order_relaxed);
695 for (size_t i = 0; i < sizeof(signals)*8; i++) {
696 if (signals & (1ull << i)) {
697 _signal_handlers.at(i)._handler();
698 }
699 }
700 }
701 return signals;
702 }
703
704 bool reactor::signals::pure_poll_signal() const {
705 return _pending_signals.load(std::memory_order_relaxed);
706 }
707
708 void reactor::signals::action(int signo, siginfo_t* siginfo, void* ignore) {
709 engine().start_handling_signal();
710 engine()._signals._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed);
711 }
712
713 void reactor::signals::failed_to_handle(int signo) {
714 char tname[64];
715 pthread_getname_np(pthread_self(), tname, sizeof(tname));
716 auto tid = syscall(SYS_gettid);
717 seastar_logger.error("Failed to handle signal {} on thread {} ({}): engine not ready", signo, tid, tname);
718 }
719
720 void reactor::handle_signal(int signo, noncopyable_function<void ()>&& handler) {
721 _signals.handle_signal(signo, std::move(handler));
722 }
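// Illustrative usage sketch (engine() is the usual accessor for the running reactor;
// reactor::exit() is assumed here and not defined in this section):
//
//   seastar::engine().handle_signal(SIGTERM, [] {
//       seastar_logger.info("SIGTERM received, shutting down");
//       seastar::engine().exit(0);
//   });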
723
724 // Accumulates an in-memory backtrace and flushes it to stderr eventually.
725 // Async-signal safe.
726 class backtrace_buffer {
727 static constexpr unsigned _max_size = 8 << 10;
728 unsigned _pos = 0;
729 char _buf[_max_size];
730 public:
731 void flush() noexcept {
732 print_safe(_buf, _pos);
733 _pos = 0;
734 }
735
736 void reserve(size_t len) noexcept {
737 assert(len < _max_size);
738 if (_pos + len >= _max_size) {
739 flush();
740 }
741 }
742
743 void append(const char* str, size_t len) noexcept {
744 reserve(len);
745 memcpy(_buf + _pos, str, len);
746 _pos += len;
747 }
748
749 void append(const char* str) noexcept { append(str, strlen(str)); }
750
751 template <typename Integral>
752 void append_decimal(Integral n) noexcept {
753 char buf[sizeof(n) * 3];
754 auto len = convert_decimal_safe(buf, sizeof(buf), n);
755 append(buf, len);
756 }
757
758 template <typename Integral>
759 void append_hex(Integral ptr) noexcept {
760 char buf[sizeof(ptr) * 2];
761 auto p = convert_hex_safe(buf, sizeof(buf), ptr);
762 append(p, (buf + sizeof(buf)) - p);
763 }
764
765 void append_backtrace() noexcept {
766 backtrace([this] (frame f) {
767 append(" ");
768 if (!f.so->name.empty()) {
769 append(f.so->name.c_str(), f.so->name.size());
770 append("+");
771 }
772
773 append("0x");
774 append_hex(f.addr);
775 append("\n");
776 });
777 }
778
779 void append_backtrace_oneline() noexcept {
780 backtrace([this] (frame f) noexcept {
781 reserve(3 + sizeof(f.addr) * 2);
782 append(" 0x");
783 append_hex(f.addr);
784 });
785 }
786 };
787
788 static void print_with_backtrace(backtrace_buffer& buf, bool oneline) noexcept {
789 if (local_engine) {
790 buf.append(" on shard ");
791 buf.append_decimal(this_shard_id());
792 }
793
794 if (!oneline) {
795 buf.append(".\nBacktrace:\n");
796 buf.append_backtrace();
797 } else {
798 buf.append(". Backtrace:");
799 buf.append_backtrace_oneline();
800 buf.append("\n");
801 }
802 buf.flush();
803 }
804
805 static void print_with_backtrace(const char* cause, bool oneline = false) noexcept {
806 backtrace_buffer buf;
807 buf.append(cause);
808 print_with_backtrace(buf, oneline);
809 }
810
811 // Installs signal handler stack for current thread.
812 // The stack remains installed as long as the returned object is kept alive.
813 // When it goes out of scope the previous handler is restored.
814 static decltype(auto) install_signal_handler_stack() {
815 size_t size = SIGSTKSZ;
816 auto mem = std::make_unique<char[]>(size);
817 stack_t stack;
818 stack_t prev_stack;
819 stack.ss_sp = mem.get();
820 stack.ss_flags = 0;
821 stack.ss_size = size;
822 auto r = sigaltstack(&stack, &prev_stack);
823 throw_system_error_on(r == -1);
824 return defer([mem = std::move(mem), prev_stack] () mutable noexcept {
825 try {
826 auto r = sigaltstack(&prev_stack, NULL);
827 throw_system_error_on(r == -1);
828 } catch (...) {
829 mem.release(); // We failed to restore previous stack, must leak it.
830 seastar_logger.error("Failed to restore signal stack: {}", std::current_exception());
831 }
832 });
833 }
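// Illustrative only: callers are expected to keep the returned deferred action alive
// for as long as the alternate signal stack should remain installed, e.g.:
//
//   auto sigstack_guard = install_signal_handler_stack();
//   // ... run this thread's event loop ...
//   // the previous sigaltstack is restored when sigstack_guard is destroyed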
834
835 reactor::task_queue::task_queue(unsigned id, sstring name, float shares)
836 : _shares(std::max(shares, 1.0f))
837 , _reciprocal_shares_times_2_power_32((uint64_t(1) << 32) / _shares)
838 , _id(id)
839 , _ts(now())
840 , _name(std::move(name)) {
841 register_stats();
842 }
843
844 void
845 reactor::task_queue::register_stats() {
846 seastar::metrics::metric_groups new_metrics;
847 namespace sm = seastar::metrics;
848 static auto group = sm::label("group");
849 auto group_label = group(_name);
850 new_metrics.add_group("scheduler", {
851 sm::make_counter("runtime_ms", [this] {
852 return std::chrono::duration_cast<std::chrono::milliseconds>(_runtime).count();
853 }, sm::description("Accumulated runtime of this task queue; an increment rate of 1000ms per second indicates full utilization"),
854 {group_label}),
855 sm::make_counter("waittime_ms", [this] {
856 return std::chrono::duration_cast<std::chrono::milliseconds>(_waittime).count();
857 }, sm::description("Accumulated waittime of this task queue; an increment rate of 1000ms per second indicates the queue is waiting for something (e.g. IO)"),
858 {group_label}),
859 sm::make_counter("starvetime_ms", [this] {
860 return std::chrono::duration_cast<std::chrono::milliseconds>(_starvetime).count();
861 }, sm::description("Accumulated starvation time of this task queue; an increment rate of 1000ms per second indicates the scheduler feels really bad"),
862 {group_label}),
863 sm::make_counter("tasks_processed", _tasks_processed,
864 sm::description("Count of tasks executing on this queue; together with runtime_ms, indicates the length of tasks"),
865 {group_label}),
866 sm::make_gauge("queue_length", [this] { return _q.size(); },
867 sm::description("Size of backlog on this queue, in tasks; indicates whether the queue is busy and/or contended"),
868 {group_label}),
869 sm::make_gauge("shares", [this] { return _shares; },
870 sm::description("Shares allocated to this queue"),
871 {group_label}),
872 sm::make_counter("time_spent_on_task_quota_violations_ms", [this] {
873 return _time_spent_on_task_quota_violations / 1ms;
874 }, sm::description("Total amount in milliseconds we were in violation of the task quota"),
875 {group_label}),
876 });
877 _metrics = std::exchange(new_metrics, {});
878 }
879
880 void
881 reactor::task_queue::rename(sstring new_name) {
882 if (_name != new_name) {
883 _name = new_name;
884 register_stats();
885 }
886 }
887
888 #ifdef __clang__
889 __attribute__((no_sanitize("undefined"))) // multiplication below may overflow; we check for that
890 #elif defined(__GNUC__)
891 [[gnu::no_sanitize_undefined]]
892 #endif
893 inline
894 int64_t
895 reactor::task_queue::to_vruntime(sched_clock::duration runtime) const {
896 auto scaled = (runtime.count() * _reciprocal_shares_times_2_power_32) >> 32;
897 // Prevent overflow from returning ridiculous values
898 return std::max<int64_t>(scaled, 0);
899 }
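// Worked example (illustrative): with shares = 200, _reciprocal_shares_times_2_power_32
// is 2^32 / 200 ~= 21'474'836, so a runtime of 1'000'000 ticks maps to
// (1'000'000 * 21'474'836) >> 32 ~= 5'000, i.e. vruntime advances at roughly
// runtime / shares. Queues with more shares therefore accumulate vruntime more
// slowly and are picked more often by the vruntime-ordered scheduler.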
900
901 void
902 reactor::task_queue::set_shares(float shares) noexcept {
903 _shares = std::max(shares, 1.0f);
904 _reciprocal_shares_times_2_power_32 = (uint64_t(1) << 32) / _shares;
905 }
906
907 void
908 reactor::account_runtime(task_queue& tq, sched_clock::duration runtime) {
909 if (runtime > (2 * _task_quota)) {
910 tq._time_spent_on_task_quota_violations += runtime - _task_quota;
911 }
912 tq._vruntime += tq.to_vruntime(runtime);
913 tq._runtime += runtime;
914 }
915
916 void
917 reactor::account_idle(sched_clock::duration runtime) {
918 // anything to do here?
919 }
920
921 struct reactor::task_queue::indirect_compare {
922 bool operator()(const task_queue* tq1, const task_queue* tq2) const {
923 return tq1->_vruntime < tq2->_vruntime;
924 }
925 };
926
927 reactor::reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
928 : _smp(std::move(smp))
929 , _alien(alien)
930 , _cfg(cfg)
931 , _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
932 , _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
933 , _id(id)
934 #ifdef HAVE_OSV
935 , _timer_thread(
936 [&] { timer_thread_func(); }, sched::thread::attr().stack(4096).name("timer_thread").pin(sched::cpu::current()))
937 , _engine_thread(sched::thread::current())
938 #endif
939 , _cpu_started(0)
940 , _cpu_stall_detector(internal::make_cpu_stall_detector())
941 , _reuseport(posix_reuseport_detect())
942 , _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) {
943 /*
944 * The _backend assignment is here, not in the member initializer list, as
945 * the chosen backend constructor may want to handle signals and thus
946 * needs the _signals._signal_handlers map to be initialized.
947 */
948 _backend = rbs.create(*this);
949 *internal::get_scheduling_group_specific_thread_local_data_ptr() = &_scheduling_group_specific_data;
950 _task_queues.push_back(std::make_unique<task_queue>(0, "main", 1000));
951 _task_queues.push_back(std::make_unique<task_queue>(1, "atexit", 1000));
952 _at_destroy_tasks = _task_queues.back().get();
953 set_need_preempt_var(&_preemption_monitor);
954 seastar::thread_impl::init();
955 _backend->start_tick();
956
957 #ifdef HAVE_OSV
958 _timer_thread.start();
959 #else
960 sigset_t mask;
961 sigemptyset(&mask);
962 sigaddset(&mask, internal::cpu_stall_detector::signal_number());
963 auto r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
964 assert(r == 0);
965 #endif
966 memory::set_reclaim_hook([this] (std::function<void ()> reclaim_fn) {
967 add_high_priority_task(make_task(default_scheduling_group(), [fn = std::move(reclaim_fn)] {
968 fn();
969 }));
970 });
971 }
972
973 reactor::~reactor() {
974 sigset_t mask;
975 sigemptyset(&mask);
976 sigaddset(&mask, internal::cpu_stall_detector::signal_number());
977 auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
978 assert(r == 0);
979
980 _backend->stop_tick();
981 auto eraser = [](auto& list) {
982 while (!list.empty()) {
983 auto& timer = *list.begin();
984 timer.cancel();
985 }
986 };
987 eraser(_expired_timers);
988 eraser(_expired_lowres_timers);
989 eraser(_expired_manual_timers);
990 auto& sg_data = _scheduling_group_specific_data;
991 for (auto&& tq : _task_queues) {
992 if (tq) {
993 auto& this_sg = sg_data.per_scheduling_group_data[tq->_id];
994 // The following line will preserve the convention that constructor and destructor functions
995 // for the per sg values are called in the context of the containing scheduling group.
996 *internal::current_scheduling_group_ptr() = scheduling_group(tq->_id);
997 for (size_t key : boost::irange<size_t>(0, sg_data.scheduling_group_key_configs.size())) {
998 void* val = this_sg.specific_vals[key];
999 if (val) {
1000 if (sg_data.scheduling_group_key_configs[key].destructor) {
1001 sg_data.scheduling_group_key_configs[key].destructor(val);
1002 }
1003 free(val);
1004 this_sg.specific_vals[key] = nullptr;
1005 }
1006 }
1007 }
1008 }
1009 }
1010
1011 reactor::sched_stats
1012 reactor::get_sched_stats() const {
1013 sched_stats ret;
1014 ret.tasks_processed = tasks_processed();
1015 return ret;
1016 }
1017
1018 future<> reactor::readable(pollable_fd_state& fd) {
1019 return _backend->readable(fd);
1020 }
1021
1022 future<> reactor::writeable(pollable_fd_state& fd) {
1023 return _backend->writeable(fd);
1024 }
1025
1026 future<> reactor::readable_or_writeable(pollable_fd_state& fd) {
1027 return _backend->readable_or_writeable(fd);
1028 }
1029
1030 future<> reactor::poll_rdhup(pollable_fd_state& fd) {
1031 return _backend->poll_rdhup(fd);
1032 }
1033
1034 void reactor::set_strict_dma(bool value) {
1035 _strict_o_direct = value;
1036 }
1037
1038 void reactor::set_bypass_fsync(bool value) {
1039 _bypass_fsync = value;
1040 }
1041
1042 void
1043 reactor::reset_preemption_monitor() {
1044 return _backend->reset_preemption_monitor();
1045 }
1046
1047 void
1048 reactor::request_preemption() {
1049 return _backend->request_preemption();
1050 }
1051
1052 void reactor::start_handling_signal() {
1053 return _backend->start_handling_signal();
1054 }
1055
1056 namespace internal {
1057
1058 cpu_stall_detector::cpu_stall_detector(cpu_stall_detector_config cfg)
1059 : _shard_id(this_shard_id()) {
1060 // glibc's backtrace() calls dlopen("libgcc_s.so.1") once to resolve unwind related symbols.
1061 // If the first stall detector invocation happens during another dlopen() call, the calling thread
1062 // will deadlock. The dummy call here makes sure that backtrace's initialization happens in
1063 // a safe place.
1064 backtrace([] (frame) {});
1065 update_config(cfg);
1066
1067 namespace sm = seastar::metrics;
1068
1069 _metrics.add_group("stall_detector", {
1070 sm::make_counter("reported", _total_reported, sm::description("Total number of reported stalls, look in the traces for the exact reason"))});
1071
1072 // note: if something is added here that can throw, it should take care to destroy _timer.
1073 }
1074
1075 cpu_stall_detector_posix_timer::cpu_stall_detector_posix_timer(cpu_stall_detector_config cfg) : cpu_stall_detector(cfg) {
1076 struct sigevent sev = {};
1077 sev.sigev_notify = SIGEV_THREAD_ID;
1078 sev.sigev_signo = signal_number();
1079 sev._sigev_un._tid = syscall(SYS_gettid);
1080 int err = timer_create(CLOCK_THREAD_CPUTIME_ID, &sev, &_timer);
1081 if (err) {
1082 throw std::system_error(std::error_code(err, std::system_category()));
1083 }
1084 }
1085
1086 cpu_stall_detector_posix_timer::~cpu_stall_detector_posix_timer() {
1087 timer_delete(_timer);
1088 }
1089
1090 cpu_stall_detector_config
1091 cpu_stall_detector::get_config() const {
1092 return _config;
1093 }
1094
1095 void cpu_stall_detector::update_config(cpu_stall_detector_config cfg) {
1096 _config = cfg;
1097 _threshold = std::chrono::duration_cast<sched_clock::duration>(cfg.threshold);
1098 _slack = std::chrono::duration_cast<sched_clock::duration>(cfg.threshold * cfg.slack);
1099 _stall_detector_reports_per_minute = cfg.stall_detector_reports_per_minute;
1100 _max_reports_per_minute = cfg.stall_detector_reports_per_minute;
1101 _rearm_timer_at = reactor::now();
1102 }
1103
1104 void cpu_stall_detector::maybe_report() {
1105 if (_reported++ < _max_reports_per_minute) {
1106 generate_trace();
1107 }
1108 }
1109 // We use a tick at every timer firing so we can report suppressed backtraces.
1110 // Best case it's a correctly predicted branch. If a backtrace had happened in
1111 // the near past it's an increment and two branches.
1112 //
1113 // We could make this cheaper if we did not report suppressed backtraces.
1114 void cpu_stall_detector::on_signal() {
1115 auto tasks_processed = engine().tasks_processed();
1116 auto last_seen = _last_tasks_processed_seen.load(std::memory_order_relaxed);
1117 if (!last_seen) {
1118 return; // stall detector is not active
1119 } else if (last_seen == tasks_processed) {
1120 // we defer the check for spuriousness to inside this unlikely condition
1121 // since the check itself can be costly, involving a syscall (reactor::now())
1122 if (is_spurious_signal()) {
1123 return;
1124 }
1125 // no task was processed - report unless suppressed
1126 maybe_report();
1127 _report_at <<= 1;
1128 } else {
1129 _last_tasks_processed_seen.store(tasks_processed, std::memory_order_relaxed);
1130 }
1131 arm_timer();
1132 }
1133
1134 void cpu_stall_detector::report_suppressions(sched_clock::time_point now) {
1135 if (now > _minute_mark + 60s) {
1136 if (_reported > _max_reports_per_minute) {
1137 auto suppressed = _reported - _max_reports_per_minute;
1138 backtrace_buffer buf;
1139 // Reuse backtrace buffer infrastructure so we don't have to allocate here
1140 buf.append("Rate-limit: suppressed ");
1141 buf.append_decimal(suppressed);
1142 suppressed == 1 ? buf.append(" backtrace") : buf.append(" backtraces");
1143 buf.append(" on shard ");
1144 buf.append_decimal(_shard_id);
1145 buf.append("\n");
1146 buf.flush();
1147 }
1148 reset_suppression_state(now);
1149 }
1150 }
1151
1152 void
1153 cpu_stall_detector::reset_suppression_state(sched_clock::time_point now) {
1154 _reported = 0;
1155 _minute_mark = now;
1156 }
1157
1158 void cpu_stall_detector_posix_timer::arm_timer() {
1159 auto its = posix::to_relative_itimerspec(_threshold * _report_at + _slack, 0s);
1160 timer_settime(_timer, 0, &its, nullptr);
1161 }
1162
1163 void cpu_stall_detector::start_task_run(sched_clock::time_point now) {
1164 if (now > _rearm_timer_at) {
1165 report_suppressions(now);
1166 _report_at = 1;
1167 _run_started_at = now;
1168 _rearm_timer_at = now + _threshold * _report_at;
1169 arm_timer();
1170 }
1171 _last_tasks_processed_seen.store(engine().tasks_processed(), std::memory_order_relaxed);
1172 std::atomic_signal_fence(std::memory_order_release); // Don't delay this write, so the signal handler can see it
1173 }
1174
1175 void cpu_stall_detector::end_task_run(sched_clock::time_point now) {
1176 std::atomic_signal_fence(std::memory_order_acquire); // Don't hoist this write, so the signal handler can see it
1177 _last_tasks_processed_seen.store(0, std::memory_order_relaxed);
1178 }
1179
1180 void cpu_stall_detector_posix_timer::start_sleep() {
1181 auto its = posix::to_relative_itimerspec(0s, 0s);
1182 timer_settime(_timer, 0, &its, nullptr);
1183 _rearm_timer_at = reactor::now();
1184 }
1185
1186 void cpu_stall_detector::end_sleep() {
1187 }
1188
1189 static long
1190 perf_event_open(struct perf_event_attr* hw_event, pid_t pid, int cpu, int group_fd, unsigned long flags) {
1191 return syscall(__NR_perf_event_open, hw_event, pid, cpu, group_fd, flags);
1192 }
1193
1194 cpu_stall_detector_linux_perf_event::cpu_stall_detector_linux_perf_event(file_desc fd, cpu_stall_detector_config cfg)
1195 : cpu_stall_detector(cfg), _fd(std::move(fd)) {
1196 void* ret = ::mmap(nullptr, 2*getpagesize(), PROT_READ|PROT_WRITE, MAP_SHARED, _fd.get(), 0);
1197 if (ret == MAP_FAILED) {
1198 abort();
1199 }
1200 _mmap = static_cast<struct ::perf_event_mmap_page*>(ret);
1201 _data_area = reinterpret_cast<char*>(_mmap) + getpagesize();
1202 _data_area_mask = getpagesize() - 1;
1203 }
1204
1205 cpu_stall_detector_linux_perf_event::~cpu_stall_detector_linux_perf_event() {
1206 ::munmap(_mmap, 2*getpagesize());
1207 }
1208
1209 void
1210 cpu_stall_detector_linux_perf_event::arm_timer() {
1211 auto period = _threshold * _report_at + _slack;
1212 uint64_t ns = period / 1ns;
1213 _next_signal_time = reactor::now() + period;
1214
1215 // clear out any existing records in the ring buffer, so when we get interrupted next time
1216 // we have only the stack associated with that interrupt, and so we don't overflow.
1217 data_area_reader(*this).skip_all();
1218 if (__builtin_expect(_enabled && _current_period == ns, 1)) {
1219 // Common case - we're re-arming with the same period, the counter
1220 // is already enabled.
1221
1222 // We want to set the next interrupt to ns from now, and somewhat oddly the
1223 // way to do this is PERF_EVENT_IOC_PERIOD, even with the same period as
1224 // already configured, see the code at:
1225 //
1226 // https://elixir.bootlin.com/linux/v5.15.86/source/kernel/events/core.c#L5636
1227 //
1228 // This behavior is intentional (kernel commit bad7192b842c83e580747ca57104dd51fe08c223),
1229 // so we can presumably rely on it.
1230 _fd.ioctl(PERF_EVENT_IOC_PERIOD, ns);
1231
1232 } else {
1233 // Uncommon case - we're moving from disabled to enabled, or changing
1234 // the period. Issue more calls and be careful.
1235 _fd.ioctl(PERF_EVENT_IOC_DISABLE, 0); // avoid false alarms while we modify stuff
1236 _fd.ioctl(PERF_EVENT_IOC_PERIOD, ns);
1237 _fd.ioctl(PERF_EVENT_IOC_RESET, 0);
1238 _fd.ioctl(PERF_EVENT_IOC_ENABLE, 0);
1239 _enabled = true;
1240 _current_period = ns;
1241 }
1242 }
1243
1244 void
1245 cpu_stall_detector_linux_perf_event::start_sleep() {
1246 _fd.ioctl(PERF_EVENT_IOC_DISABLE, 0);
1247 _enabled = false;
1248 }
1249
1250 bool
1251 cpu_stall_detector_linux_perf_event::is_spurious_signal() {
1252 // If the current time is before the expected signal time, it is
1253 // probably a spurious signal. One reason this could occur is that
1254 // PERF_EVENT_IOC_PERIOD does not reset the current overflow point
1255 // on kernels prior to 3.14 (or 3.7 on Arm).
1256 return reactor::now() < _next_signal_time;
1257 }
1258
1259 void
1260 cpu_stall_detector_linux_perf_event::maybe_report_kernel_trace() {
1261 data_area_reader reader(*this);
1262 auto current_record = [&] () -> ::perf_event_header {
1263 return reader.read_struct<perf_event_header>();
1264 };
1265
1266 while (reader.have_data()) {
1267 auto record = current_record();
1268
1269 if (record.type != PERF_RECORD_SAMPLE) {
1270 reader.skip(record.size - sizeof(record));
1271 continue;
1272 }
1273
1274 auto nr = reader.read_u64();
1275 backtrace_buffer buf;
1276 buf.append("kernel callstack:");
1277 for (uint64_t i = 0; i < nr; ++i) {
1278 buf.append(" 0x");
1279 buf.append_hex(uintptr_t(reader.read_u64()));
1280 }
1281 buf.append("\n");
1282 buf.flush();
1283 };
1284 }
1285
1286 std::unique_ptr<cpu_stall_detector_linux_perf_event>
1287 cpu_stall_detector_linux_perf_event::try_make(cpu_stall_detector_config cfg) {
1288 ::perf_event_attr pea = {
1289 .type = PERF_TYPE_SOFTWARE,
1290 .size = sizeof(pea),
1291 .config = PERF_COUNT_SW_TASK_CLOCK, // more likely to work on virtual machines than hardware events
1292 .sample_period = 1'000'000'000, // Needs non-zero value or PERF_EVENT_IOC_PERIOD gets confused
1293 .sample_type = PERF_SAMPLE_CALLCHAIN,
1294 .disabled = 1,
1295 .exclude_callchain_user = 1, // we're using backtrace() to capture the user callchain
1296 .wakeup_events = 1,
1297 };
1298 unsigned long flags = 0;
1299 if (internal::kernel_uname().whitelisted({"3.14"})) {
1300 flags |= PERF_FLAG_FD_CLOEXEC;
1301 }
1302 int fd = perf_event_open(&pea, 0, -1, -1, flags);
1303 if (fd == -1) {
1304 throw std::system_error(errno, std::system_category(), "perf_event_open() failed");
1305 }
1306 auto desc = file_desc::from_fd(fd);
1307 struct f_owner_ex sig_owner = {
1308 .type = F_OWNER_TID,
1309 .pid = static_cast<pid_t>(syscall(SYS_gettid)),
1310 };
1311 auto ret1 = ::fcntl(fd, F_SETOWN_EX, &sig_owner);
1312 if (ret1 == -1) {
1313 abort();
1314 }
1315 auto ret2 = ::fcntl(fd, F_SETSIG, signal_number());
1316 if (ret2 == -1) {
1317 abort();
1318 }
1319 auto fd_flags = ::fcntl(fd, F_GETFL);
1320 if (fd_flags == -1) {
1321 abort();
1322 }
1323 auto ret3 = ::fcntl(fd, F_SETFL, fd_flags | O_ASYNC);
1324 if (ret3 == -1) {
1325 abort();
1326 }
1327 return std::make_unique<cpu_stall_detector_linux_perf_event>(std::move(desc), std::move(cfg));
1328 }
1329
1330
1331 std::unique_ptr<cpu_stall_detector> make_cpu_stall_detector(cpu_stall_detector_config cfg) {
1332 try {
1333 return cpu_stall_detector_linux_perf_event::try_make(cfg);
1334 } catch (...) {
1335 seastar_logger.warn("Creation of perf_event based stall detector failed, falling back to posix timer: {}", std::current_exception());
1336 return std::make_unique<cpu_stall_detector_posix_timer>(cfg);
1337 }
1338 }
1339
1340 void cpu_stall_detector::generate_trace() {
1341 auto delta = reactor::now() - _run_started_at;
1342
1343 _total_reported++;
1344 if (_config.report) {
1345 _config.report();
1346 return;
1347 }
1348
1349 backtrace_buffer buf;
1350 buf.append("Reactor stalled for ");
1351 buf.append_decimal(uint64_t(delta / 1ms));
1352 buf.append(" ms");
1353 print_with_backtrace(buf, _config.oneline);
1354 maybe_report_kernel_trace();
1355 }
1356
1357 } // internal namespace
1358
1359 void
1360 reactor::update_blocked_reactor_notify_ms(std::chrono::milliseconds ms) {
1361 auto cfg = _cpu_stall_detector->get_config();
1362 if (ms != cfg.threshold) {
1363 cfg.threshold = ms;
1364 _cpu_stall_detector->update_config(cfg);
1365 seastar_logger.info("updated: blocked-reactor-notify-ms={}", ms.count());
1366 }
1367 }
1368
1369 std::chrono::milliseconds
1370 reactor::get_blocked_reactor_notify_ms() const {
1371 auto d = _cpu_stall_detector->get_config().threshold;
1372 return std::chrono::duration_cast<std::chrono::milliseconds>(d);
1373 }
1374
1375 void
1376 reactor::set_stall_detector_report_function(std::function<void ()> report) {
1377 auto cfg = _cpu_stall_detector->get_config();
1378 cfg.report = std::move(report);
1379 _cpu_stall_detector->update_config(std::move(cfg));
1380 _cpu_stall_detector->reset_suppression_state(reactor::now());
1381 }
1382
1383 std::function<void ()>
1384 reactor::get_stall_detector_report_function() const {
1385 return _cpu_stall_detector->get_config().report;
1386 }
1387
1388 void
1389 reactor::block_notifier(int) {
1390 engine()._cpu_stall_detector->on_signal();
1391 }
1392
1393 template <typename T, typename E, typename EnableFunc>
1394 void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn())) {
1395 expired_timers = timers.expire(timers.now());
1396 for (auto& t : expired_timers) {
1397 t._expired = true;
1398 }
1399 const auto prev_sg = current_scheduling_group();
1400 while (!expired_timers.empty()) {
1401 auto t = &*expired_timers.begin();
1402 expired_timers.pop_front();
1403 t->_queued = false;
1404 if (t->_armed) {
1405 t->_armed = false;
1406 if (t->_period) {
1407 t->readd_periodic();
1408 }
1409 try {
1410 *internal::current_scheduling_group_ptr() = t->_sg;
1411 t->_callback();
1412 } catch (...) {
1413 seastar_logger.error("Timer callback failed: {}", std::current_exception());
1414 }
1415 }
1416 }
1417 // complete_timers() can be called from the context of run_tasks()
1418 // as well so we need to restore the previous scheduling group (set by run_tasks()).
1419 *internal::current_scheduling_group_ptr() = prev_sg;
1420 enable_fn();
1421 }
1422
1423 #ifdef HAVE_OSV
1424 void reactor::timer_thread_func() {
1425 sched::timer tmr(*sched::thread::current());
1426 WITH_LOCK(_timer_mutex) {
1427 while (!_stopped) {
1428 if (_timer_due != 0) {
1429 set_timer(tmr, _timer_due);
1430 _timer_cond.wait(_timer_mutex, &tmr);
1431 if (tmr.expired()) {
1432 _timer_due = 0;
1433 _engine_thread->unsafe_stop();
1434 _pending_tasks.push_front(make_task(default_scheduling_group(), [this] {
1435 complete_timers(_timers, _expired_timers, [this] {
1436 if (!_timers.empty()) {
1437 enable_timer(_timers.get_next_timeout());
1438 }
1439 });
1440 }));
1441 _engine_thread->wake();
1442 } else {
1443 tmr.cancel();
1444 }
1445 } else {
1446 _timer_cond.wait(_timer_mutex);
1447 }
1448 }
1449 }
1450 }
1451
1452 void reactor::set_timer(sched::timer &tmr, s64 t) {
1453 using namespace osv::clock;
1454 tmr.set(wall::time_point(std::chrono::nanoseconds(t)));
1455 }
1456 #endif
1457
1458 class network_stack_factory {
1459 network_stack_entry::factory_func _func;
1460
1461 public:
1462 network_stack_factory(noncopyable_function<future<std::unique_ptr<network_stack>> (const program_options::option_group&)> func)
1463 : _func(std::move(func)) { }
1464 future<std::unique_ptr<network_stack>> operator()(const program_options::option_group& opts) { return _func(opts); }
1465 };
1466
1467 void reactor::configure(const reactor_options& opts) {
1468 _network_stack_ready = opts.network_stack.get_selected_candidate()(*opts.network_stack.get_selected_candidate_opts());
1469
1470 _handle_sigint = !opts.no_handle_interrupt;
1471 auto task_quota = opts.task_quota_ms.get_value() * 1ms;
1472 _task_quota = std::chrono::duration_cast<sched_clock::duration>(task_quota);
1473
1474 auto blocked_time = opts.blocked_reactor_notify_ms.get_value() * 1ms;
1475 internal::cpu_stall_detector_config csdc;
1476 csdc.threshold = blocked_time;
1477 csdc.stall_detector_reports_per_minute = opts.blocked_reactor_reports_per_minute.get_value();
1478 csdc.oneline = opts.blocked_reactor_report_format_oneline.get_value();
1479 _cpu_stall_detector->update_config(csdc);
1480
1481 _max_task_backlog = opts.max_task_backlog.get_value();
1482 _max_poll_time = opts.idle_poll_time_us.get_value() * 1us;
1483 if (opts.poll_mode) {
1484 _max_poll_time = std::chrono::nanoseconds::max();
1485 }
1486 if (opts.overprovisioned && opts.idle_poll_time_us.defaulted() && !opts.poll_mode) {
1487 _max_poll_time = 0us;
1488 }
1489 set_strict_dma(!opts.relaxed_dma);
1490 if (!opts.poll_aio.get_value() || (opts.poll_aio.defaulted() && opts.overprovisioned)) {
1491 _aio_eventfd = pollable_fd(file_desc::eventfd(0, 0));
1492 }
1493 set_bypass_fsync(opts.unsafe_bypass_fsync.get_value());
1494 _kernel_page_cache = opts.kernel_page_cache.get_value();
1495 _force_io_getevents_syscall = opts.force_aio_syscalls.get_value();
1496 aio_nowait_supported = opts.linux_aio_nowait.get_value();
1497 _have_aio_fsync = opts.aio_fsync.get_value();
1498 }
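// Illustrative only: the options consumed above normally surface as command-line
// flags named after the option fields (dashes instead of underscores); the exact
// spellings below are assumed from those field names rather than verified here.
//
//   ./my_seastar_app --task-quota-ms 0.5 --blocked-reactor-notify-ms 25 \
//       --blocked-reactor-reports-per-minute 5 --overprovisioned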
1499
1500 pollable_fd
1501 reactor::posix_listen(socket_address sa, listen_options opts) {
1502 auto specific_protocol = (int)(opts.proto);
1503 if (sa.is_af_unix()) {
1504 // no type-safe way to create listen_opts with proto=0
1505 specific_protocol = 0;
1506 }
1507 static auto somaxconn = [] {
1508 std::optional<int> result;
1509 std::ifstream ifs("/proc/sys/net/core/somaxconn");
1510 if (ifs) {
1511 result = 0;
1512 ifs >> *result;
1513 }
1514 return result;
1515 }();
1516 if (somaxconn && *somaxconn < opts.listen_backlog) {
1517 fmt::print(
1518 "Warning: /proc/sys/net/core/somaxconn is set to {:d} "
1519 "which is lower than the backlog parameter {:d} used for listen(), "
1520 "please change it with `sysctl -w net.core.somaxconn={:d}`\n",
1521 *somaxconn, opts.listen_backlog, opts.listen_backlog);
1522 }
1523
1524 file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, specific_protocol);
1525 if (opts.reuse_address) {
1526 fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1);
1527 }
1528 if (_reuseport && !sa.is_af_unix())
1529 fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
1530
1531 try {
1532 fd.bind(sa.u.sa, sa.length());
1533 fd.listen(opts.listen_backlog);
1534 } catch (const std::system_error& s) {
1535 throw std::system_error(s.code(), fmt::format("posix_listen failed for address {}", sa));
1536 }
1537
1538 return pollable_fd(std::move(fd));
1539 }
1540
1541 bool
1542 reactor::posix_reuseport_detect() {
1543 return false; // FIXME: reuseport currently leads to heavy load imbalance. Until we fix that, just
1544 // disable it unconditionally.
1545 try {
1546 file_desc fd = file_desc::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
1547 fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
1548 return true;
1549 } catch(std::system_error& e) {
1550 return false;
1551 }
1552 }
1553
1554 void pollable_fd_state::maybe_no_more_recv() {
1555 if (shutdown_mask & posix::rcv_shutdown) {
1556 throw std::system_error(std::error_code(ECONNABORTED, std::system_category()));
1557 }
1558 }
1559
1560 void pollable_fd_state::maybe_no_more_send() {
1561 if (shutdown_mask & posix::snd_shutdown) {
1562 throw std::system_error(std::error_code(ECONNABORTED, std::system_category()));
1563 }
1564 }
1565
1566 void pollable_fd_state::forget() {
1567 engine()._backend->forget(*this);
1568 }
1569
1570 void intrusive_ptr_release(pollable_fd_state* fd) {
1571 if (!--fd->_refs) {
1572 fd->forget();
1573 }
1574 }
1575
1576 pollable_fd::pollable_fd(file_desc fd, pollable_fd::speculation speculate)
1577 : _s(engine()._backend->make_pollable_fd_state(std::move(fd), speculate))
1578 {}
1579
1580 void pollable_fd::shutdown(int how, shutdown_kernel_only kernel_only) {
1581 if (!kernel_only) {
1582 // TCP will respond to shutdown() by returning ECONNABORTED on the next IO,
1583 // but UDP responds by returning EAGAIN. The shutdown_mask tells us to convert
1584 // EAGAIN to ECONNABORTED in that case.
1585 _s->shutdown_mask |= posix::shutdown_mask(how);
1586 }
1587 engine()._backend->shutdown(*_s, how);
1588 }
1589
1590 pollable_fd
1591 reactor::make_pollable_fd(socket_address sa, int proto) {
1592 int maybe_nonblock = _backend->do_blocking_io() ? 0 : SOCK_NONBLOCK;
1593 file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | maybe_nonblock | SOCK_CLOEXEC, proto);
1594 return pollable_fd(std::move(fd));
1595 }
1596
1597 future<>
1598 reactor::posix_connect(pollable_fd pfd, socket_address sa, socket_address local) {
1599 #ifdef IP_BIND_ADDRESS_NO_PORT
1600 if (!sa.is_af_unix()) {
1601 try {
1602 // do not reserve an ephemeral port when using bind() with port number 0.
1603 // connect() will handle it later. The reason is that bind() may fail
1604 // to allocate a port while connect() will succeed, because bind() does not
1605 // know the destination address and has to find a globally unique local port.
1606 pfd.get_file_desc().setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
1607 } catch (std::system_error& err) {
1608 if (err.code() != std::error_code(ENOPROTOOPT, std::system_category())) {
1609 throw;
1610 }
1611 }
1612 }
1613 #endif
1614 if (!local.is_wildcard()) {
1615 // call bind() only if local address is not wildcard
1616 pfd.get_file_desc().bind(local.u.sa, local.length());
1617 }
1618 return pfd.connect(sa).finally([pfd] {});
1619 }
1620
1621 server_socket
1622 reactor::listen(socket_address sa, listen_options opt) {
1623 return server_socket(_network_stack->listen(sa, opt));
1624 }
1625
1626 future<connected_socket>
1627 reactor::connect(socket_address sa) {
1628 return _network_stack->connect(sa);
1629 }
1630
1631 future<connected_socket>
1632 reactor::connect(socket_address sa, socket_address local, transport proto) {
1633 return _network_stack->connect(sa, local, proto);
1634 }
1635
1636 void io_completion::complete_with(ssize_t res) {
1637 if (res >= 0) {
1638 complete(res);
1639 return;
1640 }
1641
1642 ++engine()._io_stats.aio_errors;
1643 try {
1644 throw_kernel_error(res);
1645 } catch (...) {
1646 set_exception(std::current_exception());
1647 }
1648 }
1649
1650 bool
1651 reactor::flush_pending_aio() {
1652 for (auto& ioq : _io_queues) {
1653 ioq.second->poll_io_queue();
1654 }
1655 return false;
1656 }
1657
1658 steady_clock_type::time_point reactor::next_pending_aio() const noexcept {
1659 steady_clock_type::time_point next = steady_clock_type::time_point::max();
1660
1661 for (auto& ioq : _io_queues) {
1662 steady_clock_type::time_point n = ioq.second->next_pending_aio();
1663 if (n < next) {
1664 next = std::move(n);
1665 }
1666 }
1667
1668 return next;
1669 }
1670
1671 bool
1672 reactor::reap_kernel_completions() {
1673 return _backend->reap_kernel_completions();
1674 }
1675
1676 const io_priority_class& default_priority_class() {
1677 static thread_local auto shard_default_class = [] {
1678 return io_priority_class::register_one("default", 1);
1679 }();
1680 return shard_default_class;
1681 }
1682
1683 namespace internal {
1684
1685 size_t sanitize_iovecs(std::vector<iovec>& iov, size_t disk_alignment) noexcept {
1686 if (iov.size() > IOV_MAX) {
1687 iov.resize(IOV_MAX);
1688 }
1689 auto length = iovec_len(iov);
1690 while (auto rest = length & (disk_alignment - 1)) {
1691 if (iov.back().iov_len <= rest) {
1692 length -= iov.back().iov_len;
1693 iov.pop_back();
1694 } else {
1695 iov.back().iov_len -= rest;
1696 length -= rest;
1697 }
1698 }
1699 return length;
1700 }
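// Worked example (hypothetical values): with disk_alignment = 4096 and iovec
// lengths {4096, 1000}, the total is 5096 and rest = 1000; the last iovec's length
// (1000) is <= rest, so it is dropped, leaving an aligned 4096-byte request.
// With lengths {4096, 6000}, the total is 10096 and rest = 1904; the last iovec is
// only trimmed, to 4096 bytes, and the sanitized total becomes 8192.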
1701
1702 }
1703
1704 future<file>
1705 reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_options options) noexcept {
1706 return do_with(static_cast<int>(flags), std::move(options), [this, nameref] (auto& open_flags, file_open_options& options) {
1707 sstring name(nameref);
1708 return _thread_pool->submit<syscall_result<int>>([this, name, &open_flags, &options, strict_o_direct = _strict_o_direct, bypass_fsync = _bypass_fsync] () mutable {
1709 // We want O_DIRECT, except in three cases:
1710 // - tmpfs (which doesn't support it, but works fine anyway)
1711 // - strict_o_direct == false (where we forgive it not being supported)
1712 // - _kernel_page_cache == true (where we disable it for short-lived test processes)
1713 // Because open() with O_DIRECT would fail in those cases, we open the file
1714 // without O_DIRECT, try to upgrade it to O_DIRECT with fcntl(), and if that
1715 // fails, see if we can forgive it.
1716 auto is_tmpfs = [] (int fd) {
1717 struct ::statfs buf;
1718 auto r = ::fstatfs(fd, &buf);
1719 if (r == -1) {
1720 return false;
1721 }
1722 return buf.f_type == internal::fs_magic::tmpfs;
1723 };
1724 open_flags |= O_CLOEXEC;
1725 if (bypass_fsync) {
1726 open_flags &= ~O_DSYNC;
1727 }
1728 auto mode = static_cast<mode_t>(options.create_permissions);
1729 int fd = ::open(name.c_str(), open_flags, mode);
1730 if (fd == -1) {
1731 return wrap_syscall<int>(fd);
1732 }
1733 int o_direct_flag = _kernel_page_cache ? 0 : O_DIRECT;
1734 int r = ::fcntl(fd, F_SETFL, open_flags | o_direct_flag);
1735 auto maybe_ret = wrap_syscall<int>(r); // capture errno (should be EINVAL)
1736 if (r == -1 && strict_o_direct && !is_tmpfs(fd)) {
1737 ::close(fd);
1738 return maybe_ret;
1739 }
1740 if (fd != -1 && options.extent_allocation_size_hint && !_kernel_page_cache) {
1741 fsxattr attr = {};
1742 int r = ::ioctl(fd, XFS_IOC_FSGETXATTR, &attr);
1743 // xfs delayed allocation is disabled when extent size hints are present.
1744 // This causes tons of xfs log fsyncs. Given that extent size hints are
1745 // unneeded when delayed allocation is available (which is the case
1746 // when not using O_DIRECT), they are only applied here, in the O_DIRECT path.
1747 //
1748 // Ignore error; may be !xfs, and just a hint anyway
1749 if (r != -1) {
1750 attr.fsx_xflags |= XFS_XFLAG_EXTSIZE;
1751 attr.fsx_extsize = std::min(options.extent_allocation_size_hint,
1752 file_open_options::max_extent_allocation_size_hint);
1753
1754 // Ignore error; may be !xfs, and just a hint anyway
1755 ::ioctl(fd, XFS_IOC_FSSETXATTR, &attr);
1756 }
1757 }
1758 return wrap_syscall<int>(fd);
1759 }).then([&options, name = std::move(name), &open_flags] (syscall_result<int> sr) {
1760 sr.throw_fs_exception_if_error("open failed", name);
1761 return make_file_impl(sr.result, options, open_flags);
1762 }).then([] (shared_ptr<file_impl> impl) {
1763 return make_ready_future<file>(std::move(impl));
1764 });
1765 });
1766 }
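// Usage sketch (hypothetical call site; open_flags values assumed from the public
// seastar API): opening a data file for DMA access with an extent-size hint, which
// the code above forwards to XFS via XFS_IOC_FSSETXATTR when O_DIRECT is in use.
//
//     file_open_options opts;
//     opts.extent_allocation_size_hint = 32 << 20; // 32 MiB, hypothetical value
//     future<file> f = engine().open_file_dma("/var/lib/app/data.bin",
//                                             open_flags::rw | open_flags::create,
//                                             std::move(opts));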
1767
1768 future<>
1769 reactor::remove_file(std::string_view pathname) noexcept {
1770 // Allocating memory for a sstring can throw, hence the futurize_invoke
1771 return futurize_invoke([pathname] {
1772 return engine()._thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname)] {
1773 return wrap_syscall<int>(::remove(pathname.c_str()));
1774 }).then([pathname = sstring(pathname)] (syscall_result<int> sr) {
1775 sr.throw_fs_exception_if_error("remove failed", pathname);
1776 return make_ready_future<>();
1777 });
1778 });
1779 }
1780
1781 future<>
1782 reactor::rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept {
1783 // Allocating memory for a sstring can throw, hence the futurize_invoke
1784 return futurize_invoke([old_pathname, new_pathname] {
1785 return engine()._thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] {
1786 return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
1787 }).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
1788 sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname);
1789 return make_ready_future<>();
1790 });
1791 });
1792 }
1793
1794 future<>
1795 reactor::link_file(std::string_view oldpath, std::string_view newpath) noexcept {
1796 // Allocating memory for a sstring can throw, hence the futurize_invoke
1797 return futurize_invoke([oldpath, newpath] {
1798 return engine()._thread_pool->submit<syscall_result<int>>([oldpath = sstring(oldpath), newpath = sstring(newpath)] {
1799 return wrap_syscall<int>(::link(oldpath.c_str(), newpath.c_str()));
1800 }).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
1801 sr.throw_fs_exception_if_error("link failed", oldpath, newpath);
1802 return make_ready_future<>();
1803 });
1804 });
1805 }
1806
1807 future<>
1808 reactor::chmod(std::string_view name, file_permissions permissions) noexcept {
1809 auto mode = static_cast<mode_t>(permissions);
1810 // Allocating memory for a sstring can throw, hence the futurize_invoke
1811 return futurize_invoke([name, mode, this] {
1812 return _thread_pool->submit<syscall_result<int>>([name = sstring(name), mode] {
1813 return wrap_syscall<int>(::chmod(name.c_str(), mode));
1814 }).then([name = sstring(name), mode] (syscall_result<int> sr) {
1815 if (sr.result == -1) {
1816 auto reason = format("chmod(0{:o}) failed", mode);
1817 sr.throw_fs_exception(reason, fs::path(name));
1818 }
1819 return make_ready_future<>();
1820 });
1821 });
1822 }
1823
1824 directory_entry_type stat_to_entry_type(__mode_t type) {
1825 if (S_ISDIR(type)) {
1826 return directory_entry_type::directory;
1827 }
1828 if (S_ISBLK(type)) {
1829 return directory_entry_type::block_device;
1830 }
1831 if (S_ISCHR(type)) {
1832 return directory_entry_type::char_device;
1833 }
1834 if (S_ISFIFO(type)) {
1835 return directory_entry_type::fifo;
1836 }
1837 if (S_ISLNK(type)) {
1838 return directory_entry_type::link;
1839 }
1840 if (S_ISSOCK(type)) {
1841 return directory_entry_type::socket;
1842 }
1843 if (S_ISREG(type)) {
1844 return directory_entry_type::regular;
1845 }
1846 return directory_entry_type::unknown;
1847 }
1848
1849 future<std::optional<directory_entry_type>>
1850 reactor::file_type(std::string_view name, follow_symlink follow) noexcept {
1851 // Allocating memory for a sstring can throw, hence the futurize_invoke
1852 return futurize_invoke([name, follow, this] {
1853 return _thread_pool->submit<syscall_result_extra<struct stat>>([name = sstring(name), follow] {
1854 struct stat st;
1855 auto stat_syscall = follow ? stat : lstat;
1856 auto ret = stat_syscall(name.c_str(), &st);
1857 return wrap_syscall(ret, st);
1858 }).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
1859 if (long(sr.result) == -1) {
1860 if (sr.error != ENOENT && sr.error != ENOTDIR) {
1861 sr.throw_fs_exception_if_error("stat failed", name);
1862 }
1863 return make_ready_future<std::optional<directory_entry_type> >
1864 (std::optional<directory_entry_type>() );
1865 }
1866 return make_ready_future<std::optional<directory_entry_type> >
1867 (std::optional<directory_entry_type>(stat_to_entry_type(sr.extra.st_mode)) );
1868 });
1869 });
1870 }
1871
1872 future<std::optional<directory_entry_type>>
1873 file_type(std::string_view name, follow_symlink follow) noexcept {
1874 return engine().file_type(name, follow);
1875 }
1876
1877 static std::chrono::system_clock::time_point
1878 timespec_to_time_point(const timespec& ts) {
1879 auto d = std::chrono::duration_cast<std::chrono::system_clock::duration>(
1880 ts.tv_sec * 1s + ts.tv_nsec * 1ns);
1881 return std::chrono::system_clock::time_point(d);
1882 }
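// Worked example: a timespec of {tv_sec = 1, tv_nsec = 500000000} becomes the
// system_clock time_point 1.5 seconds after the epoch (1s + 500000000ns).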
1883
1884 future<struct stat>
1885 reactor::fstat(int fd) noexcept {
1886 return _thread_pool->submit<syscall_result_extra<struct stat>>([fd] {
1887 struct stat st;
1888 auto ret = ::fstat(fd, &st);
1889 return wrap_syscall(ret, st);
1890 }).then([] (syscall_result_extra<struct stat> ret) {
1891 ret.throw_if_error();
1892 return make_ready_future<struct stat>(ret.extra);
1893 });
1894 }
1895
1896 future<int>
1897 reactor::inotify_add_watch(int fd, std::string_view path, uint32_t flags) {
1898 // Allocating memory for a sstring can throw, hence the futurize_invoke
1899 return futurize_invoke([path, fd, flags, this] {
1900 return _thread_pool->submit<syscall_result<int>>([fd, path = sstring(path), flags] {
1901 auto ret = ::inotify_add_watch(fd, path.c_str(), flags);
1902 return wrap_syscall(ret);
1903 }).then([] (syscall_result<int> ret) {
1904 ret.throw_if_error();
1905 return make_ready_future<int>(ret.result);
1906 });
1907 });
1908 }
1909
1910 future<std::tuple<file_desc, file_desc>>
1911 reactor::make_pipe() {
1912 return do_with(std::array<int, 2>{}, [this] (auto& pipe) {
1913 return _thread_pool->submit<syscall_result<int>>([&pipe] {
1914 return wrap_syscall<int>(::pipe2(pipe.data(), O_NONBLOCK));
1915 }).then([&pipe] (syscall_result<int> ret) {
1916 ret.throw_if_error();
1917 return make_ready_future<std::tuple<file_desc, file_desc>>(file_desc::from_fd(pipe[0]),
1918 file_desc::from_fd(pipe[1]));
1919 });
1920 });
1921 }
1922
1923 future<std::tuple<pid_t, file_desc, file_desc, file_desc>>
1924 reactor::spawn(std::string_view pathname,
1925 std::vector<sstring> argv,
1926 std::vector<sstring> env) {
1927 return when_all_succeed(make_pipe(),
1928 make_pipe(),
1929 make_pipe()).then_unpack([pathname = sstring(pathname),
1930 argv = std::move(argv),
1931 env = std::move(env), this] (std::tuple<file_desc, file_desc> cin_pipe,
1932 std::tuple<file_desc, file_desc> cout_pipe,
1933 std::tuple<file_desc, file_desc> cerr_pipe) mutable {
1934 return do_with(pid_t{},
1935 std::move(cin_pipe),
1936 std::move(cout_pipe),
1937 std::move(cerr_pipe),
1938 std::move(pathname),
1939 posix_spawn_file_actions_t{},
1940 posix_spawnattr_t{},
1941 std::move(argv),
1942 std::move(env),
1943 [this](auto& child_pid, auto& cin_pipe, auto& cout_pipe, auto& cerr_pipe, auto& pathname, auto& actions, auto& attr, auto& argv, auto& env) {
1944 static constexpr int pipefd_read_end = 0;
1945 static constexpr int pipefd_write_end = 1;
1946 // Allocating memory for spawn {file actions,attributes} objects can throw, hence the futurize_invoke
1947 return futurize_invoke([&child_pid, &cin_pipe, &cout_pipe, &cerr_pipe, &pathname, &actions, &attr, &argv, &env, this] {
1948 // the argv and envp parameters passed to posix_spawn() must be arrays of pointers,
1949 // terminated by a null pointer.
1950 std::vector<const char*> argvp;
1951 std::transform(argv.cbegin(), argv.cend(), std::back_inserter(argvp),
1952 [](auto& s) { return s.c_str(); });
1953 argvp.push_back(nullptr);
1954
1955 std::vector<const char*> envp;
1956 std::transform(env.cbegin(), env.cend(), std::back_inserter(envp),
1957 [](auto& s) { return s.c_str(); });
1958 envp.push_back(nullptr);
1959
1960 int r = 0;
1961 r = ::posix_spawn_file_actions_init(&actions);
1962 throw_pthread_error(r);
1963 // the child process does not write to stdin
1964 std::get<pipefd_write_end>(cin_pipe).spawn_actions_add_close(&actions);
1965 // the child process does not read from stdout
1966 std::get<pipefd_read_end>(cout_pipe).spawn_actions_add_close(&actions);
1967 // the child process does not read from stderr
1968 std::get<pipefd_read_end>(cerr_pipe).spawn_actions_add_close(&actions);
1969 // redirect stdin, stdout and stderr to cin_pipe, cout_pipe and cerr_pipe respectively
1970 std::get<pipefd_read_end>(cin_pipe).spawn_actions_add_dup2(&actions, STDIN_FILENO);
1971 std::get<pipefd_write_end>(cout_pipe).spawn_actions_add_dup2(&actions, STDOUT_FILENO);
1972 std::get<pipefd_write_end>(cerr_pipe).spawn_actions_add_dup2(&actions, STDERR_FILENO);
1973 // after dup2() the interesting ends of pipes, close them
1974 std::get<pipefd_read_end>(cin_pipe).spawn_actions_add_close(&actions);
1975 std::get<pipefd_write_end>(cout_pipe).spawn_actions_add_close(&actions);
1976 std::get<pipefd_write_end>(cerr_pipe).spawn_actions_add_close(&actions);
1977 r = ::posix_spawnattr_init(&attr);
1978 throw_pthread_error(r);
1979 // make sure the following signals are not ignored by the child process
1980 sigset_t default_signals;
1981 sigemptyset(&default_signals);
1982 sigaddset(&default_signals, SIGINT);
1983 sigaddset(&default_signals, SIGTERM);
1984 r = ::posix_spawnattr_setsigdefault(&attr, &default_signals);
1985 throw_pthread_error(r);
1986 // make sure no signals are masked in the child process
1987 sigset_t mask_signals;
1988 sigemptyset(&mask_signals);
1989 r = ::posix_spawnattr_setsigmask(&attr, &mask_signals);
1990 throw_pthread_error(r);
1991 r = ::posix_spawnattr_setflags(&attr, POSIX_SPAWN_SETSIGDEF | POSIX_SPAWN_SETSIGMASK);
1992 throw_pthread_error(r);
1993
1994 return _thread_pool->submit<syscall_result<int>>([&child_pid, &pathname, &actions, &attr,
1995 argv = std::move(argvp),
1996 env = std::move(envp)] {
1997 return wrap_syscall<int>(::posix_spawn(&child_pid, pathname.c_str(), &actions, &attr,
1998 const_cast<char* const *>(argv.data()),
1999 const_cast<char* const *>(env.data())));
2000 });
2001 }).finally([&actions, &attr] {
2002 posix_spawn_file_actions_destroy(&actions);
2003 posix_spawnattr_destroy(&attr);
2004 }).then([&child_pid, &cin_pipe, &cout_pipe, &cerr_pipe] (syscall_result<int> ret) {
2005 throw_pthread_error(ret.result);
2006 return make_ready_future<std::tuple<pid_t, file_desc, file_desc, file_desc>>(
2007 child_pid,
2008 std::get<pipefd_write_end>(std::move(cin_pipe)),
2009 std::get<pipefd_read_end>(std::move(cout_pipe)),
2010 std::get<pipefd_read_end>(std::move(cerr_pipe)));
2011 });
2012 });
2013 });
2014 }
2015
2016 static auto next_waitpid_timeout(std::chrono::milliseconds this_timeout) {
2017 static const std::chrono::milliseconds step_timeout(20);
2018 static const std::chrono::milliseconds max_timeout(1000);
2019 if (this_timeout >= max_timeout) {
2020 return max_timeout;
2021 }
2022 return this_timeout + step_timeout;
2023 }
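// Worked example: starting from 0ms, successive sleeps between waitpid(WNOHANG)
// polls are 20ms, 40ms, 60ms, ... growing by step_timeout each round until they
// reach max_timeout, after which a long-lived child is polled once per second.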
2024
2025 #ifndef __NR_pidfd_open
2026
2027 # if defined(__alpha__)
2028 # define __NR_pidfd_open 544
2029 # else
2030 # define __NR_pidfd_open 434
2031 # endif
2032
2033 #endif
2034
2035 future<int> reactor::waitpid(pid_t pid) {
2036 return _thread_pool->submit<syscall_result<int>>([pid] {
2037 return wrap_syscall<int>(syscall(__NR_pidfd_open, pid, O_NONBLOCK));
2038 }).then([pid, this] (syscall_result<int> pidfd) {
2039 if (pidfd.result == -1) {
2040 // pidfd_open() was introduced in Linux 5.3, so pidfd.error could be ENOSYS on
2041 // older kernels. But it could also be another error, like EMFILE or ENFILE.
2042 // Either way, we must still waitpid(), so fall back to polling with a backoff.
2043 return do_with(int{}, std::chrono::milliseconds(0), [pid, this](int& wstatus,
2044 std::chrono::milliseconds& wait_timeout) {
2045 return repeat_until_value([this,
2046 pid,
2047 &wstatus,
2048 &wait_timeout] {
2049 return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
2050 return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
2051 }).then([&wstatus, &wait_timeout] (syscall_result<pid_t> ret) mutable {
2052 if (ret.result == 0) {
2053 wait_timeout = next_waitpid_timeout(wait_timeout);
2054 return ::seastar::sleep(wait_timeout).then([] {
2055 return make_ready_future<std::optional<int>>();
2056 });
2057 } else if (ret.result > 0) {
2058 return make_ready_future<std::optional<int>>(wstatus);
2059 } else {
2060 ret.throw_if_error();
2061 return make_ready_future<std::optional<int>>(-1);
2062 }
2063 });
2064 });
2065 });
2066 } else {
2067 return do_with(pollable_fd(file_desc::from_fd(pidfd.result)), int{}, [pid, this](auto& pidfd, int& wstatus) {
2068 return pidfd.readable().then([pid, &wstatus, this] {
2069 return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
2070 return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
2071 });
2072 }).then([&wstatus] (syscall_result<pid_t> ret) {
2073 ret.throw_if_error();
2074 assert(ret.result > 0);
2075 return make_ready_future<int>(wstatus);
2076 });
2077 });
2078 }
2079 });
2080 }
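// Usage sketch (hypothetical call site; then_unpack() assumed from the public
// seastar future API): spawn a child process and reap its exit status.
//
//     engine().spawn("/bin/true", {"/bin/true"}, {}).then_unpack(
//         [] (pid_t pid, file_desc cin_w, file_desc cout_r, file_desc cerr_r) {
//             return engine().waitpid(pid);
//         }).then([] (int wstatus) {
//             // inspect WIFEXITED(wstatus) / WEXITSTATUS(wstatus)
//         });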
2081
2082 void reactor::kill(pid_t pid, int sig) {
2083 auto ret = wrap_syscall<int>(::kill(pid, sig));
2084 ret.throw_if_error();
2085 }
2086
2087 future<stat_data>
2088 reactor::file_stat(std::string_view pathname, follow_symlink follow) noexcept {
2089 // Allocating memory for a sstring can throw, hence the futurize_invoke
2090 return futurize_invoke([pathname, follow, this] {
2091 return _thread_pool->submit<syscall_result_extra<struct stat>>([pathname = sstring(pathname), follow] {
2092 struct stat st;
2093 auto stat_syscall = follow ? stat : lstat;
2094 auto ret = stat_syscall(pathname.c_str(), &st);
2095 return wrap_syscall(ret, st);
2096 }).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
2097 sr.throw_fs_exception_if_error("stat failed", pathname);
2098 struct stat& st = sr.extra;
2099 stat_data sd;
2100 sd.device_id = st.st_dev;
2101 sd.inode_number = st.st_ino;
2102 sd.mode = st.st_mode;
2103 sd.type = stat_to_entry_type(st.st_mode);
2104 sd.number_of_links = st.st_nlink;
2105 sd.uid = st.st_uid;
2106 sd.gid = st.st_gid;
2107 sd.rdev = st.st_rdev;
2108 sd.size = st.st_size;
2109 sd.block_size = st.st_blksize;
2110 sd.allocated_size = st.st_blocks * 512UL;
2111 sd.time_accessed = timespec_to_time_point(st.st_atim);
2112 sd.time_modified = timespec_to_time_point(st.st_mtim);
2113 sd.time_changed = timespec_to_time_point(st.st_ctim);
2114 return make_ready_future<stat_data>(std::move(sd));
2115 });
2116 });
2117 }
2118
2119 future<uint64_t>
2120 reactor::file_size(std::string_view pathname) noexcept {
2121 return file_stat(pathname, follow_symlink::yes).then([] (stat_data sd) {
2122 return make_ready_future<uint64_t>(sd.size);
2123 });
2124 }
2125
2126 future<bool>
2127 reactor::file_accessible(std::string_view pathname, access_flags flags) noexcept {
2128 // Allocating memory for a sstring can throw, hence the futurize_invoke
2129 return futurize_invoke([pathname, flags, this] {
2130 return _thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname), flags] {
2131 auto aflags = std::underlying_type_t<access_flags>(flags);
2132 auto ret = ::access(pathname.c_str(), aflags);
2133 return wrap_syscall(ret);
2134 }).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
2135 if (sr.result < 0) {
2136 if ((sr.error == ENOENT && flags == access_flags::exists) ||
2137 (sr.error == EACCES && flags != access_flags::exists)) {
2138 return make_ready_future<bool>(false);
2139 }
2140 sr.throw_fs_exception("access failed", fs::path(pathname));
2141 }
2142
2143 return make_ready_future<bool>(true);
2144 });
2145 });
2146 }
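// Usage sketch (hypothetical path): probing only for existence resolves to false
// on ENOENT instead of throwing, per the special-casing above.
//
//     future<bool> exists = engine().file_accessible("/etc/hostname", access_flags::exists);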
2147
2148 future<fs_type>
2149 reactor::file_system_at(std::string_view pathname) noexcept {
2150 // Allocating memory for a sstring can throw, hence the futurize_invoke
2151 return futurize_invoke([pathname, this] {
2152 return _thread_pool->submit<syscall_result_extra<struct statfs>>([pathname = sstring(pathname)] {
2153 struct statfs st;
2154 auto ret = statfs(pathname.c_str(), &st);
2155 return wrap_syscall(ret, st);
2156 }).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
2157 static std::unordered_map<long int, fs_type> type_mapper = {
2158 { internal::fs_magic::xfs, fs_type::xfs },
2159 { internal::fs_magic::ext2, fs_type::ext2 },
2160 { internal::fs_magic::ext3, fs_type::ext3 },
2161 { internal::fs_magic::ext4, fs_type::ext4 },
2162 { internal::fs_magic::btrfs, fs_type::btrfs },
2163 { internal::fs_magic::hfs, fs_type::hfs },
2164 { internal::fs_magic::tmpfs, fs_type::tmpfs },
2165 };
2166 sr.throw_fs_exception_if_error("statfs failed", pathname);
2167
2168 fs_type ret = fs_type::other;
2169 if (type_mapper.count(sr.extra.f_type) != 0) {
2170 ret = type_mapper.at(sr.extra.f_type);
2171 }
2172 return make_ready_future<fs_type>(ret);
2173 });
2174 });
2175 }
2176
2177 future<struct statfs>
2178 reactor::fstatfs(int fd) noexcept {
2179 return _thread_pool->submit<syscall_result_extra<struct statfs>>([fd] {
2180 struct statfs st;
2181 auto ret = ::fstatfs(fd, &st);
2182 return wrap_syscall(ret, st);
2183 }).then([] (syscall_result_extra<struct statfs> sr) {
2184 sr.throw_if_error();
2185 struct statfs st = sr.extra;
2186 return make_ready_future<struct statfs>(std::move(st));
2187 });
2188 }
2189
2190 future<struct statvfs>
2191 reactor::statvfs(std::string_view pathname) noexcept {
2192 // Allocating memory for a sstring can throw, hence the futurize_invoke
2193 return futurize_invoke([pathname, this] {
2194 return _thread_pool->submit<syscall_result_extra<struct statvfs>>([pathname = sstring(pathname)] {
2195 struct statvfs st;
2196 auto ret = ::statvfs(pathname.c_str(), &st);
2197 return wrap_syscall(ret, st);
2198 }).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
2199 sr.throw_fs_exception_if_error("statvfs failed", pathname);
2200 struct statvfs st = sr.extra;
2201 return make_ready_future<struct statvfs>(std::move(st));
2202 });
2203 });
2204 }
2205
2206 future<file>
2207 reactor::open_directory(std::string_view name) noexcept {
2208 // Allocating memory for a sstring can throw, hence the futurize_invoke
2209 return futurize_invoke([name, this] {
2210 auto oflags = O_DIRECTORY | O_CLOEXEC | O_RDONLY;
2211 return _thread_pool->submit<syscall_result<int>>([name = sstring(name), oflags] {
2212 return wrap_syscall<int>(::open(name.c_str(), oflags));
2213 }).then([name = sstring(name), oflags] (syscall_result<int> sr) {
2214 sr.throw_fs_exception_if_error("open failed", name);
2215 return make_file_impl(sr.result, file_open_options(), oflags);
2216 }).then([] (shared_ptr<file_impl> file_impl) {
2217 return make_ready_future<file>(std::move(file_impl));
2218 });
2219 });
2220 }
2221
2222 future<>
2223 reactor::make_directory(std::string_view name, file_permissions permissions) noexcept {
2224 // Allocating memory for a sstring can throw, hence the futurize_invoke
2225 return futurize_invoke([name, permissions, this] {
2226 return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
2227 auto mode = static_cast<mode_t>(permissions);
2228 return wrap_syscall<int>(::mkdir(name.c_str(), mode));
2229 }).then([name = sstring(name)] (syscall_result<int> sr) {
2230 sr.throw_fs_exception_if_error("mkdir failed", name);
2231 });
2232 });
2233 }
2234
2235 future<>
2236 reactor::touch_directory(std::string_view name, file_permissions permissions) noexcept {
2237 // Allocating memory for a sstring can throw, hence the futurize_invoke
2238 return futurize_invoke([name, permissions] {
2239 return engine()._thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
2240 auto mode = static_cast<mode_t>(permissions);
2241 return wrap_syscall<int>(::mkdir(name.c_str(), mode));
2242 }).then([name = sstring(name)] (syscall_result<int> sr) {
2243 if (sr.result == -1 && sr.error != EEXIST) {
2244 sr.throw_fs_exception("mkdir failed", fs::path(name));
2245 }
2246 return make_ready_future<>();
2247 });
2248 });
2249 }
2250
2251 future<>
2252 reactor::fdatasync(int fd) noexcept {
2253 ++_fsyncs;
2254 if (_bypass_fsync) {
2255 return make_ready_future<>();
2256 }
2257 if (_have_aio_fsync) {
2258 // Does not go through the I/O queue, so it has to delete itself on completion
2259 struct fsync_io_desc final : public io_completion {
2260 promise<> _pr;
2261 public:
2262 virtual void complete(size_t res) noexcept override {
2263 _pr.set_value();
2264 delete this;
2265 }
2266
2267 virtual void set_exception(std::exception_ptr eptr) noexcept override {
2268 _pr.set_exception(std::move(eptr));
2269 delete this;
2270 }
2271
2272 future<> get_future() {
2273 return _pr.get_future();
2274 }
2275 };
2276
2277 return futurize_invoke([this, fd] {
2278 auto desc = new fsync_io_desc;
2279 auto fut = desc->get_future();
2280 auto req = internal::io_request::make_fdatasync(fd);
2281 _io_sink.submit(desc, std::move(req));
2282 return fut;
2283 });
2284 }
2285 return _thread_pool->submit<syscall_result<int>>([fd] {
2286 return wrap_syscall<int>(::fdatasync(fd));
2287 }).then([] (syscall_result<int> sr) {
2288 sr.throw_if_error();
2289 return make_ready_future<>();
2290 });
2291 }
2292
2293 // Note: terminate if arm_highres_timer throws
2294 // `when` should always be valid
2295 void reactor::enable_timer(steady_clock_type::time_point when) noexcept
2296 {
2297 #ifndef HAVE_OSV
2298 itimerspec its;
2299 its.it_interval = {};
2300 its.it_value = to_timespec(when);
2301 _backend->arm_highres_timer(its);
2302 #else
2303 using ns = std::chrono::nanoseconds;
2304 WITH_LOCK(_timer_mutex) {
2305 _timer_due = std::chrono::duration_cast<ns>(when.time_since_epoch()).count();
2306 _timer_cond.wake_one();
2307 }
2308 #endif
2309 }
2310
2311 void reactor::add_timer(timer<steady_clock_type>* tmr) noexcept {
2312 if (queue_timer(tmr)) {
2313 enable_timer(_timers.get_next_timeout());
2314 }
2315 }
2316
2317 bool reactor::queue_timer(timer<steady_clock_type>* tmr) noexcept {
2318 return _timers.insert(*tmr);
2319 }
2320
2321 void reactor::del_timer(timer<steady_clock_type>* tmr) noexcept {
2322 if (tmr->_expired) {
2323 _expired_timers.erase(_expired_timers.iterator_to(*tmr));
2324 tmr->_expired = false;
2325 } else {
2326 _timers.remove(*tmr);
2327 }
2328 }
2329
2330 void reactor::add_timer(timer<lowres_clock>* tmr) noexcept {
2331 if (queue_timer(tmr)) {
2332 _lowres_next_timeout = _lowres_timers.get_next_timeout();
2333 }
2334 }
2335
2336 bool reactor::queue_timer(timer<lowres_clock>* tmr) noexcept {
2337 return _lowres_timers.insert(*tmr);
2338 }
2339
2340 void reactor::del_timer(timer<lowres_clock>* tmr) noexcept {
2341 if (tmr->_expired) {
2342 _expired_lowres_timers.erase(_expired_lowres_timers.iterator_to(*tmr));
2343 tmr->_expired = false;
2344 } else {
2345 _lowres_timers.remove(*tmr);
2346 }
2347 }
2348
2349 void reactor::add_timer(timer<manual_clock>* tmr) noexcept {
2350 queue_timer(tmr);
2351 }
2352
2353 bool reactor::queue_timer(timer<manual_clock>* tmr) noexcept {
2354 return _manual_timers.insert(*tmr);
2355 }
2356
2357 void reactor::del_timer(timer<manual_clock>* tmr) noexcept {
2358 if (tmr->_expired) {
2359 _expired_manual_timers.erase(_expired_manual_timers.iterator_to(*tmr));
2360 tmr->_expired = false;
2361 } else {
2362 _manual_timers.remove(*tmr);
2363 }
2364 }
2365
2366 void reactor::at_exit(noncopyable_function<future<> ()> func) {
2367 assert(!_stopping);
2368 _exit_funcs.push_back(std::move(func));
2369 }
2370
2371 future<> reactor::run_exit_tasks() {
2372 _stop_requested.broadcast();
2373 _stopping = true;
2374 stop_aio_eventfd_loop();
2375 return do_for_each(_exit_funcs.rbegin(), _exit_funcs.rend(), [] (auto& func) {
2376 return func();
2377 });
2378 }
2379
2380 void reactor::stop() {
2381 assert(_id == 0);
2382 _smp->cleanup_cpu();
2383 if (!_stopping) {
2384 // Run exit tasks locally and then stop all other engines
2385 // in the background and wait on semaphore for all to complete.
2386 // Finally, set _stopped on cpu 0.
2387 (void)run_exit_tasks().then([this] {
2388 return do_with(semaphore(0), [this] (semaphore& sem) {
2389 // Stop other cpus asynchronously, signal when done.
2390 (void)smp::invoke_on_others(0, [] {
2391 engine()._smp->cleanup_cpu();
2392 return engine().run_exit_tasks().then([] {
2393 engine()._stopped = true;
2394 });
2395 }).then([&sem]() {
2396 sem.signal();
2397 });
2398 return sem.wait().then([this] {
2399 _stopped = true;
2400 });
2401 });
2402 });
2403 }
2404 }
2405
2406 void reactor::exit(int ret) {
2407 // Run stop() asynchronously on cpu 0.
2408 (void)smp::submit_to(0, [this, ret] { _return = ret; stop(); });
2409 }
2410
2411 uint64_t
2412 reactor::pending_task_count() const {
2413 uint64_t ret = 0;
2414 for (auto&& tq : _task_queues) {
2415 ret += tq->_q.size();
2416 }
2417 return ret;
2418 }
2419
2420 uint64_t
2421 reactor::tasks_processed() const {
2422 return _global_tasks_processed;
2423 }
2424
2425 void reactor::register_metrics() {
2426
2427 namespace sm = seastar::metrics;
2428
2429 _metric_groups.add_group("reactor", {
2430 sm::make_gauge("tasks_pending", std::bind(&reactor::pending_task_count, this), sm::description("Number of pending tasks in the queue")),
2431 // total_operations value:DERIVE:0:U
2432 sm::make_counter("tasks_processed", std::bind(&reactor::tasks_processed, this), sm::description("Total tasks processed")),
2433 sm::make_counter("polls", _polls, sm::description("Number of times pollers were executed")),
2434 sm::make_gauge("timers_pending", std::bind(&decltype(_timers)::size, &_timers), sm::description("Number of timers in the timer-pending queue")),
2435 sm::make_gauge("utilization", [this] { return (1-_load) * 100; }, sm::description("CPU utilization")),
2436 sm::make_counter("cpu_busy_ms", [this] () -> int64_t { return total_busy_time() / 1ms; },
2437 sm::description("Total cpu busy time in milliseconds")),
2438 sm::make_counter("cpu_steal_time_ms", [this] () -> int64_t { return total_steal_time() / 1ms; },
2439 sm::description("Total steal time, the time in which some other process was running while Seastar was runnable (not sleeping). "
2440 "Because this is measured in userspace, some time that could legitimately be thought of as steal time is not accounted as such. For example, if we are sleeping and can wake up but the kernel hasn't woken us up yet.")),
2441 // total_operations value:DERIVE:0:U
2442 sm::make_counter("aio_reads", _io_stats.aio_reads, sm::description("Total aio-reads operations")),
2443
2444 sm::make_total_bytes("aio_bytes_read", _io_stats.aio_read_bytes, sm::description("Total aio-reads bytes")),
2445 // total_operations value:DERIVE:0:U
2446 sm::make_counter("aio_writes", _io_stats.aio_writes, sm::description("Total aio-writes operations")),
2447 sm::make_total_bytes("aio_bytes_write", _io_stats.aio_write_bytes, sm::description("Total aio-writes bytes")),
2448 sm::make_counter("aio_outsizes", _io_stats.aio_outsizes, sm::description("Total number of aio operations that exceed IO limit")),
2449 sm::make_counter("aio_errors", _io_stats.aio_errors, sm::description("Total aio errors")),
2450 // total_operations value:DERIVE:0:U
2451 sm::make_counter("fsyncs", _fsyncs, sm::description("Total number of fsync operations")),
2452 // total_operations value:DERIVE:0:U
2453 sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()),
2454 sm::description("Total number of io-threaded-fallbacks operations")),
2455
2456 });
2457
2458 _metric_groups.add_group("memory", {
2459 sm::make_counter("malloc_operations", [] { return memory::stats().mallocs(); },
2460 sm::description("Total number of malloc operations")),
2461 sm::make_counter("free_operations", [] { return memory::stats().frees(); }, sm::description("Total number of free operations")),
2462 sm::make_counter("cross_cpu_free_operations", [] { return memory::stats().cross_cpu_frees(); }, sm::description("Total number of cross cpu free")),
2463 sm::make_gauge("malloc_live_objects", [] { return memory::stats().live_objects(); }, sm::description("Number of live objects")),
2464 sm::make_current_bytes("free_memory", [] { return memory::stats().free_memory(); }, sm::description("Free memory size in bytes")),
2465 sm::make_current_bytes("total_memory", [] { return memory::stats().total_memory(); }, sm::description("Total memory size in bytes")),
2466 sm::make_current_bytes("allocated_memory", [] { return memory::stats().allocated_memory(); }, sm::description("Allocated memory size in bytes")),
2467 sm::make_counter("reclaims_operations", [] { return memory::stats().reclaims(); }, sm::description("Total reclaims operations")),
2468 sm::make_counter("malloc_failed", [] { return memory::stats().failed_allocations(); }, sm::description("Total count of failed memory allocations"))
2469 });
2470
2471 _metric_groups.add_group("reactor", {
2472 sm::make_counter("logging_failures", [] { return logging_failures; }, sm::description("Total number of logging failures")),
2473 // total_operations value:DERIVE:0:U
2474 sm::make_counter("cpp_exceptions", _cxx_exceptions, sm::description("Total number of C++ exceptions")),
2475 sm::make_counter("abandoned_failed_futures", _abandoned_failed_futures, sm::description("Total number of abandoned failed futures, futures destroyed while still containing an exception")),
2476 });
2477
2478 using namespace seastar::metrics;
2479 _metric_groups.add_group("reactor", {
2480 make_counter("fstream_reads", _io_stats.fstream_reads,
2481 description(
2482 "Counts reads from disk file streams. A high rate indicates high disk activity."
2483 " Contrast with other fstream_read* counters to locate bottlenecks.")),
2484 make_counter("fstream_read_bytes", _io_stats.fstream_read_bytes,
2485 description(
2486 "Counts bytes read from disk file streams. A high rate indicates high disk activity."
2487 " Divide by fstream_reads to determine average read size.")),
2488 make_counter("fstream_reads_blocked", _io_stats.fstream_reads_blocked,
2489 description(
2490 "Counts the number of times a disk read could not be satisfied from read-ahead buffers, and had to block."
2491 " Indicates short streams, or incorrect read ahead configuration.")),
2492 make_counter("fstream_read_bytes_blocked", _io_stats.fstream_read_bytes_blocked,
2493 description(
2494 "Counts the number of bytes read from disk that could not be satisfied from read-ahead buffers, and had to block."
2495 " Indicates short streams, or incorrect read ahead configuration.")),
2496 make_counter("fstream_reads_aheads_discarded", _io_stats.fstream_read_aheads_discarded,
2497 description(
2498 "Counts the number of times a buffer that was read ahead of time and was discarded because it was not needed, wasting disk bandwidth."
2499 " Indicates over-eager read ahead configuration.")),
2500 make_counter("fstream_reads_ahead_bytes_discarded", _io_stats.fstream_read_ahead_discarded_bytes,
2501 description(
2502 "Counts the number of buffered bytes that were read ahead of time and were discarded because they were not needed, wasting disk bandwidth."
2503 " Indicates over-eager read ahead configuration.")),
2504 });
2505 }
2506
2507 void reactor::run_tasks(task_queue& tq) {
2508 // Make sure new tasks will inherit our scheduling group
2509 *internal::current_scheduling_group_ptr() = scheduling_group(tq._id);
2510 auto& tasks = tq._q;
2511 while (!tasks.empty()) {
2512 auto tsk = tasks.front();
2513 tasks.pop_front();
2514 STAP_PROBE(seastar, reactor_run_tasks_single_start);
2515 internal::task_histogram_add_task(*tsk);
2516 _current_task = tsk;
2517 tsk->run_and_dispose();
2518 _current_task = nullptr;
2519 STAP_PROBE(seastar, reactor_run_tasks_single_end);
2520 ++tq._tasks_processed;
2521 ++_global_tasks_processed;
2522 // check at end of loop, to allow at least one task to run
2523 if (need_preempt()) {
2524 if (tasks.size() <= _max_task_backlog) {
2525 break;
2526 } else {
2527 // While need_preempt() is set, task execution is inefficient due to
2528 // need_preempt() checks breaking out of loops and .then() calls. See
2529 // #302.
2530 reset_preemption_monitor();
2531 }
2532 }
2533 }
2534 }
2535
2536 namespace {
2537
2538 #ifdef SEASTAR_SHUFFLE_TASK_QUEUE
2539 void shuffle(task*& t, circular_buffer<task*>& q) {
2540 static thread_local std::mt19937 gen = std::mt19937(std::default_random_engine()());
2541 std::uniform_int_distribution<size_t> tasks_dist{0, q.size() - 1};
2542 auto& to_swap = q[tasks_dist(gen)];
2543 std::swap(to_swap, t);
2544 }
2545 #else
2546 void shuffle(task*&, circular_buffer<task*>&) {
2547 }
2548 #endif
2549
2550 }
2551
2552 void reactor::force_poll() {
2553 request_preemption();
2554 }
2555
2556 bool
2557 reactor::flush_tcp_batches() {
2558 bool work = !_flush_batching.empty();
2559 while (!_flush_batching.empty()) {
2560 auto& os = _flush_batching.front();
2561 _flush_batching.pop_front();
2562 os.poll_flush();
2563 }
2564 return work;
2565 }
2566
2567 bool
2568 reactor::do_expire_lowres_timers() noexcept {
2569 auto now = lowres_clock::now();
2570 if (now >= _lowres_next_timeout) {
2571 complete_timers(_lowres_timers, _expired_lowres_timers, [this] () noexcept {
2572 if (!_lowres_timers.empty()) {
2573 _lowres_next_timeout = _lowres_timers.get_next_timeout();
2574 } else {
2575 _lowres_next_timeout = lowres_clock::time_point::max();
2576 }
2577 });
2578 return true;
2579 }
2580 return false;
2581 }
2582
2583 void
2584 reactor::expire_manual_timers() noexcept {
2585 complete_timers(_manual_timers, _expired_manual_timers, [] () noexcept {});
2586 }
2587
2588 void
2589 manual_clock::expire_timers() noexcept {
2590 local_engine->expire_manual_timers();
2591 }
2592
2593 void
2594 manual_clock::advance(manual_clock::duration d) noexcept {
2595 _now.fetch_add(d.count());
2596 if (local_engine) {
2597 schedule_urgent(make_task(default_scheduling_group(), &manual_clock::expire_timers));
2598 // Expire timers on all cores in the background.
2599 (void)smp::invoke_on_all(&manual_clock::expire_timers);
2600 }
2601 }
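// Usage sketch (test-style, hypothetical callback): advancing the manual clock
// fires any due timer<manual_clock>, on this shard via the urgent task scheduled
// above and on the other shards in the background.
//
//     timer<manual_clock> t([] { /* react to expiry */ });
//     t.arm(std::chrono::milliseconds(10));
//     manual_clock::advance(std::chrono::milliseconds(10));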
2602
2603 bool
2604 reactor::do_check_lowres_timers() const noexcept {
2605 return lowres_clock::now() > _lowres_next_timeout;
2606 }
2607
2608 #ifndef HAVE_OSV
2609
2610 class reactor::kernel_submit_work_pollfn final : public simple_pollfn<true> {
2611 reactor& _r;
2612 public:
2613 kernel_submit_work_pollfn(reactor& r) : _r(r) {}
2614 virtual bool poll() override final {
2615 return _r._backend->kernel_submit_work();
2616 }
2617 };
2618
2619 #endif
2620
2621 class reactor::signal_pollfn final : public reactor::pollfn {
2622 reactor& _r;
2623 public:
2624 signal_pollfn(reactor& r) : _r(r) {}
2625 virtual bool poll() final override {
2626 return _r._signals.poll_signal();
2627 }
2628 virtual bool pure_poll() override final {
2629 return _r._signals.pure_poll_signal();
2630 }
2631 virtual bool try_enter_interrupt_mode() override {
2632 // Signals will interrupt our epoll_pwait() call, but we
2633 // block them now so a signal arriving between this point
2634 // and epoll_pwait() is not missed
2635 sigset_t block_all;
2636 sigfillset(&block_all);
2637 ::pthread_sigmask(SIG_SETMASK, &block_all, &_r._active_sigmask);
2638 if (poll()) {
2639 // raced already, and lost
2640 exit_interrupt_mode();
2641 return false;
2642 }
2643 return true;
2644 }
2645 virtual void exit_interrupt_mode() override final {
2646 ::pthread_sigmask(SIG_SETMASK, &_r._active_sigmask, nullptr);
2647 }
2648 };
2649
2650 class reactor::batch_flush_pollfn final : public simple_pollfn<true> {
2651 reactor& _r;
2652 public:
2653 batch_flush_pollfn(reactor& r) : _r(r) {}
2654 virtual bool poll() final override {
2655 return _r.flush_tcp_batches();
2656 }
2657 };
2658
2659 class reactor::reap_kernel_completions_pollfn final : public reactor::pollfn {
2660 reactor& _r;
2661 public:
2662 reap_kernel_completions_pollfn(reactor& r) : _r(r) {}
2663 virtual bool poll() final override {
2664 return _r.reap_kernel_completions();
2665 }
2666 virtual bool pure_poll() override final {
2667 return poll(); // actually performs work, but triggers no user continuations, so okay
2668 }
2669 virtual bool try_enter_interrupt_mode() override {
2670 return _r._backend->kernel_events_can_sleep();
2671 }
2672 virtual void exit_interrupt_mode() override final {
2673 }
2674 };
2675
2676 class reactor::io_queue_submission_pollfn final : public reactor::pollfn {
2677 reactor& _r;
2678 // Wake up the reactor with the highres timer when the io-queue
2679 // decides to delay dispatching until some time point in
2680 // the future.
2681 timer<> _nearest_wakeup { [this] { _armed = false; } };
2682 bool _armed = false;
2683 public:
2684 io_queue_submission_pollfn(reactor& r) : _r(r) {}
2685 virtual bool poll() final override {
2686 return _r.flush_pending_aio();
2687 }
2688 virtual bool pure_poll() override final {
2689 return poll();
2690 }
2691 virtual bool try_enter_interrupt_mode() override {
2692 auto next = _r.next_pending_aio();
2693 auto now = steady_clock_type::now();
2694 if (next <= now) {
2695 return false;
2696 }
2697 _nearest_wakeup.arm(next);
2698 _armed = true;
2699 return true;
2700 }
2701 virtual void exit_interrupt_mode() override final {
2702 if (_armed) {
2703 _nearest_wakeup.cancel();
2704 _armed = false;
2705 }
2706 }
2707 };
2708
2709 // Other cpus can queue items for us to free, and they won't notify
2710 // us about them. But it's okay to ignore those items for now, since
2711 // freeing them doesn't have any side effects.
2712 //
2713 // We'll take care of those items when we wake up for another reason.
2714 class reactor::drain_cross_cpu_freelist_pollfn final : public simple_pollfn<true> {
2715 public:
2716 virtual bool poll() final override {
2717 return memory::drain_cross_cpu_freelist();
2718 }
2719 };
2720
2721 class reactor::lowres_timer_pollfn final : public reactor::pollfn {
2722 reactor& _r;
2723 // A highres timer is implemented as a waking signal, so
2724 // when we have a lowres timer pending during sleep we arm one
2725 // to wake us up.
2726 timer<> _nearest_wakeup { [this] { _armed = false; } };
2727 bool _armed = false;
2728 public:
2729 lowres_timer_pollfn(reactor& r) : _r(r) {}
2730 virtual bool poll() final override {
2731 return _r.do_expire_lowres_timers();
2732 }
2733 virtual bool pure_poll() final override {
2734 return _r.do_check_lowres_timers();
2735 }
2736 virtual bool try_enter_interrupt_mode() override {
2737 // arm our highres timer so a signal will wake us up
2738 auto next = _r._lowres_next_timeout;
2739 if (next == lowres_clock::time_point::max()) {
2740 // no pending timers
2741 return true;
2742 }
2743 auto now = lowres_clock::now();
2744 if (next <= now) {
2745 // whoops, go back
2746 return false;
2747 }
2748 _nearest_wakeup.arm(next - now);
2749 _armed = true;
2750 return true;
2751 }
2752 virtual void exit_interrupt_mode() override final {
2753 if (_armed) {
2754 _nearest_wakeup.cancel();
2755 _armed = false;
2756 }
2757 }
2758 };
2759
2760 class reactor::smp_pollfn final : public reactor::pollfn {
2761 reactor& _r;
2762 public:
2763 smp_pollfn(reactor& r) : _r(r) {}
2764 virtual bool poll() final override {
2765 // Avoid short-circuiting with `||` since there are side effects
2766 // we want to take place (instantiating tasks from the alien queue).
2767 // Cast to int to silence gcc -Wbitwise-instead-of-logical.
2768 return (int(smp::poll_queues()) |
2769 int(_r._alien.poll_queues()));
2770 }
2771 virtual bool pure_poll() final override {
2772 return (smp::pure_poll_queues() ||
2773 _r._alien.pure_poll_queues());
2774 }
2775 virtual bool try_enter_interrupt_mode() override {
2776 // systemwide_memory_barrier() is very slow if run concurrently,
2777 // so don't go to sleep if it is running now.
2778 _r._sleeping.store(true, std::memory_order_relaxed);
2779 bool barrier_done = try_systemwide_memory_barrier();
2780 if (!barrier_done) {
2781 _r._sleeping.store(false, std::memory_order_relaxed);
2782 return false;
2783 }
2784 if (poll()) {
2785 // raced
2786 _r._sleeping.store(false, std::memory_order_relaxed);
2787 return false;
2788 }
2789 return true;
2790 }
2791 virtual void exit_interrupt_mode() override final {
2792 _r._sleeping.store(false, std::memory_order_relaxed);
2793 }
2794 };
2795
2796 class reactor::execution_stage_pollfn final : public reactor::pollfn {
2797 internal::execution_stage_manager& _esm;
2798 public:
2799 execution_stage_pollfn() : _esm(internal::execution_stage_manager::get()) { }
2800
2801 virtual bool poll() override {
2802 return _esm.flush();
2803 }
2804 virtual bool pure_poll() override {
2805 return _esm.poll();
2806 }
2807 virtual bool try_enter_interrupt_mode() override {
2808 // This is a passive poller, so if a previous poll
2809 // returned false (idle), there's no more work to do.
2810 return true;
2811 }
2812 virtual void exit_interrupt_mode() override { }
2813 };
2814
2815 class reactor::syscall_pollfn final : public reactor::pollfn {
2816 reactor& _r;
2817 public:
2818 syscall_pollfn(reactor& r) : _r(r) {}
2819 virtual bool poll() final override {
2820 return _r._thread_pool->complete();
2821 }
2822 virtual bool pure_poll() override final {
2823 return poll(); // actually performs work, but triggers no user continuations, so okay
2824 }
2825 virtual bool try_enter_interrupt_mode() override {
2826 _r._thread_pool->enter_interrupt_mode();
2827 if (poll()) {
2828 // raced
2829 _r._thread_pool->exit_interrupt_mode();
2830 return false;
2831 }
2832 return true;
2833 }
2834 virtual void exit_interrupt_mode() override final {
2835 _r._thread_pool->exit_interrupt_mode();
2836 }
2837 };
2838
2839 void
2840 reactor::wakeup() {
2841 uint64_t one = 1;
2842 ::write(_notify_eventfd.get(), &one, sizeof(one));
2843 }
2844
2845 void reactor::start_aio_eventfd_loop() {
2846 if (!_aio_eventfd) {
2847 return;
2848 }
2849 future<> loop_done = repeat([this] {
2850 return _aio_eventfd->readable().then([this] {
2851 char garbage[8];
2852 ::read(_aio_eventfd->get_fd(), garbage, 8); // totally uninteresting
2853 return _stopping ? stop_iteration::yes : stop_iteration::no;
2854 });
2855 });
2856 // must use make_lw_shared, because at_exit expects a copyable function
2857 at_exit([loop_done = make_lw_shared(std::move(loop_done))] {
2858 return std::move(*loop_done);
2859 });
2860 }
2861
2862 void reactor::stop_aio_eventfd_loop() {
2863 if (!_aio_eventfd) {
2864 return;
2865 }
2866 uint64_t one = 1;
2867 ::write(_aio_eventfd->get_fd(), &one, 8);
2868 }
2869
2870 inline
2871 bool
2872 reactor::have_more_tasks() const {
2873 return _active_task_queues.size() + _activating_task_queues.size();
2874 }
2875
2876 void reactor::insert_active_task_queue(task_queue* tq) {
2877 tq->_active = true;
2878 auto& atq = _active_task_queues;
2879 auto less = task_queue::indirect_compare();
2880 if (atq.empty() || less(atq.back(), tq)) {
2881 // Common case: idle->working
2882 // Common case: CPU intensive task queue going to the back
2883 atq.push_back(tq);
2884 } else {
2885 // Common case: newly activated queue preempting everything else
2886 atq.push_front(tq);
2887 // Less common case: newly activated queue behind something already active
2888 size_t i = 0;
2889 while (i + 1 != atq.size() && !less(atq[i], atq[i+1])) {
2890 std::swap(atq[i], atq[i+1]);
2891 ++i;
2892 }
2893 }
2894 }
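// Worked example (hypothetical vruntimes, assuming indirect_compare orders queues
// by ascending vruntime so the front of _active_task_queues runs first): with
// [A(10), B(20)] active, activating C(30) hits the common push_back case. Activating
// D(5) takes the push_front path and stays at the front. Activating E(15) is pushed
// to the front and then bubbled one slot right, yielding [A(10), E(15), B(20)].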
2895
2896 reactor::task_queue* reactor::pop_active_task_queue(sched_clock::time_point now) {
2897 task_queue* tq = _active_task_queues.front();
2898 _active_task_queues.pop_front();
2899 tq->_starvetime += now - tq->_ts;
2900 return tq;
2901 }
2902
2903 void
2904 reactor::insert_activating_task_queues() {
2905 // Quadratic in the worst case, but since we expect the common cases in insert_active_task_queue() to dominate, this is faster in practice
2906 for (auto&& tq : _activating_task_queues) {
2907 insert_active_task_queue(tq);
2908 }
2909 _activating_task_queues.clear();
2910 }
2911
2912 void reactor::add_task(task* t) noexcept {
2913 auto sg = t->group();
2914 auto* q = _task_queues[sg._id].get();
2915 bool was_empty = q->_q.empty();
2916 q->_q.push_back(std::move(t));
2917 shuffle(q->_q.back(), q->_q);
2918 if (was_empty) {
2919 activate(*q);
2920 }
2921 }
2922
2923 void reactor::add_urgent_task(task* t) noexcept {
2924 memory::scoped_critical_alloc_section _;
2925 auto sg = t->group();
2926 auto* q = _task_queues[sg._id].get();
2927 bool was_empty = q->_q.empty();
2928 q->_q.push_front(std::move(t));
2929 shuffle(q->_q.front(), q->_q);
2930 if (was_empty) {
2931 activate(*q);
2932 }
2933 }
2934
2935 void
2936 reactor::run_some_tasks() {
2937 if (!have_more_tasks()) {
2938 return;
2939 }
2940 sched_print("run_some_tasks: start");
2941 reset_preemption_monitor();
2942 lowres_clock::update();
2943
2944 sched_clock::time_point t_run_completed = now();
2945 STAP_PROBE(seastar, reactor_run_tasks_start);
2946 _cpu_stall_detector->start_task_run(t_run_completed);
2947 do {
2948 auto t_run_started = t_run_completed;
2949 insert_activating_task_queues();
2950 task_queue* tq = pop_active_task_queue(t_run_started);
2951 sched_print("running tq {} {}", (void*)tq, tq->_name);
2952 tq->_current = true;
2953 _last_vruntime = std::max(tq->_vruntime, _last_vruntime);
2954 run_tasks(*tq);
2955 tq->_current = false;
2956 t_run_completed = now();
2957 auto delta = t_run_completed - t_run_started;
2958 account_runtime(*tq, delta);
2959 sched_print("run complete ({} {}); time consumed {} usec; final vruntime {} empty {}",
2960 (void*)tq, tq->_name, delta / 1us, tq->_vruntime, tq->_q.empty());
2961 tq->_ts = t_run_completed;
2962 if (!tq->_q.empty()) {
2963 insert_active_task_queue(tq);
2964 } else {
2965 tq->_active = false;
2966 }
2967 } while (have_more_tasks() && !need_preempt());
2968 _cpu_stall_detector->end_task_run(t_run_completed);
2969 STAP_PROBE(seastar, reactor_run_tasks_end);
2970 *internal::current_scheduling_group_ptr() = default_scheduling_group(); // Prevent inheritance from last group run
2971 sched_print("run_some_tasks: end");
2972 }
2973
2974 void
2975 reactor::activate(task_queue& tq) {
2976 if (tq._active) {
2977 return;
2978 }
2979 sched_print("activating {} {}", (void*)&tq, tq._name);
2980 // If activate() was called, the task queue is likely network-bound or I/O bound, not CPU-bound. As
2981 // such its vruntime will be low, and it will have a large advantage over other task queues. Limit
2982 // the advantage so it doesn't dominate scheduling for a long time, in case it _does_ become CPU
2983 // bound later.
2984 //
2985 // FIXME: different scheduling groups have different sensitivity to jitter, take advantage
2986 if (_last_vruntime > tq._vruntime) {
2987 sched_print("tq {} {} losing vruntime {} due to sleep", (void*)&tq, tq._name, _last_vruntime - tq._vruntime);
2988 }
2989 tq._vruntime = std::max(_last_vruntime, tq._vruntime);
2990 auto now = reactor::now();
2991 tq._waittime += now - tq._ts;
2992 tq._ts = now;
2993 _activating_task_queues.push_back(&tq);
2994 }
2995
2996 void reactor::service_highres_timer() noexcept {
2997 complete_timers(_timers, _expired_timers, [this] () noexcept {
2998 if (!_timers.empty()) {
2999 enable_timer(_timers.get_next_timeout());
3000 }
3001 });
3002 }
3003
3004 int reactor::run() noexcept {
3005 try {
3006 return do_run();
3007 } catch (const std::exception& e) {
3008 seastar_logger.error(e.what());
3009 print_with_backtrace("exception running reactor main loop");
3010 _exit(1);
3011 }
3012 }
3013
3014 int reactor::do_run() {
3015 #ifndef SEASTAR_ASAN_ENABLED
3016 // SIGSTKSZ is too small when using asan. We also don't need to
3017 // handle SIGSEGV ourselves when using asan, so just don't install
3018 // a signal handler stack.
3019 auto signal_stack = install_signal_handler_stack();
3020 #else
3021 (void)install_signal_handler_stack;
3022 #endif
3023
3024 register_metrics();
3025
3026 // The order in which we execute the pollers is very important for performance.
3027 //
3028 // This is because events that are generated in one poller may feed work into others. If
3029 // they were reversed, we'd only be able to do that work in the next task quota.
3030 //
3031 // One example is the relationship between the smp poller and the I/O submission poller:
3032 // If the smp poller runs first, requests from remote I/O queues can be dispatched right away.
3033 //
3034 // We will run the pollers in the following order:
3035 //
3036 // 1. SMP: any remote event arrives before anything else
3037 // 2. reap kernel events completion: storage related completions may free up space in the I/O
3038 // queue.
3039 // 3. I/O queue: must be after reap, to free up events; if new slots are freed, we may submit I/O
3040 // 4. kernel submission: for I/O, will submit what was generated by the previous step.
3041 // 5. reap kernel events completion: some of the submissions from the previous step may return
3042 // immediately. For example, if we are dealing with poll() on a fd that has pending events.
3043 poller smp_poller(std::make_unique<smp_pollfn>(*this));
3044
3045 poller reap_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));
3046 poller io_queue_submission_poller(std::make_unique<io_queue_submission_pollfn>(*this));
3047 poller kernel_submit_work_poller(std::make_unique<kernel_submit_work_pollfn>(*this));
3048 poller final_real_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));
3049
3050 poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this));
3051 poller execution_stage_poller(std::make_unique<execution_stage_pollfn>());
3052
3053 start_aio_eventfd_loop();
3054
3055 if (_id == 0 && _cfg.auto_handle_sigint_sigterm) {
3056 if (_handle_sigint) {
3057 _signals.handle_signal_once(SIGINT, [this] { stop(); });
3058 }
3059 _signals.handle_signal_once(SIGTERM, [this] { stop(); });
3060 }
3061
3062 // Start initialization in the background.
3063 // Communicate when done using _start_promise.
3064 (void)_cpu_started.wait(smp::count).then([this] {
3065 (void)_network_stack->initialize().then([this] {
3066 _start_promise.set_value();
3067 });
3068 });
3069 // Wait for network stack in the background and then signal all cpus.
3070 (void)_network_stack_ready->then([this] (std::unique_ptr<network_stack> stack) {
3071 _network_stack = std::move(stack);
3072 return smp::invoke_on_all([] {
3073 engine()._cpu_started.signal();
3074 });
3075 });
3076
3077 poller syscall_poller(std::make_unique<syscall_pollfn>(*this));
3078
3079 poller drain_cross_cpu_freelist(std::make_unique<drain_cross_cpu_freelist_pollfn>());
3080
3081 poller expire_lowres_timers(std::make_unique<lowres_timer_pollfn>(*this));
3082 poller sig_poller(std::make_unique<signal_pollfn>(*this));
3083
3084 using namespace std::chrono_literals;
3085 timer<lowres_clock> load_timer;
3086 auto last_idle = _total_idle;
3087 auto idle_start = now(), idle_end = idle_start;
3088 load_timer.set_callback([this, &last_idle, &idle_start, &idle_end] () mutable {
3089 _total_idle += idle_end - idle_start;
3090 auto load = double((_total_idle - last_idle).count()) / double(std::chrono::duration_cast<sched_clock::duration>(1s).count());
3091 last_idle = _total_idle;
3092 load = std::min(load, 1.0);
3093 idle_start = idle_end;
3094 _loads.push_front(load);
3095 if (_loads.size() > 5) {
3096 auto drop = _loads.back();
3097 _loads.pop_back();
3098 _load -= (drop/5);
3099 }
3100 _load += (load/5);
3101 });
3102 load_timer.arm_periodic(1s);
3103
3104 itimerspec its = seastar::posix::to_relative_itimerspec(_task_quota, _task_quota);
3105 _task_quota_timer.timerfd_settime(0, its);
3106 auto& task_quote_itimerspec = its;
3107
3108 struct sigaction sa_block_notifier = {};
3109 sa_block_notifier.sa_handler = &reactor::block_notifier;
3110 sa_block_notifier.sa_flags = SA_RESTART;
3111 auto r = sigaction(internal::cpu_stall_detector::signal_number(), &sa_block_notifier, nullptr);
3112 assert(r == 0);
3113
3114 bool idle = false;
3115
3116 std::function<bool()> check_for_work = [this] () {
3117 return poll_once() || have_more_tasks();
3118 };
3119 std::function<bool()> pure_check_for_work = [this] () {
3120 return pure_poll_once() || have_more_tasks();
3121 };
3122 while (true) {
3123 run_some_tasks();
3124 if (_stopped) {
3125 load_timer.cancel();
3126 // Final tasks may include sending the last response to cpu 0, so run them
3127 while (have_more_tasks()) {
3128 run_some_tasks();
3129 }
3130 while (!_at_destroy_tasks->_q.empty()) {
3131 run_tasks(*_at_destroy_tasks);
3132 }
3133 _finished_running_tasks = true;
3134 _smp->arrive_at_event_loop_end();
3135 if (_id == 0) {
3136 _smp->join_all();
3137 }
3138 break;
3139 }
3140
3141 _polls++;
3142
3143 lowres_clock::update(); // Don't delay expiring lowres timers
3144 if (check_for_work()) {
3145 if (idle) {
3146 _total_idle += idle_end - idle_start;
3147 account_idle(idle_end - idle_start);
3148 idle_start = idle_end;
3149 idle = false;
3150 }
3151 } else {
3152 idle_end = now();
3153 if (!idle) {
3154 idle_start = idle_end;
3155 idle = true;
3156 }
3157 bool go_to_sleep = true;
3158 try {
3159                 // We can't run check_for_work() here, because it can run tasks in the
3160                 // context of the idle handler that change its state without the idle
3161                 // handler expecting it. So run pure_check_for_work() instead.
3162 auto handler_result = _idle_cpu_handler(pure_check_for_work);
3163 go_to_sleep = handler_result == idle_cpu_handler_result::no_more_work;
3164 } catch (...) {
3165 report_exception("Exception while running idle cpu handler", std::current_exception());
3166 }
3167 if (go_to_sleep) {
3168 internal::cpu_relax();
3169 if (idle_end - idle_start > _max_poll_time) {
3170 // Turn off the task quota timer to avoid spurious wakeups
3171 struct itimerspec zero_itimerspec = {};
3172 _task_quota_timer.timerfd_settime(0, zero_itimerspec);
3173 auto start_sleep = now();
3174 _cpu_stall_detector->start_sleep();
3175 sleep();
3176 _cpu_stall_detector->end_sleep();
3177 // We may have slept for a while, so freshen idle_end
3178 idle_end = now();
3179 _total_sleep += idle_end - start_sleep;
3180                     _task_quota_timer.timerfd_settime(0, task_quota_itimerspec);
3181 }
3182 } else {
3183                 // We previously ran pure_check_for_work(), which might not actually
3184                 // have performed any work.
3185 check_for_work();
3186 }
3187 }
3188 }
3189     // To prevent ordering issues from arising, destroy the I/O queue explicitly at this point.
3190 // This is needed because the reactor is destroyed from the thread_local destructors. If
3191 // the I/O queue happens to use any other infrastructure that is also kept this way (for
3192 // instance, collectd), we will not have any way to guarantee who is destroyed first.
3193 _io_queues.clear();
3194 return _return;
3195 }
3196
3197 void
3198 reactor::sleep() {
3199 for (auto i = _pollers.begin(); i != _pollers.end(); ++i) {
3200 auto ok = (*i)->try_enter_interrupt_mode();
3201 if (!ok) {
3202 while (i != _pollers.begin()) {
3203 (*--i)->exit_interrupt_mode();
3204 }
3205 return;
3206 }
3207 }
3208
3209 _backend->wait_and_process_events(&_active_sigmask);
3210
3211 for (auto i = _pollers.rbegin(); i != _pollers.rend(); ++i) {
3212 (*i)->exit_interrupt_mode();
3213 }
3214 }
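// Editor's sketch (illustrative only): sleep() above follows an all-or-nothing
// acquire/rollback pattern -- every poller must agree to enter interrupt mode,
// otherwise the pollers already switched are rolled back in reverse order and we
// return without blocking. The same idea in a minimal, generic form, using
// hypothetical names (Item::try_enter()/exit()):
//
//     template <typename Range>
//     bool try_enter_all(Range& items) {
//         for (auto i = items.begin(); i != items.end(); ++i) {
//             if (!(*i)->try_enter()) {
//                 while (i != items.begin()) {
//                     (*--i)->exit();        // undo the ones already entered
//                 }
//                 return false;              // caller should not block
//             }
//         }
//         return true;                       // safe to block now
//     }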
3215
3216 bool
3217 reactor::poll_once() {
3218 bool work = false;
3219 for (auto c : _pollers) {
3220 work |= c->poll();
3221 }
3222
3223 return work;
3224 }
3225
3226 bool
3227 reactor::pure_poll_once() {
3228 for (auto c : _pollers) {
3229 if (c->pure_poll()) {
3230 return true;
3231 }
3232 }
3233 return false;
3234 }
3235
3236 namespace internal {
3237
3238 class poller::registration_task final : public task {
3239 private:
3240 poller* _p;
3241 public:
3242 explicit registration_task(poller* p) : _p(p) {}
3243 virtual void run_and_dispose() noexcept override {
3244 if (_p) {
3245 engine().register_poller(_p->_pollfn.get());
3246 _p->_registration_task = nullptr;
3247 }
3248 delete this;
3249 }
3250 task* waiting_task() noexcept override { return nullptr; }
3251 void cancel() {
3252 _p = nullptr;
3253 }
3254 void moved(poller* p) {
3255 _p = p;
3256 }
3257 };
3258
3259 class poller::deregistration_task final : public task {
3260 private:
3261 std::unique_ptr<pollfn> _p;
3262 public:
3263 explicit deregistration_task(std::unique_ptr<pollfn>&& p) : _p(std::move(p)) {}
3264 virtual void run_and_dispose() noexcept override {
3265 engine().unregister_poller(_p.get());
3266 delete this;
3267 }
3268 task* waiting_task() noexcept override { return nullptr; }
3269 };
3270
3271 }
3272
3273 void reactor::register_poller(pollfn* p) {
3274 _pollers.push_back(p);
3275 }
3276
3277 void reactor::unregister_poller(pollfn* p) {
3278 _pollers.erase(std::find(_pollers.begin(), _pollers.end(), p));
3279 }
3280
3281 void reactor::replace_poller(pollfn* old, pollfn* neww) {
3282 std::replace(_pollers.begin(), _pollers.end(), old, neww);
3283 }
3284
3285 namespace internal {
3286
3287 poller::poller(poller&& x) noexcept
3288 : _pollfn(std::move(x._pollfn)), _registration_task(std::exchange(x._registration_task, nullptr)) {
3289 if (_pollfn && _registration_task) {
3290 _registration_task->moved(this);
3291 }
3292 }
3293
3294 poller&
3295 poller::operator=(poller&& x) noexcept {
3296 if (this != &x) {
3297 this->~poller();
3298 new (this) poller(std::move(x));
3299 }
3300 return *this;
3301 }
3302
3303 void
3304 poller::do_register() noexcept {
3305 // We can't just insert a poller into reactor::_pollers, because we
3306 // may be running inside a poller ourselves, and so in the middle of
3307 // iterating reactor::_pollers itself. So we schedule a task to add
3308 // the poller instead.
3309 auto task = new registration_task(this);
3310 engine().add_task(task);
3311 _registration_task = task;
3312 }
3313
3314 poller::~poller() {
3315 // We can't just remove the poller from reactor::_pollers, because we
3316 // may be running inside a poller ourselves, and so in the middle of
3317 // iterating reactor::_pollers itself. So we schedule a task to remove
3318 // the poller instead.
3319 //
3320 // Since we don't want to call the poller after we exit the destructor,
3321 // we replace it atomically with another one, and schedule a task to
3322 // delete the replacement.
3323 if (_pollfn) {
3324 if (_registration_task) {
3325 // not added yet, so don't do it at all.
3326 _registration_task->cancel();
3327 delete _registration_task;
3328 } else if (!engine()._finished_running_tasks) {
3329 // If _finished_running_tasks, the call to add_task() below will just
3330 // leak it, since no one will call task::run_and_dispose(). Just leave
3331 // the poller there, the reactor will never use it.
3332 auto dummy = make_pollfn([] { return false; });
3333 auto dummy_p = dummy.get();
3334 auto task = new deregistration_task(std::move(dummy));
3335 engine().add_task(task);
3336 engine().replace_poller(_pollfn.get(), dummy_p);
3337 }
3338 }
3339 }
3340
3341 }
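// Editor's sketch (illustrative only): the registration/deregistration tasks above
// solve a general problem -- an element cannot add or remove itself from a container
// that is currently being iterated. The pattern is to queue the mutation and apply it
// once iteration is no longer in progress. A stand-alone illustration of the same
// idea, with hypothetical names:
//
//     struct registry {
//         std::vector<pollfn*> items;
//         std::vector<std::function<void()>> deferred;
//
//         void poll_all() {
//             for (auto* p : items) {
//                 p->poll();                 // may call defer_remove(p)
//             }
//             for (auto& f : deferred) {     // apply mutations only after iterating
//                 f();
//             }
//             deferred.clear();
//         }
//
//         void defer_remove(pollfn* p) {
//             deferred.push_back([this, p] {
//                 items.erase(std::find(items.begin(), items.end(), p));
//             });
//         }
//     };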
3342
3343 syscall_work_queue::syscall_work_queue()
3344 : _pending()
3345 , _completed()
3346 , _start_eventfd(0) {
3347 }
3348
3349 void syscall_work_queue::submit_item(std::unique_ptr<syscall_work_queue::work_item> item) {
3350 (void)_queue_has_room.wait().then_wrapped([this, item = std::move(item)] (future<> f) mutable {
3351 // propagate wait failure via work_item
3352 if (f.failed()) {
3353 item->set_exception(f.get_exception());
3354 return;
3355 }
3356 _pending.push(item.release());
3357 _start_eventfd.signal(1);
3358 });
3359 }
3360
3361 unsigned syscall_work_queue::complete() {
3362 std::array<work_item*, queue_length> tmp_buf;
3363 auto end = tmp_buf.data();
3364 auto nr = _completed.consume_all([&] (work_item* wi) {
3365 *end++ = wi;
3366 });
3367 for (auto p = tmp_buf.data(); p != end; ++p) {
3368 auto wi = *p;
3369 wi->complete();
3370 delete wi;
3371 }
3372 _queue_has_room.signal(nr);
3373 return nr;
3374 }
3375
3376
3377 smp_message_queue::smp_message_queue(reactor* from, reactor* to)
3378 : _pending(to)
3379 , _completed(from)
3380 {
3381 }
3382
3383 smp_message_queue::~smp_message_queue()
3384 {
3385 if (_pending.remote != _completed.remote) {
3386 _tx.a.~aa();
3387 }
3388 }
3389
3390 void smp_message_queue::stop() {
3391 _metrics.clear();
3392 }
3393
3394 void smp_message_queue::move_pending() {
3395 auto begin = _tx.a.pending_fifo.cbegin();
3396 auto end = _tx.a.pending_fifo.cend();
3397 end = _pending.push(begin, end);
3398 if (begin == end) {
3399 return;
3400 }
3401 auto nr = end - begin;
3402 _pending.maybe_wakeup();
3403 _tx.a.pending_fifo.erase(begin, end);
3404 _current_queue_length += nr;
3405 _last_snt_batch = nr;
3406 _sent += nr;
3407 }
3408
3409 bool smp_message_queue::pure_poll_tx() const {
3410 // can't use read_available(), not available on older boost
3411 // empty() is not const, so need const_cast.
3412 return !const_cast<lf_queue&>(_completed).empty();
3413 }
3414
3415 void smp_message_queue::submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<smp_message_queue::work_item> item) {
3416 // matching signal() in process_completions()
3417 auto ssg_id = internal::smp_service_group_id(item->ssg);
3418 auto& sem = get_smp_service_groups_semaphore(ssg_id, t);
3419 // Future indirectly forwarded to `item`.
3420 (void)get_units(sem, 1, timeout).then_wrapped([this, item = std::move(item)] (future<smp_service_group_semaphore_units> units_fut) mutable {
3421 if (units_fut.failed()) {
3422 item->fail_with(units_fut.get_exception());
3423 ++_compl;
3424 ++_last_cmpl_batch;
3425 return;
3426 }
3427 _tx.a.pending_fifo.push_back(item.get());
3428 // no exceptions from this point
3429 item.release();
3430 units_fut.get0().release();
3431 if (_tx.a.pending_fifo.size() >= batch_size) {
3432 move_pending();
3433 }
3434 });
3435 }
3436
3437 void smp_message_queue::respond(work_item* item) {
3438 _completed_fifo.push_back(item);
3439 if (_completed_fifo.size() >= batch_size || engine()._stopped) {
3440 flush_response_batch();
3441 }
3442 }
3443
3444 void smp_message_queue::flush_response_batch() {
3445 if (!_completed_fifo.empty()) {
3446 auto begin = _completed_fifo.cbegin();
3447 auto end = _completed_fifo.cend();
3448 end = _completed.push(begin, end);
3449 if (begin == end) {
3450 return;
3451 }
3452 _completed.maybe_wakeup();
3453 _completed_fifo.erase(begin, end);
3454 }
3455 }
3456
3457 bool smp_message_queue::has_unflushed_responses() const {
3458 return !_completed_fifo.empty();
3459 }
3460
3461 bool smp_message_queue::pure_poll_rx() const {
3462 // can't use read_available(), not available on older boost
3463 // empty() is not const, so need const_cast.
3464 return !const_cast<lf_queue&>(_pending).empty();
3465 }
3466
3467 void
3468 smp_message_queue::lf_queue::maybe_wakeup() {
3469 // Called after lf_queue_base::push().
3470 //
3471 // This is read-after-write, which wants memory_order_seq_cst,
3472 // but we insert that barrier using systemwide_memory_barrier()
3473 // because seq_cst is so expensive.
3474 //
3475 // However, we do need a compiler barrier:
3476 std::atomic_signal_fence(std::memory_order_seq_cst);
3477 if (remote->_sleeping.load(std::memory_order_relaxed)) {
3478 // We are free to clear it, because we're sending a signal now
3479 remote->_sleeping.store(false, std::memory_order_relaxed);
3480 remote->wakeup();
3481 }
3482 }
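// Editor's note (illustrative, not part of the original source): maybe_wakeup() is
// the producer half of a classic "sleeping flag" protocol: the consumer advertises
// that it is about to block and re-checks the queue; the producer checks the flag
// after pushing -- exactly the read-after-write that needs a full barrier (provided
// here by the system-wide barrier plus the compiler fence above). With plain
// std::atomic and seq_cst, the same protocol looks roughly like this (a sketch, not
// the seastar implementation):
//
//     // producer:
//     queue.push(item);
//     if (sleeping.load(std::memory_order_seq_cst)) {     // ordered after the push
//         sleeping.store(false, std::memory_order_relaxed);
//         wakeup_consumer();
//     }
//
//     // consumer:
//     sleeping.store(true, std::memory_order_seq_cst);
//     if (queue.empty()) {
//         block_until_woken();
//     }
//     sleeping.store(false, std::memory_order_relaxed);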
3483
3484 smp_message_queue::lf_queue::~lf_queue() {
3485 consume_all([] (work_item* ptr) {
3486 delete ptr;
3487 });
3488 }
3489
3490
3491 template<size_t PrefetchCnt, typename Func>
3492 size_t smp_message_queue::process_queue(lf_queue& q, Func process) {
3493 // copy batch to local memory in order to minimize
3494 // time in which cross-cpu data is accessed
3495 work_item* items[queue_length + PrefetchCnt];
3496 work_item* wi;
3497 if (!q.pop(wi))
3498 return 0;
3499     // start prefetching the first item before popping the rest, to overlap memory
3500     // access with the potential cache miss the second pop may cause
3501 prefetch<2>(wi);
3502 auto nr = q.pop(items);
3503 std::fill(std::begin(items) + nr, std::begin(items) + nr + PrefetchCnt, nr ? items[nr - 1] : wi);
3504 unsigned i = 0;
3505 do {
3506 prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + PrefetchCnt);
3507 process(wi);
3508 wi = items[i++];
3509 } while(i <= nr);
3510
3511 return nr + 1;
3512 }
3513
3514 size_t smp_message_queue::process_completions(shard_id t) {
3515 auto nr = process_queue<prefetch_cnt*2>(_completed, [t] (work_item* wi) {
3516 wi->complete();
3517 auto ssg_id = internal::smp_service_group_id(wi->ssg);
3518 get_smp_service_groups_semaphore(ssg_id, t).signal();
3519 delete wi;
3520 });
3521 _current_queue_length -= nr;
3522 _compl += nr;
3523 _last_cmpl_batch = nr;
3524
3525 return nr;
3526 }
3527
3528 void smp_message_queue::flush_request_batch() {
3529 if (!_tx.a.pending_fifo.empty()) {
3530 move_pending();
3531 }
3532 }
3533
3534 size_t smp_message_queue::process_incoming() {
3535 auto nr = process_queue<prefetch_cnt>(_pending, [] (work_item* wi) {
3536 wi->process();
3537 });
3538 _received += nr;
3539 _last_rcv_batch = nr;
3540 return nr;
3541 }
3542
3543 void smp_message_queue::start(unsigned cpuid) {
3544 _tx.init();
3545 namespace sm = seastar::metrics;
3546 char instance[10];
3547 std::snprintf(instance, sizeof(instance), "%u-%u", this_shard_id(), cpuid);
3548 _metrics.add_group("smp", {
3549 // queue_length value:GAUGE:0:U
3550 // Absolute value of num packets in last tx batch.
3551 sm::make_queue_length("send_batch_queue_length", _last_snt_batch, sm::description("Current send batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
3552 sm::make_queue_length("receive_batch_queue_length", _last_rcv_batch, sm::description("Current receive batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
3553 sm::make_queue_length("complete_batch_queue_length", _last_cmpl_batch, sm::description("Current complete batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
3554 sm::make_queue_length("send_queue_length", _current_queue_length, sm::description("Current send queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
3555 // total_operations value:DERIVE:0:U
3556 sm::make_counter("total_received_messages", _received, sm::description("Total number of received messages"), {sm::shard_label(instance)})(sm::metric_disabled),
3557 // total_operations value:DERIVE:0:U
3558 sm::make_counter("total_sent_messages", _sent, sm::description("Total number of sent messages"), {sm::shard_label(instance)})(sm::metric_disabled),
3559 // total_operations value:DERIVE:0:U
3560 sm::make_counter("total_completed_messages", _compl, sm::description("Total number of messages completed"), {sm::shard_label(instance)})(sm::metric_disabled)
3561 });
3562 }
3563
3564 readable_eventfd writeable_eventfd::read_side() {
3565 return readable_eventfd(_fd.dup());
3566 }
3567
3568 file_desc writeable_eventfd::try_create_eventfd(size_t initial) {
3569 assert(size_t(int(initial)) == initial);
3570 return file_desc::eventfd(initial, EFD_CLOEXEC);
3571 }
3572
3573 void writeable_eventfd::signal(size_t count) {
3574 uint64_t c = count;
3575 auto r = _fd.write(&c, sizeof(c));
3576 assert(r == sizeof(c));
3577 }
3578
3579 writeable_eventfd readable_eventfd::write_side() {
3580 return writeable_eventfd(_fd.get_file_desc().dup());
3581 }
3582
3583 file_desc readable_eventfd::try_create_eventfd(size_t initial) {
3584 assert(size_t(int(initial)) == initial);
3585 return file_desc::eventfd(initial, EFD_CLOEXEC | EFD_NONBLOCK);
3586 }
3587
3588 future<size_t> readable_eventfd::wait() {
3589 return engine().readable(*_fd._s).then([this] {
3590 uint64_t count;
3591 int r = ::read(_fd.get_fd(), &count, sizeof(count));
3592 assert(r == sizeof(count));
3593 return make_ready_future<size_t>(count);
3594 });
3595 }
3596
3597 void schedule(task* t) noexcept {
3598 engine().add_task(t);
3599 }
3600
3601 void schedule_urgent(task* t) noexcept {
3602 engine().add_urgent_task(t);
3603 }
3604
3605 }
3606
3607 bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) {
3608 return (a.sin_addr.s_addr == b.sin_addr.s_addr) && (a.sin_port == b.sin_port);
3609 }
3610
3611 namespace seastar {
3612
3613 static bool kernel_supports_aio_fsync() {
3614 return internal::kernel_uname().whitelisted({"4.18"});
3615 }
3616
3617 static program_options::selection_value<network_stack_factory> create_network_stacks_option(reactor_options& zis) {
3618 using value_type = program_options::selection_value<network_stack_factory>;
3619 value_type::candidates candidates;
3620 std::vector<std::string> net_stack_names;
3621
3622 auto deleter = [] (network_stack_factory* p) { delete p; };
3623
3624 std::string default_stack;
3625 for (auto reg_func : {register_native_stack, register_posix_stack}) {
3626 auto s = reg_func();
3627 if (s.is_default) {
3628 default_stack = s.name;
3629 }
3630 candidates.push_back({s.name, {new network_stack_factory(std::move(s.factory)), deleter}, std::move(s.opts)});
3631 net_stack_names.emplace_back(s.name);
3632 }
3633
3634 return program_options::selection_value<network_stack_factory>(zis, "network-stack", std::move(candidates), default_stack,
3635 format("select network stack (valid values: {})", format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")));
3636 }
3637
3638 static program_options::selection_value<reactor_backend_selector>::candidates backend_selector_candidates() {
3639 using value_type = program_options::selection_value<reactor_backend_selector>;
3640 value_type::candidates candidates;
3641
3642 auto deleter = [] (reactor_backend_selector* p) { delete p; };
3643
3644 for (auto&& be : reactor_backend_selector::available()) {
3645 auto name = be.name();
3646 candidates.push_back({std::move(name), {new reactor_backend_selector(std::move(be)), deleter}, {}});
3647 }
3648 return candidates;
3649 }
3650
3651 reactor_options::reactor_options(program_options::option_group* parent_group)
3652 : program_options::option_group(parent_group, "Core options")
3653 , network_stack(create_network_stacks_option(*this))
3654 , poll_mode(*this, "poll-mode", "poll continuously (100% cpu use)")
3655 , idle_poll_time_us(*this, "idle-poll-time-us", reactor::calculate_poll_time() / 1us,
3656 "idle polling time in microseconds (reduce for overprovisioned environments or laptops)")
3657 , poll_aio(*this, "poll-aio", true,
3658 "busy-poll for disk I/O (reduces latency and increases throughput)")
3659 , task_quota_ms(*this, "task-quota-ms", 0.5, "Max time (ms) between polls")
3660 , io_latency_goal_ms(*this, "io-latency-goal-ms", {}, "Max time (ms) io operations must take (1.5 * task-quota-ms if not set)")
3661 , max_task_backlog(*this, "max-task-backlog", 1000, "Maximum number of task backlog to allow; above this we ignore I/O")
3662     , blocked_reactor_notify_ms(*this, "blocked-reactor-notify-ms", 25, "threshold in milliseconds over which the reactor is considered blocked if no progress is made")
3663 , blocked_reactor_reports_per_minute(*this, "blocked-reactor-reports-per-minute", 5, "Maximum number of backtraces reported by stall detector per minute")
3664 , blocked_reactor_report_format_oneline(*this, "blocked-reactor-report-format-oneline", true, "Print a simplified backtrace on a single line")
3665 , relaxed_dma(*this, "relaxed-dma", "allow using buffered I/O if DMA is not available (reduces performance)")
3666 , linux_aio_nowait(*this, "linux-aio-nowait", aio_nowait_supported,
3667 "use the Linux NOWAIT AIO feature, which reduces reactor stalls due to aio (autodetected)")
3668 , unsafe_bypass_fsync(*this, "unsafe-bypass-fsync", false, "Bypass fsync(), may result in data loss. Use for testing on consumer drives")
3669 , kernel_page_cache(*this, "kernel-page-cache", false,
3670 "Use the kernel page cache. This disables DMA (O_DIRECT)."
3671 " Useful for short-lived functional tests with a small data set.")
3672 , overprovisioned(*this, "overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0")
3673 , abort_on_seastar_bad_alloc(*this, "abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory")
3674 , force_aio_syscalls(*this, "force-aio-syscalls", false,
3675 "Force io_getevents(2) to issue a system call, instead of bypassing the kernel when possible."
3676 " This makes strace output more useful, but slows down the application")
3677 , dump_memory_diagnostics_on_alloc_failure_kind(*this, "dump-memory-diagnostics-on-alloc-failure-kind", memory::alloc_failure_kind::critical,
3678 "Dump diagnostics of the seastar allocator state on allocation failure."
3679 " Accepted values: none, critical (default), all. When set to critical, only allocations marked as critical will trigger diagnostics dump."
3680 " The diagnostics will be written to the seastar_memory logger, with error level."
3681 " Note that if the seastar_memory logger is set to debug or trace level, the diagnostics will be logged irrespective of this setting.")
3682 , reactor_backend(*this, "reactor-backend", backend_selector_candidates(), reactor_backend_selector::default_backend().name(),
3683 format("Internal reactor implementation ({})", reactor_backend_selector::available()))
3684 , aio_fsync(*this, "aio-fsync", kernel_supports_aio_fsync(),
3685 "Use Linux aio for fsync() calls. This reduces latency; requires Linux 4.18 or later.")
3686 , max_networking_io_control_blocks(*this, "max-networking-io-control-blocks", 10000,
3687 "Maximum number of I/O control blocks (IOCBs) to allocate per shard. This translates to the number of sockets supported per shard."
3688 " Requires tuning /proc/sys/fs/aio-max-nr. Only valid for the linux-aio reactor backend (see --reactor-backend).")
3689 #ifdef SEASTAR_HEAPPROF
3690 , heapprof(*this, "heapprof", "enable seastar heap profiling")
3691 #else
3692 , heapprof(*this, "heapprof", program_options::unused{})
3693 #endif
3694 , no_handle_interrupt(*this, "no-handle-interrupt", "ignore SIGINT (for gdb)")
3695 {
3696 }
3697
3698 smp_options::smp_options(program_options::option_group* parent_group)
3699 : program_options::option_group(parent_group, "SMP options")
3700 , smp(*this, "smp", {}, "number of threads (default: one per CPU)")
3701     , cpuset(*this, "cpuset", {}, "CPUs to use (in cpuset(7) format; default: all)")
3702 , memory(*this, "memory", std::nullopt, "memory to use, in bytes (ex: 4G) (default: all)")
3703 , reserve_memory(*this, "reserve-memory", {}, "memory reserved to OS (if --memory not specified)")
3704 , hugepages(*this, "hugepages", {}, "path to accessible hugetlbfs mount (typically /dev/hugepages/something)")
3705 , lock_memory(*this, "lock-memory", {}, "lock all memory (prevents swapping)")
3706 , thread_affinity(*this, "thread-affinity", true, "pin threads to their cpus (disable for overprovisioning)")
3707 #ifdef SEASTAR_HAVE_HWLOC
3708 , num_io_queues(*this, "num-io-queues", {}, "Number of IO queues. Each IO unit will be responsible for a fraction of the IO requests. Defaults to the number of threads")
3709 , num_io_groups(*this, "num-io-groups", {}, "Number of IO groups. Each IO group will be responsible for a fraction of the IO requests. Defaults to the number of NUMA nodes")
3710 #else
3711 , num_io_queues(*this, "num-io-queues", program_options::unused{})
3712 , num_io_groups(*this, "num-io-groups", program_options::unused{})
3713 #endif
3714 , io_properties_file(*this, "io-properties-file", {}, "path to a YAML file describing the characteristics of the I/O Subsystem")
3715 , io_properties(*this, "io-properties", {}, "a YAML string describing the characteristics of the I/O Subsystem")
3716 , mbind(*this, "mbind", true, "enable mbind")
3717 #ifndef SEASTAR_NO_EXCEPTION_HACK
3718     , enable_glibc_exception_scaling_workaround(*this, "enable-glibc-exception-scaling-workaround", true, "enable workaround for glibc/gcc c++ exception scalability problem")
3719 #else
3720 , enable_glibc_exception_scaling_workaround(*this, "enable-glibc-exception-scaling-workaround", program_options::unused{})
3721 #endif
3722 #ifdef SEASTAR_HAVE_HWLOC
3723 , allow_cpus_in_remote_numa_nodes(*this, "allow-cpus-in-remote-numa-nodes", true, "if some CPUs are found not to have any local NUMA nodes, allow assigning them to remote ones")
3724 #else
3725 , allow_cpus_in_remote_numa_nodes(*this, "allow-cpus-in-remote-numa-nodes", program_options::unused{})
3726 #endif
3727 {
3728 }
3729
3730 thread_local scollectd::impl scollectd_impl;
3731
3732 scollectd::impl & scollectd::get_impl() {
3733 return scollectd_impl;
3734 }
3735
3736 struct reactor_deleter {
3737 void operator()(reactor* p) {
3738 p->~reactor();
3739 free(p);
3740 }
3741 };
3742
3743 thread_local std::unique_ptr<reactor, reactor_deleter> reactor_holder;
3744
3745 thread_local smp_message_queue** smp::_qs;
3746 thread_local std::thread::id smp::_tmain;
3747 unsigned smp::count = 0;
3748
3749 void smp::start_all_queues()
3750 {
3751 for (unsigned c = 0; c < count; c++) {
3752 if (c != this_shard_id()) {
3753 _qs[c][this_shard_id()].start(c);
3754 }
3755 }
3756 _alien._qs[this_shard_id()].start();
3757 }
3758
3759 #ifdef SEASTAR_HAVE_DPDK
3760
3761 int dpdk_thread_adaptor(void* f)
3762 {
3763 (*static_cast<std::function<void ()>*>(f))();
3764 return 0;
3765 }
3766
3767 #endif
3768
3769 void smp::join_all()
3770 {
3771 #ifdef SEASTAR_HAVE_DPDK
3772 if (_using_dpdk) {
3773 rte_eal_mp_wait_lcore();
3774 return;
3775 }
3776 #endif
3777 for (auto&& t: smp::_threads) {
3778 t.join();
3779 }
3780 }
3781
3782 void smp::pin(unsigned cpu_id) {
3783 if (_using_dpdk) {
3784 // dpdk does its own pinning
3785 return;
3786 }
3787 pin_this_thread(cpu_id);
3788 }
3789
3790 void smp::arrive_at_event_loop_end() {
3791 if (_all_event_loops_done) {
3792 _all_event_loops_done->wait();
3793 }
3794 }
3795
3796 void smp::allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg) {
3797 assert(!reactor_holder);
3798
3799     // we cannot just write "local_engine = new reactor" since reactor's constructor
3800 // uses local_engine
3801 void *buf;
3802 int r = posix_memalign(&buf, cache_line_size, sizeof(reactor));
3803 assert(r == 0);
3804 local_engine = reinterpret_cast<reactor*>(buf);
3805 *internal::this_shard_id_ptr() = id;
3806 new (buf) reactor(this->shared_from_this(), _alien, id, std::move(rbs), cfg);
3807 reactor_holder.reset(local_engine);
3808 }
3809
3810 void smp::cleanup() noexcept {
3811 smp::_threads = std::vector<posix_thread>();
3812 _thread_loops.clear();
3813 }
3814
3815 void smp::cleanup_cpu() {
3816 size_t cpuid = this_shard_id();
3817
3818 if (_qs) {
3819 for(unsigned i = 0; i < smp::count; i++) {
3820 _qs[i][cpuid].stop();
3821 }
3822 }
3823 if (_alien._qs) {
3824 _alien._qs[cpuid].stop();
3825 }
3826 }
3827
3828 void smp::create_thread(std::function<void ()> thread_loop) {
3829 if (_using_dpdk) {
3830 _thread_loops.push_back(std::move(thread_loop));
3831 } else {
3832 _threads.emplace_back(std::move(thread_loop));
3833 }
3834 }
3835
3836 // Installs handler for Signal which ensures that Func is invoked only once
3837 // in the whole program and that after it is invoked the default handler is restored.
3838 template<int Signal, void(*Func)()>
3839 void install_oneshot_signal_handler() {
3840 static bool handled = false;
3841 static util::spinlock lock;
3842
3843 struct sigaction sa;
3844 sa.sa_sigaction = [](int sig, siginfo_t *info, void *p) {
3845 std::lock_guard<util::spinlock> g(lock);
3846 if (!handled) {
3847 handled = true;
3848 Func();
3849 signal(sig, SIG_DFL);
3850 }
3851 };
3852 sigfillset(&sa.sa_mask);
3853 sa.sa_flags = SA_SIGINFO | SA_RESTART;
3854 if (Signal == SIGSEGV) {
3855 sa.sa_flags |= SA_ONSTACK;
3856 }
3857 auto r = ::sigaction(Signal, &sa, nullptr);
3858 throw_system_error_on(r == -1);
3859 }
3860
3861 static void reraise_signal(int signo) {
3862 signal(signo, SIG_DFL);
3863 pthread_kill(pthread_self(), signo);
3864 }
3865
3866 static void sigsegv_action() noexcept {
3867 print_with_backtrace("Segmentation fault");
3868 reraise_signal(SIGSEGV);
3869 }
3870
3871 static void sigabrt_action() noexcept {
3872 print_with_backtrace("Aborting");
3873 reraise_signal(SIGABRT);
3874 }
3875
3876 void smp::qs_deleter::operator()(smp_message_queue** qs) const {
3877 for (unsigned i = 0; i < smp::count; i++) {
3878 for (unsigned j = 0; j < smp::count; j++) {
3879 qs[i][j].~smp_message_queue();
3880 }
3881 ::operator delete[](qs[i]);
3882 }
3883 delete[](qs);
3884 }
3885
3886 class disk_config_params {
3887 private:
3888 unsigned _num_io_groups = 0;
3889 std::unordered_map<dev_t, mountpoint_params> _mountpoints;
3890 std::chrono::duration<double> _latency_goal;
3891
3892 public:
3893 uint64_t per_io_group(uint64_t qty, unsigned nr_groups) const noexcept {
3894 return std::max(qty / nr_groups, 1ul);
3895 }
3896
3897 unsigned num_io_groups() const noexcept { return _num_io_groups; }
3898
3899 std::chrono::duration<double> latency_goal() const {
3900 return _latency_goal;
3901 }
3902
3903 double latency_goal_opt(const reactor_options& opts) const {
3904 return opts.io_latency_goal_ms ?
3905 opts.io_latency_goal_ms.get_value() :
3906 opts.task_quota_ms.get_value() * 1.5;
3907 }
3908
3909 void parse_config(const smp_options& smp_opts, const reactor_options& reactor_opts) {
3910 seastar_logger.debug("smp::count: {}", smp::count);
3911 _latency_goal = std::chrono::duration_cast<std::chrono::duration<double>>(latency_goal_opt(reactor_opts) * 1ms);
3912 seastar_logger.debug("latency_goal: {}", latency_goal().count());
3913
3914 if (smp_opts.num_io_groups) {
3915 _num_io_groups = smp_opts.num_io_groups.get_value();
3916 if (!_num_io_groups) {
3917 throw std::runtime_error("num-io-groups must be greater than zero");
3918 }
3919 } else if (smp_opts.num_io_queues) {
3920 seastar_logger.warn("the --num-io-queues option is deprecated, switch to --num-io-groups instead");
3921 }
3922 if (smp_opts.io_properties_file && smp_opts.io_properties) {
3923 throw std::runtime_error("Both io-properties and io-properties-file specified. Don't know which to trust!");
3924 }
3925
3926 std::optional<YAML::Node> doc;
3927 if (smp_opts.io_properties_file) {
3928 doc = YAML::LoadFile(smp_opts.io_properties_file.get_value());
3929 } else if (smp_opts.io_properties) {
3930 doc = YAML::Load(smp_opts.io_properties.get_value());
3931 }
3932
3933 if (doc) {
3934 if (!doc->IsMap()) {
3935 throw std::runtime_error("Bogus io-properties (did you mix up --io-properties and --io-properties-file?)");
3936 }
3937 for (auto&& section : *doc) {
3938 auto sec_name = section.first.as<std::string>();
3939 if (sec_name != "disks") {
3940 throw std::runtime_error(fmt::format("While parsing I/O options: section {} currently unsupported.", sec_name));
3941 }
3942 auto disks = section.second.as<std::vector<mountpoint_params>>();
3943 for (auto& d : disks) {
3944 struct ::stat buf;
3945 auto ret = stat(d.mountpoint.c_str(), &buf);
3946 if (ret < 0) {
3947 throw std::runtime_error(fmt::format("Couldn't stat {}", d.mountpoint));
3948 }
3949
3950 auto st_dev = S_ISBLK(buf.st_mode) ? buf.st_rdev : buf.st_dev;
3951 if (_mountpoints.count(st_dev)) {
3952 throw std::runtime_error(fmt::format("Mountpoint {} already configured", d.mountpoint));
3953 }
3954 if (_mountpoints.size() >= reactor::max_queues) {
3955 throw std::runtime_error(fmt::format("Configured number of queues {} is larger than the maximum {}",
3956 _mountpoints.size(), reactor::max_queues));
3957 }
3958 if (d.read_bytes_rate == 0 || d.write_bytes_rate == 0 ||
3959 d.read_req_rate == 0 || d.write_req_rate == 0) {
3960 throw std::runtime_error(fmt::format("R/W bytes and req rates must not be zero"));
3961 }
3962
3963 seastar_logger.debug("dev_id: {} mountpoint: {}", st_dev, d.mountpoint);
3964 _mountpoints.emplace(st_dev, d);
3965 }
3966 }
3967 }
3968
3969 // Placeholder for unconfigured disks.
3970 mountpoint_params d = {};
3971 _mountpoints.emplace(0, d);
3972 }
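// Editor's note (illustrative, not part of the original source): the parser above
// expects a YAML document whose only top-level section is "disks", holding a list of
// mountpoint_params entries. The exact key names come from the mountpoint_params
// YAML conversion, which lives elsewhere, so treat the ones below as assumptions; a
// hypothetical --io-properties-file might look roughly like:
//
//     disks:
//       - mountpoint: /var/lib/mydata        # hypothetical path
//         read_iops: 100000
//         read_bandwidth: 1GB
//         write_iops: 80000
//         write_bandwidth: 500MB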
3973
3974 struct io_queue::config generate_config(dev_t devid, unsigned nr_groups) const {
3975 seastar_logger.debug("generate_config dev_id: {}", devid);
3976 const mountpoint_params& p = _mountpoints.at(devid);
3977 struct io_queue::config cfg;
3978
3979 cfg.devid = devid;
3980
3981 if (p.read_bytes_rate != std::numeric_limits<uint64_t>::max()) {
3982 cfg.blocks_count_rate = (io_queue::read_request_base_count * (unsigned long)per_io_group(p.read_bytes_rate, nr_groups)) >> io_queue::block_size_shift;
3983 cfg.disk_blocks_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_bytes_rate) / p.write_bytes_rate;
3984 }
3985 if (p.read_req_rate != std::numeric_limits<uint64_t>::max()) {
3986 cfg.req_count_rate = io_queue::read_request_base_count * (unsigned long)per_io_group(p.read_req_rate, nr_groups);
3987 cfg.disk_req_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_req_rate) / p.write_req_rate;
3988 }
3989 if (p.read_saturation_length != std::numeric_limits<uint64_t>::max()) {
3990 cfg.disk_read_saturation_length = p.read_saturation_length;
3991 }
3992 if (p.write_saturation_length != std::numeric_limits<uint64_t>::max()) {
3993 cfg.disk_write_saturation_length = p.write_saturation_length;
3994 }
3995 cfg.mountpoint = p.mountpoint;
3996 cfg.duplex = p.duplex;
3997 cfg.rate_factor = p.rate_factor;
3998 cfg.rate_limit_duration = latency_goal();
3999     // The block count limit should not be less than the minimal IO size on the device.
4000     // On the other hand, even this is not good enough -- in the worst case the
4001     // scheduler will self-tune to allow for a single 64k request, while it would
4002     // be better to sacrifice some IO latency but allow for larger concurrency.
4003 cfg.block_count_limit_min = (64 << 10) >> io_queue::block_size_shift;
4004
4005 return cfg;
4006 }
4007
4008 auto device_ids() {
4009 return boost::adaptors::keys(_mountpoints);
4010 }
4011 };
4012
4013 unsigned smp::adjust_max_networking_aio_io_control_blocks(unsigned network_iocbs)
4014 {
4015 static unsigned constexpr storage_iocbs = reactor::max_aio;
4016 static unsigned constexpr preempt_iocbs = 2;
4017
4018 auto aio_max_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-max-nr");
4019 auto aio_nr = read_first_line_as<unsigned>("/proc/sys/fs/aio-nr");
4020 auto available_aio = aio_max_nr - aio_nr;
4021 auto requested_aio_network = network_iocbs * smp::count;
4022 auto requested_aio_other = (storage_iocbs + preempt_iocbs) * smp::count;
4023 auto requested_aio = requested_aio_network + requested_aio_other;
4024 auto network_iocbs_old = network_iocbs;
4025
4026 if (available_aio < requested_aio) {
4027 seastar_logger.warn("Requested AIO slots too large, please increase request capacity in /proc/sys/fs/aio-max-nr. available:{} requested:{}", available_aio, requested_aio);
4028 if (available_aio >= requested_aio_other + smp::count) { // at least one queue for each shard
4029 network_iocbs = (available_aio - requested_aio_other) / smp::count;
4030 seastar_logger.warn("max-networking-io-control-blocks adjusted from {} to {}, since AIO slots are unavailable", network_iocbs_old, network_iocbs);
4031 } else {
4032 throw std::runtime_error("Could not setup Async I/O: Not enough request capacity in /proc/sys/fs/aio-max-nr. Try increasing that number or reducing the amount of logical CPUs available for your application");
4033 }
4034 }
4035
4036 return network_iocbs;
4037 }
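// Editor's note (illustrative, not part of the original source): a hypothetical
// walk-through of the adjustment above. Suppose smp::count == 16, the requested
// network_iocbs == 10000, and (storage_iocbs + preempt_iocbs) == 1026 per shard
// (1026 is a made-up figure; the real value depends on reactor::max_aio). Then:
//
//     requested_aio_network = 10000 * 16 = 160000
//     requested_aio_other   =  1026 * 16 =  16416
//     requested_aio         =             176416
//
// If /proc/sys/fs/aio-max-nr minus /proc/sys/fs/aio-nr leaves only 100000 free
// slots, the fallback kicks in:
//
//     network_iocbs = (100000 - 16416) / 16 = 5224
//
// and a warning is logged; with fewer than 16416 + 16 free slots, configuration
// fails with an exception instead.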
4038
4039 void smp::configure(const smp_options& smp_opts, const reactor_options& reactor_opts)
4040 {
4041 #ifndef SEASTAR_NO_EXCEPTION_HACK
4042 if (smp_opts.enable_glibc_exception_scaling_workaround.get_value()) {
4043 init_phdr_cache();
4044 }
4045 #endif
4046
4047     // Mask most signals, to prevent threads (esp. dpdk helper threads)
4048     // from servicing a signal. Individual reactors will unmask signals
4049     // as they become prepared to handle them.
4050     //
4051     // We leave some signals unmasked since we don't handle them ourselves.
4052 sigset_t sigs;
4053 sigfillset(&sigs);
4054 for (auto sig : {SIGHUP, SIGQUIT, SIGILL, SIGABRT, SIGFPE, SIGSEGV,
4055 SIGALRM, SIGCONT, SIGSTOP, SIGTSTP, SIGTTIN, SIGTTOU}) {
4056 sigdelset(&sigs, sig);
4057 }
4058 if (!reactor_opts._auto_handle_sigint_sigterm) {
4059 sigdelset(&sigs, SIGINT);
4060 sigdelset(&sigs, SIGTERM);
4061 }
4062 pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
4063
4064 #ifndef SEASTAR_ASAN_ENABLED
4065 // We don't need to handle SIGSEGV when asan is enabled.
4066 install_oneshot_signal_handler<SIGSEGV, sigsegv_action>();
4067 #else
4068 (void)sigsegv_action;
4069 #endif
4070 install_oneshot_signal_handler<SIGABRT, sigabrt_action>();
4071
4072 #ifdef SEASTAR_HAVE_DPDK
4073 const auto* native_stack = dynamic_cast<const net::native_stack_options*>(reactor_opts.network_stack.get_selected_candidate_opts());
4074 _using_dpdk = native_stack && native_stack->dpdk_pmd;
4075 #endif
4076 auto thread_affinity = smp_opts.thread_affinity.get_value();
4077 if (reactor_opts.overprovisioned
4078 && smp_opts.thread_affinity.defaulted()) {
4079 thread_affinity = false;
4080 }
4081 if (!thread_affinity && _using_dpdk) {
4082 fmt::print("warning: --thread-affinity 0 ignored in dpdk mode\n");
4083 }
4084 auto mbind = smp_opts.mbind.get_value();
4085 if (!thread_affinity) {
4086 mbind = false;
4087 }
4088
4089 resource::configuration rc;
4090
4091 smp::_tmain = std::this_thread::get_id();
4092 resource::cpuset cpu_set = get_current_cpuset();
4093
4094 if (smp_opts.cpuset) {
4095 auto opts_cpuset = smp_opts.cpuset.get_value();
4096 // CPUs that are not available are those pinned by
4097 // --cpuset but not present in current task set
4098 std::set<unsigned int> not_available_cpus;
4099 std::set_difference(opts_cpuset.begin(), opts_cpuset.end(),
4100 cpu_set.begin(), cpu_set.end(),
4101 std::inserter(not_available_cpus, not_available_cpus.end()));
4102
4103 if (!not_available_cpus.empty()) {
4104 std::ostringstream not_available_cpus_list;
4105 for (auto cpu_id : not_available_cpus) {
4106 not_available_cpus_list << " " << cpu_id;
4107 }
4108 seastar_logger.error("Bad value for --cpuset:{} not allowed. Shutting down.", not_available_cpus_list.str());
4109 exit(1);
4110 }
4111 cpu_set = opts_cpuset;
4112 }
4113
4114 if (smp_opts.smp) {
4115 smp::count = smp_opts.smp.get_value();
4116 } else {
4117 smp::count = cpu_set.size();
4118 }
4119 std::vector<reactor*> reactors(smp::count);
4120 if (smp_opts.memory) {
4121 rc.total_memory = parse_memory_size(smp_opts.memory.get_value());
4122 #ifdef SEASTAR_HAVE_DPDK
4123 if (smp_opts.hugepages &&
4124 !reactor_opts.network_stack.get_selected_candidate_name().compare("native") &&
4125 _using_dpdk) {
4126 size_t dpdk_memory = dpdk::eal::mem_size(smp::count);
4127
4128 if (dpdk_memory >= rc.total_memory) {
4129 std::cerr<<"Can't run with the given amount of memory: ";
4130 std::cerr<<smp_opts.memory.get_value();
4131 std::cerr<<". Consider giving more."<<std::endl;
4132 exit(1);
4133 }
4134
4135 //
4136 // Subtract the memory we are about to give to DPDK from the total
4137 // amount of memory we are allowed to use.
4138 //
4139 rc.total_memory.value() -= dpdk_memory;
4140 }
4141 #endif
4142 }
4143 if (smp_opts.reserve_memory) {
4144 rc.reserve_memory = parse_memory_size(smp_opts.reserve_memory.get_value());
4145 }
4146 rc.reserve_additional_memory = smp_opts.reserve_additional_memory;
4147 std::optional<std::string> hugepages_path;
4148 if (smp_opts.hugepages) {
4149 hugepages_path = smp_opts.hugepages.get_value();
4150 }
4151 auto mlock = false;
4152 if (smp_opts.lock_memory) {
4153 mlock = smp_opts.lock_memory.get_value();
4154 }
4155 if (mlock) {
4156 auto extra_flags = 0;
4157 #ifdef MCL_ONFAULT
4158         // Linux will serialize faulting in anonymous memory, and also
4159         // serialize marking it as locked. This can take many minutes on
4160         // terabyte-class machines, so fault the memory in later to spread
4161         // out the cost. This isn't ideal, since we'll see contention if
4162         // multiple shards fault in memory at once, but at least that work
4163         // can proceed in parallel with regular reactor work on other shards.
4164 extra_flags |= MCL_ONFAULT; // Linux 4.4+
4165 #endif
4166 auto r = mlockall(MCL_CURRENT | MCL_FUTURE | extra_flags);
4167 if (r) {
4168 // Don't hard fail for now, it's hard to get the configuration right
4169 fmt::print("warning: failed to mlockall: {}\n", strerror(errno));
4170 }
4171 }
4172
4173 rc.cpus = smp::count;
4174 rc.cpu_set = std::move(cpu_set);
4175
4176 disk_config_params disk_config;
4177 disk_config.parse_config(smp_opts, reactor_opts);
4178 for (auto& id : disk_config.device_ids()) {
4179 rc.devices.push_back(id);
4180 }
4181 rc.num_io_groups = disk_config.num_io_groups();
4182
4183 #ifdef SEASTAR_HAVE_HWLOC
4184 if (smp_opts.allow_cpus_in_remote_numa_nodes.get_value()) {
4185 rc.assign_orphan_cpus = true;
4186 }
4187 #endif
4188
4189 auto resources = resource::allocate(rc);
4190 logger::set_shard_field_width(std::ceil(std::log10(smp::count)));
4191 std::vector<resource::cpu> allocations = std::move(resources.cpus);
4192 if (thread_affinity) {
4193 smp::pin(allocations[0].cpu_id);
4194 }
4195 if (smp_opts.memory_allocator == memory_allocator::seastar) {
4196 memory::configure(allocations[0].mem, mbind, hugepages_path);
4197 }
4198
4199 if (reactor_opts.abort_on_seastar_bad_alloc) {
4200 memory::enable_abort_on_allocation_failure();
4201 }
4202
4203 if (reactor_opts.dump_memory_diagnostics_on_alloc_failure_kind) {
4204 memory::set_dump_memory_diagnostics_on_alloc_failure_kind(reactor_opts.dump_memory_diagnostics_on_alloc_failure_kind.get_value());
4205 }
4206
4207 reactor_config reactor_cfg;
4208 reactor_cfg.auto_handle_sigint_sigterm = reactor_opts._auto_handle_sigint_sigterm;
4209 reactor_cfg.max_networking_aio_io_control_blocks = adjust_max_networking_aio_io_control_blocks(reactor_opts.max_networking_io_control_blocks.get_value());
4210
4211 #ifdef SEASTAR_HEAPPROF
4212 bool heapprof_enabled = reactor_opts.heapprof;
4213 if (heapprof_enabled) {
4214 memory::set_heap_profiling_enabled(heapprof_enabled);
4215 }
4216 #else
4217 bool heapprof_enabled = false;
4218 #endif
4219
4220 #ifdef SEASTAR_HAVE_DPDK
4221 if (_using_dpdk) {
4222 dpdk::eal::cpuset cpus;
4223 for (auto&& a : allocations) {
4224 cpus[a.cpu_id] = true;
4225 }
4226 dpdk::eal::init(cpus, reactor_opts._argv0, hugepages_path, native_stack ? bool(native_stack->dpdk_pmd) : false);
4227 }
4228 #endif
4229
4230     // It would be better to put these into the smp class, but the correct
4231     // smp::count is not known at smp construction time.
4232 boost::barrier reactors_registered(smp::count);
4233 boost::barrier smp_queues_constructed(smp::count);
4234     // We use shared_ptr since this thread can exit while other threads are still unblocking from the barrier
4235 auto inited = std::make_shared<boost::barrier>(smp::count);
4236
4237 auto ioq_topology = std::move(resources.ioq_topology);
4238
4239     // ATTN: The ioq_topology value is referenced by the lambdas below, which are
4240     // then copied to other shards' threads, so each shard has a copy of the
4241     // ioq_topology on its stack, but (!) still references and uses the value
4242 // from shard-0. This access is race-free because
4243 // 1. The .shard_to_group is not modified
4244 // 2. The .queues is pre-resize()-d in advance, so the vector itself
4245 // doesn't change; existing slots are accessed by owning shards only
4246 // without interference
4247 // 3. The .groups manipulations are guarded by the .lock lock (but it's
4248 // also pre-resize()-d in advance)
4249
4250 auto alloc_io_queues = [&ioq_topology, &disk_config] (shard_id shard) {
4251 for (auto& topo : ioq_topology) {
4252 auto& io_info = topo.second;
4253 auto group_idx = io_info.shard_to_group[shard];
4254 std::shared_ptr<io_group> group;
4255
4256 {
4257 std::lock_guard _(io_info.lock);
4258 auto& iog = io_info.groups[group_idx];
4259 if (!iog) {
4260 struct io_queue::config qcfg = disk_config.generate_config(topo.first, io_info.groups.size());
4261 iog = std::make_shared<io_group>(std::move(qcfg));
4262 seastar_logger.debug("allocate {} IO group", group_idx);
4263 }
4264 group = iog;
4265 }
4266
4267 io_info.queues[shard] = std::make_unique<io_queue>(std::move(group), engine()._io_sink);
4268 seastar_logger.debug("attached {} queue to {} IO group", shard, group_idx);
4269 }
4270 };
4271
4272 auto assign_io_queues = [&ioq_topology] (shard_id shard) {
4273 for (auto& topo : ioq_topology) {
4274 auto queue = std::move(topo.second.queues[shard]);
4275 assert(queue);
4276 engine()._io_queues.emplace(topo.first, std::move(queue));
4277
4278 auto num_io_groups = topo.second.groups.size();
4279 if (engine()._num_io_groups == 0) {
4280 engine()._num_io_groups = num_io_groups;
4281 } else if (engine()._num_io_groups != num_io_groups) {
4282 throw std::logic_error(format("Number of IO-groups mismatch, {} != {}", engine()._num_io_groups, num_io_groups));
4283 }
4284 }
4285 };
4286
4287 _all_event_loops_done.emplace(smp::count);
4288
4289 auto backend_selector = reactor_opts.reactor_backend.get_selected_candidate();
4290 seastar_logger.info("Reactor backend: {}", backend_selector);
4291
4292 unsigned i;
4293 auto smp_tmain = smp::_tmain;
4294 for (i = 1; i < smp::count; i++) {
4295 auto allocation = allocations[i];
4296 create_thread([this, smp_tmain, inited, &reactors_registered, &smp_queues_constructed, &smp_opts, &reactor_opts, &reactors, hugepages_path, i, allocation, assign_io_queues, alloc_io_queues, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] {
4297 try {
4298                 // initialize thread_locals that are equal across all reactor threads of this smp instance
4299 smp::_tmain = smp_tmain;
4300 auto thread_name = seastar::format("reactor-{}", i);
4301 pthread_setname_np(pthread_self(), thread_name.c_str());
4302 if (thread_affinity) {
4303 smp::pin(allocation.cpu_id);
4304 }
4305 if (smp_opts.memory_allocator == memory_allocator::seastar) {
4306 memory::configure(allocation.mem, mbind, hugepages_path);
4307 }
4308 if (heapprof_enabled) {
4309 memory::set_heap_profiling_enabled(heapprof_enabled);
4310 }
4311 sigset_t mask;
4312 sigfillset(&mask);
4313 for (auto sig : { SIGSEGV }) {
4314 sigdelset(&mask, sig);
4315 }
4316 auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
4317 throw_pthread_error(r);
4318 init_default_smp_service_group(i);
4319 lowres_clock::update();
4320 allocate_reactor(i, backend_selector, reactor_cfg);
4321 reactors[i] = &engine();
4322 alloc_io_queues(i);
4323 reactors_registered.wait();
4324 smp_queues_constructed.wait();
4325 // _qs_owner is only initialized here
4326 _qs = _qs_owner.get();
4327 start_all_queues();
4328 assign_io_queues(i);
4329 inited->wait();
4330 engine().configure(reactor_opts);
4331 engine().do_run();
4332 } catch (const std::exception& e) {
4333 seastar_logger.error(e.what());
4334 _exit(1);
4335 }
4336 });
4337 }
4338
4339 init_default_smp_service_group(0);
4340 lowres_clock::update();
4341 try {
4342 allocate_reactor(0, backend_selector, reactor_cfg);
4343 } catch (const std::exception& e) {
4344 seastar_logger.error(e.what());
4345 _exit(1);
4346 }
4347
4348 reactors[0] = &engine();
4349 alloc_io_queues(0);
4350
4351 #ifdef SEASTAR_HAVE_DPDK
4352 if (_using_dpdk) {
4353 auto it = _thread_loops.begin();
4354 RTE_LCORE_FOREACH_SLAVE(i) {
4355 rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&*(it++)), i);
4356 }
4357 }
4358 #endif
4359
4360 reactors_registered.wait();
4361 _qs_owner = decltype(smp::_qs_owner){new smp_message_queue* [smp::count], qs_deleter{}};
4362 _qs = _qs_owner.get();
4363 for(unsigned i = 0; i < smp::count; i++) {
4364 smp::_qs_owner[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
4365 for (unsigned j = 0; j < smp::count; ++j) {
4366 new (&smp::_qs_owner[i][j]) smp_message_queue(reactors[j], reactors[i]);
4367 }
4368 }
4369 _alien._qs = alien::instance::create_qs(reactors);
4370 smp_queues_constructed.wait();
4371 start_all_queues();
4372 assign_io_queues(0);
4373 inited->wait();
4374
4375 engine().configure(reactor_opts);
4376 }
4377
4378 bool smp::poll_queues() {
4379 size_t got = 0;
4380 for (unsigned i = 0; i < count; i++) {
4381 if (this_shard_id() != i) {
4382 auto& rxq = _qs[this_shard_id()][i];
4383 rxq.flush_response_batch();
4384 got += rxq.has_unflushed_responses();
4385 got += rxq.process_incoming();
4386 auto& txq = _qs[i][this_shard_id()];
4387 txq.flush_request_batch();
4388 got += txq.process_completions(i);
4389 }
4390 }
4391 return got != 0;
4392 }
4393
4394 bool smp::pure_poll_queues() {
4395 for (unsigned i = 0; i < count; i++) {
4396 if (this_shard_id() != i) {
4397 auto& rxq = _qs[this_shard_id()][i];
4398 rxq.flush_response_batch();
4399 auto& txq = _qs[i][this_shard_id()];
4400 txq.flush_request_batch();
4401 if (rxq.pure_poll_rx() || txq.pure_poll_tx() || rxq.has_unflushed_responses()) {
4402 return true;
4403 }
4404 }
4405 }
4406 return false;
4407 }
4408
4409 __thread reactor* local_engine;
4410
4411 void report_exception(std::string_view message, std::exception_ptr eptr) noexcept {
4412 seastar_logger.error("{}: {}", message, eptr);
4413 }
4414
4415 future<> check_direct_io_support(std::string_view path) noexcept {
4416 struct w {
4417 sstring path;
4418 open_flags flags;
4419 std::function<future<>()> cleanup;
4420
4421 static w parse(sstring path, std::optional<directory_entry_type> type) {
4422 if (!type) {
4423 throw std::invalid_argument(format("Could not open file at {}. Make sure it exists", path));
4424 }
4425
4426 if (type == directory_entry_type::directory) {
4427 auto fpath = path + "/.o_direct_test";
4428 return w{fpath, open_flags::wo | open_flags::create | open_flags::truncate, [fpath] { return remove_file(fpath); }};
4429 } else if ((type == directory_entry_type::regular) || (type == directory_entry_type::link)) {
4430 return w{path, open_flags::ro, [] { return make_ready_future<>(); }};
4431 } else {
4432                 throw std::invalid_argument(format("{} is neither a directory nor a file. Can't be opened with O_DIRECT", path));
4433 }
4434 };
4435 };
4436
4437 // Allocating memory for a sstring can throw, hence the futurize_invoke
4438 return futurize_invoke([path] {
4439 return engine().file_type(path).then([path = sstring(path)] (auto type) {
4440 auto w = w::parse(path, type);
4441 return open_file_dma(w.path, w.flags).then_wrapped([path = w.path, cleanup = std::move(w.cleanup)] (future<file> f) {
4442 try {
4443 auto fd = f.get0();
4444 return cleanup().finally([fd = std::move(fd)] () mutable {
4445 return fd.close();
4446 });
4447 } catch (std::system_error& e) {
4448 if (e.code() == std::error_code(EINVAL, std::system_category())) {
4449 report_exception(format("Could not open file at {}. Does your filesystem support O_DIRECT?", path), std::current_exception());
4450 }
4451 throw;
4452 }
4453 });
4454 });
4455 });
4456 }
4457
4458 server_socket listen(socket_address sa) {
4459 return engine().listen(sa);
4460 }
4461
4462 server_socket listen(socket_address sa, listen_options opts) {
4463 return engine().listen(sa, opts);
4464 }
4465
4466 future<connected_socket> connect(socket_address sa) {
4467 return engine().connect(sa);
4468 }
4469
4470 future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) {
4471 return engine().connect(sa, local, proto);
4472 }
4473
4474 socket make_socket() {
4475 return engine().net().socket();
4476 }
4477
4478 net::udp_channel make_udp_channel() {
4479 return engine().net().make_udp_channel();
4480 }
4481
4482 net::udp_channel make_udp_channel(const socket_address& local) {
4483 return engine().net().make_udp_channel(local);
4484 }
4485
4486 void reactor::add_high_priority_task(task* t) noexcept {
4487 add_urgent_task(t);
4488 // break .then() chains
4489 request_preemption();
4490 }
4491
4492
4493 void set_idle_cpu_handler(idle_cpu_handler&& handler) {
4494 engine().set_idle_cpu_handler(std::move(handler));
4495 }
4496
4497 namespace experimental {
4498 future<std::tuple<file_desc, file_desc>> make_pipe() {
4499 return engine().make_pipe();
4500 }
4501
4502 future<process> spawn_process(const std::filesystem::path& pathname,
4503 spawn_parameters params) {
4504 return process::spawn(pathname, std::move(params));
4505 }
4506
4507 future<process> spawn_process(const std::filesystem::path& pathname) {
4508 return process::spawn(pathname);
4509 }
4510 }
4511
4512 static
4513 bool
4514 virtualized() {
4515 return fs::exists("/sys/hypervisor/type");
4516 }
4517
4518 std::chrono::nanoseconds
4519 reactor::calculate_poll_time() {
4520 // In a non-virtualized environment, select a poll time
4521 // that is competitive with halt/unhalt.
4522 //
4523     // In a virtualized environment, IPIs are slow and dominate
4524     // sleep/wake (mprotect/tgkill), so increase the poll time so we
4525     // don't go to sleep in a request/reply workload.
4526 return virtualized() ? 2000us : 200us;
4527 }
4528
4529 future<>
4530 yield() noexcept {
4531 memory::scoped_critical_alloc_section _;
4532 auto tsk = make_task([] {});
4533 schedule(tsk);
4534 return tsk->get_future();
4535 }
4536
4537 future<> check_for_io_immediately() noexcept {
4538 memory::scoped_critical_alloc_section _;
4539 engine().force_poll();
4540 auto tsk = make_task(default_scheduling_group(), [] {});
4541 schedule(tsk);
4542 return tsk->get_future();
4543 }
4544
4545 future<> later() noexcept {
4546 return check_for_io_immediately();
4547 }
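// Editor's sketch (illustrative only): yield() above merely re-queues the
// continuation, while check_for_io_immediately() (and its alias later()) also forces
// the reactor to poll for I/O first. An assumed usage pattern for breaking up a long
// CPU-bound loop so other tasks get a chance to run:
//
//     future<> count_to(long n) {
//         return repeat([i = long(0), n] () mutable {
//             if (i++ == n) {
//                 return make_ready_future<stop_iteration>(stop_iteration::yes);
//             }
//             return yield().then([] { return stop_iteration::no; });
//         });
//     }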
4548
4549 void add_to_flush_poller(output_stream<char>& os) noexcept {
4550 engine()._flush_batching.push_back(os);
4551 }
4552
4553 steady_clock_type::duration reactor::total_idle_time() {
4554 return _total_idle;
4555 }
4556
4557 steady_clock_type::duration reactor::total_busy_time() {
4558 return now() - _start_time - _total_idle;
4559 }
4560
4561 std::chrono::nanoseconds reactor::total_steal_time() {
4562 // Steal time: this mimics the concept some Hypervisors have about Steal time.
4563 // That is the time in which a VM has something to run, but is not running because some other
4564 // process (another VM or the hypervisor itself) is in control.
4565 //
4566 // For us, we notice that during the time in which we were not sleeping (either running or busy
4567 // polling while idle), we should be accumulating thread runtime. If we are not, that's because
4568 // someone stole it from us.
4569 //
4570     // Because this is measured entirely in userspace we can miss some events. For instance, if the
4571     // seastar process is ready to run but the kernel hasn't scheduled us yet, that would technically
4572     // be steal time, but we have no way to account for it.
4573     //
4574     // But what we have here should be good enough and at least has a well-defined meaning.
4575 return std::chrono::duration_cast<std::chrono::nanoseconds>(now() - _start_time - _total_sleep) -
4576 std::chrono::duration_cast<std::chrono::nanoseconds>(thread_cputime_clock::now().time_since_epoch());
4577 }
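// Editor's note (illustrative, not part of the original source): the expression
// above amounts to
//
//     steal ~= (wall-clock time since start - total time spent sleeping)
//              - CPU time actually consumed by this thread
//
// e.g. with 100s of uptime, 20s spent sleeping and 75s of measured thread CPU time,
// roughly 5s were "stolen" (hypothetical numbers). Small negative values are
// possible, since the clocks are sampled at slightly different moments.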
4578
4579 static std::atomic<unsigned long> s_used_scheduling_group_ids_bitmap{3}; // 0=main, 1=atexit
4580 static std::atomic<unsigned long> s_next_scheduling_group_specific_key{0};
4581
4582 static
4583 int
4584 allocate_scheduling_group_id() noexcept {
4585 static_assert(max_scheduling_groups() <= std::numeric_limits<unsigned long>::digits, "more scheduling groups than available bits");
4586 auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed);
4587 auto nb = b;
4588 unsigned i = 0;
4589 do {
4590 if (__builtin_popcountl(b) == max_scheduling_groups()) {
4591 return -1;
4592 }
4593 i = count_trailing_zeros(~b);
4594 nb = b | (1ul << i);
4595 } while (!s_used_scheduling_group_ids_bitmap.compare_exchange_weak(b, nb, std::memory_order_relaxed));
4596 return i;
4597 }
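// Editor's note (illustrative, not part of the original source): the allocator above
// is a lock-free "find the first clear bit and set it" over a single unsigned long.
// A hypothetical walk-through, with bit 0 (main), bit 1 (atexit) and bit 2 already
// taken:
//
//     b   = 0b0111
//     ~b  = ...11111000        -> count_trailing_zeros(~b) == 3
//     nb  = 0b1111             -> compare_exchange succeeds -> id 3 is returned
//
// If another thread allocated a group concurrently, the compare_exchange fails and
// the loop retries against the fresh bitmap; when all max_scheduling_groups() bits
// are already set, -1 is returned instead.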
4598
4599 static
4600 unsigned long
4601 allocate_scheduling_group_specific_key() noexcept {
4602 return s_next_scheduling_group_specific_key.fetch_add(1, std::memory_order_relaxed);
4603 }
4604
4605 static
4606 void
4607 deallocate_scheduling_group_id(unsigned id) noexcept {
4608 s_used_scheduling_group_ids_bitmap.fetch_and(~(1ul << id), std::memory_order_relaxed);
4609 }
4610
4611 void
4612 reactor::allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key) {
4613 auto& sg_data = _scheduling_group_specific_data;
4614 auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
4615 this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key.id()+1));
4616 this_sg.specific_vals[key.id()] =
4617 aligned_alloc(sg_data.scheduling_group_key_configs[key.id()].alignment,
4618 sg_data.scheduling_group_key_configs[key.id()].allocation_size);
4619 if (!this_sg.specific_vals[key.id()]) {
4620 std::abort();
4621 }
4622 if (sg_data.scheduling_group_key_configs[key.id()].constructor) {
4623 sg_data.scheduling_group_key_configs[key.id()].constructor(this_sg.specific_vals[key.id()]);
4624 }
4625 }
4626
4627 future<>
4628 reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, float shares) {
4629 auto& sg_data = _scheduling_group_specific_data;
4630 auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
4631 this_sg.queue_is_initialized = true;
4632 _task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1));
4633 _task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shares);
4634 unsigned long num_keys = s_next_scheduling_group_specific_key.load(std::memory_order_relaxed);
4635
4636 return with_scheduling_group(sg, [this, num_keys, sg] () {
4637 for (unsigned long key_id = 0; key_id < num_keys; key_id++) {
4638 allocate_scheduling_group_specific_data(sg, scheduling_group_key(key_id));
4639 }
4640 });
4641 }
4642
4643 future<>
4644 reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
4645 auto& sg_data = _scheduling_group_specific_data;
4646 sg_data.scheduling_group_key_configs.resize(std::max<size_t>(sg_data.scheduling_group_key_configs.size(), key.id() + 1));
4647 sg_data.scheduling_group_key_configs[key.id()] = cfg;
4648 return parallel_for_each(_task_queues, [this, cfg, key] (std::unique_ptr<task_queue>& tq) {
4649 if (tq) {
4650 scheduling_group sg = scheduling_group(tq->_id);
4651 return with_scheduling_group(sg, [this, key, sg] () {
4652 allocate_scheduling_group_specific_data(sg, key);
4653 });
4654 }
4655 return make_ready_future();
4656 });
4657 }
4658
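// Per-shard teardown of a group: after validating the id, run each key's destructor
// (if any) and free the per-key storage from within the group itself, then mark the
// queue uninitialized and drop the task queue.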
4659 future<>
4660 reactor::destroy_scheduling_group(scheduling_group sg) noexcept {
4661 if (sg._id >= max_scheduling_groups()) {
4662 on_fatal_internal_error(seastar_logger, format("Invalid scheduling_group {}", sg._id));
4663 }
4664 return with_scheduling_group(sg, [this, sg] () {
4665 auto& sg_data = _scheduling_group_specific_data;
4666 auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
4667 for (unsigned long key_id = 0; key_id < sg_data.scheduling_group_key_configs.size(); key_id++) {
4668 void* val = this_sg.specific_vals[key_id];
4669 if (val) {
4670 if (sg_data.scheduling_group_key_configs[key_id].destructor) {
4671 sg_data.scheduling_group_key_configs[key_id].destructor(val);
4672 }
4673 free(val);
4674 this_sg.specific_vals[key_id] = nullptr;
4675 }
4676 }
4677 }).then([this, sg] () {
4678 auto& sg_data = _scheduling_group_specific_data;
4679 auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
4680 this_sg.queue_is_initialized = false;
4681 _task_queues[sg._id].reset();
4682 });
4683
4684 }
4685
4686 void
4687 internal::no_such_scheduling_group(scheduling_group sg) {
4688 throw std::invalid_argument(format("The scheduling group does not exist ({})", internal::scheduling_group_index(sg)));
4689 }
4690
4691 const sstring&
4692 scheduling_group::name() const noexcept {
4693 return engine()._task_queues[_id]->_name;
4694 }
4695
4696 void
4697 scheduling_group::set_shares(float shares) noexcept {
4698 engine()._task_queues[_id]->set_shares(shares);
4699 }
4700
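// Public entry point: reserve an id from the bitmap, then initialize the group's task
// queue on every shard before handing the group to the caller. A minimal usage sketch
// (illustrative only, not part of this file), assuming it runs inside a Seastar
// application context:
//
//   return create_scheduling_group("background", 100).then([] (scheduling_group sg) {
//       return with_scheduling_group(sg, [] {
//           // tasks scheduled here are accounted to "background" at 100 shares
//       });
//   });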
4701 future<scheduling_group>
4702 create_scheduling_group(sstring name, float shares) noexcept {
4703 auto aid = allocate_scheduling_group_id();
4704 if (aid < 0) {
4705 return make_exception_future<scheduling_group>(std::runtime_error(fmt::format("Scheduling group limit exceeded while creating {}", name)));
4706 }
4707 auto id = static_cast<unsigned>(aid);
4708 assert(id < max_scheduling_groups());
4709 auto sg = scheduling_group(id);
4710 return smp::invoke_on_all([sg, name, shares] {
4711 return engine().init_scheduling_group(sg, name, shares);
4712 }).then([sg] {
4713 return make_ready_future<scheduling_group>(sg);
4714 });
4715 }
4716
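// Public entry point for keys: allocate the next key id, then size and construct the
// per-group storage for it on every shard. A sketch of attaching a per-group counter
// (illustrative; assumes the make_scheduling_group_key_config<T>() and
// scheduling_group_get_specific<T>() helpers declared in scheduling.hh):
//
//   struct sg_counter { uint64_t value = 0; };
//   return scheduling_group_key_create(make_scheduling_group_key_config<sg_counter>())
//           .then([] (scheduling_group_key key) {
//       scheduling_group_get_specific<sg_counter>(key).value++;
//   });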
4717 future<scheduling_group_key>
4718 scheduling_group_key_create(scheduling_group_key_config cfg) noexcept {
4719 scheduling_group_key key = allocate_scheduling_group_specific_key();
4720 return smp::invoke_on_all([key, cfg] {
4721 return engine().init_new_scheduling_group_key(key, cfg);
4722 }).then([key] {
4723 return make_ready_future<scheduling_group_key>(key);
4724 });
4725 }
4726
4727 future<>
4728 rename_priority_class(io_priority_class pc, sstring new_name) {
4729 return pc.rename(std::move(new_name));
4730 }
4731
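// The remaining public entry points guard the special groups: the default group can be
// neither destroyed nor renamed, and a group cannot be destroyed while it is the
// caller's current group. The operation is fanned out to all shards and, for destroy,
// the id is only returned to the bitmap once every shard has finished.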
4732 future<>
4733 destroy_scheduling_group(scheduling_group sg) noexcept {
4734 if (sg == default_scheduling_group()) {
4735 return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to destroy the default scheduling group"));
4736 }
4737 if (sg == current_scheduling_group()) {
4738 return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to destroy the current scheduling group"));
4739 }
4740 return smp::invoke_on_all([sg] {
4741 return engine().destroy_scheduling_group(sg);
4742 }).then([sg] {
4743 deallocate_scheduling_group_id(sg._id);
4744 });
4745 }
4746
4747 future<>
4748 rename_scheduling_group(scheduling_group sg, sstring new_name) noexcept {
4749 if (sg == default_scheduling_group()) {
4750 return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to rename the default scheduling group"));
4751 }
4752 return smp::invoke_on_all([sg, new_name] {
4753 engine()._task_queues[sg._id]->rename(new_name);
4754 });
4755 }
4756
4757 namespace internal {
4758
4759 inline
4760 sched_clock::duration
4761 timeval_to_duration(::timeval tv) {
4762 return std::chrono::seconds(tv.tv_sec) + std::chrono::microseconds(tv.tv_usec);
4763 }
4764
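// A poller that does no polling work of its own; instead it snapshots
// getrusage(RUSAGE_THREAD) at the boundaries of each reactor run. Any voluntary context
// switch (ru_nvcsw) observed during a run that is expected not to block is counted as a
// kernel stall, and the reported stall time is the wall-clock time of those runs minus
// the CPU time the thread actually consumed.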
4765 class reactor_stall_sampler : public reactor::pollfn {
4766 sched_clock::time_point _run_start;
4767 ::rusage _run_start_rusage;
4768 uint64_t _kernel_stalls = 0;
4769 sched_clock::duration _nonsleep_cpu_time = {};
4770 sched_clock::duration _nonsleep_wall_time = {};
4771 private:
4772 static ::rusage get_rusage() {
4773 struct ::rusage ru;
4774 ::getrusage(RUSAGE_THREAD, &ru);
4775 return ru;
4776 }
4777 static sched_clock::duration cpu_time(const ::rusage& ru) {
4778 return timeval_to_duration(ru.ru_stime) + timeval_to_duration(ru.ru_utime);
4779 }
4780 void mark_run_start() {
4781 _run_start = reactor::now();
4782 _run_start_rusage = get_rusage();
4783 }
4784 void mark_run_end() {
4785 auto start_nvcsw = _run_start_rusage.ru_nvcsw;
4786 auto start_cpu_time = cpu_time(_run_start_rusage);
4787 auto start_time = _run_start;
4788 _run_start = reactor::now();
4789 _run_start_rusage = get_rusage();
4790 _kernel_stalls += _run_start_rusage.ru_nvcsw - start_nvcsw;
4791 _nonsleep_cpu_time += cpu_time(_run_start_rusage) - start_cpu_time;
4792 _nonsleep_wall_time += _run_start - start_time;
4793 }
4794 public:
4795 reactor_stall_sampler() { mark_run_start(); }
4796 virtual bool poll() override { return false; }
4797 virtual bool pure_poll() override { return false; }
4798 virtual bool try_enter_interrupt_mode() override {
4799 // try_enter_interrupt_mode marks the end of a reactor run that should be context-switch free
4800 mark_run_end();
4801 return true;
4802 }
4803 virtual void exit_interrupt_mode() override {
4804 // start a reactor run that should be context-switch free
4805 mark_run_start();
4806 }
4807 stall_report report() const {
4808 stall_report r;
4809 // mark_run_end() followed immediately by mark_run_start() is logically a no-op,
4810 // but each call mutates the sampler's state, so neither can be marked const
4811 const_cast<reactor_stall_sampler*>(this)->mark_run_end();
4812 r.kernel_stalls = _kernel_stalls;
4813 r.run_wall_time = _nonsleep_wall_time;
4814 r.stall_time = _nonsleep_wall_time - _nonsleep_cpu_time;
4815 const_cast<reactor_stall_sampler*>(this)->mark_run_start();
4816 return r;
4817 }
4818 };
4819
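// Runs the workload under test with the sampler installed as a poller and resolves to
// its report once the workload completes. Illustrative use (not from this file):
//
//   return internal::report_reactor_stalls([] {
//       return seastar::sleep(std::chrono::seconds(1));
//   }).then([] (internal::stall_report sr) {
//       std::cout << sr << "\n";   // uses the operator<< defined below
//   });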
4820 future<stall_report>
4821 report_reactor_stalls(noncopyable_function<future<> ()> uut) {
4822 auto reporter = std::make_unique<reactor_stall_sampler>();
4823 auto p_reporter = reporter.get();
4824 auto poller = reactor::poller(std::move(reporter));
4825 return uut().then([poller = std::move(poller), p_reporter] () mutable {
4826 return p_reporter->report();
4827 });
4828 }
4829
4830 std::ostream& operator<<(std::ostream& os, const stall_report& sr) {
4831 auto to_ms = [] (sched_clock::duration d) -> float {
4832 return std::chrono::duration<float>(d) / 1ms;
4833 };
4834 return os << format("{} stalls, {} ms stall time, {} ms run time", sr.kernel_stalls, to_ms(sr.stall_time), to_ms(sr.run_wall_time));
4835 }
4836
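// Number of scheduling group ids currently claimed in the bitmap, including the two
// reserved at startup for the main and atexit groups.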
4837 size_t scheduling_group_count() {
4838 auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed);
4839 return __builtin_popcountl(b);
4840 }
4841
4842 }
4843
4844 #ifdef SEASTAR_TASK_BACKTRACE
4845
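// Best-effort capture of the current task backtrace: memory-allocation backtrace
// recording is temporarily disabled while it is taken (presumably to avoid re-entering
// that machinery), and any exception simply leaves _bt empty.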
4846 void task::make_backtrace() noexcept {
4847 memory::disable_backtrace_temporarily dbt;
4848 try {
4849 _bt = make_lw_shared<simple_backtrace>(current_backtrace_tasklocal());
4850 } catch (...) {
4851 _bt = nullptr;
4852 }
4853 }
4854
4855 #endif
4856
4857 }