/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2014 Cloudius Systems
 */

#define __user /* empty */ // for xfs includes, below

#include <cinttypes>
#include <sys/syscall.h>
#include <sys/vfs.h>
#include <sys/statfs.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/inotify.h>
#include <seastar/core/task.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/memory.hh>
#include <seastar/core/posix.hh>
#include <seastar/net/packet.hh>
#include <seastar/net/stack.hh>
#include <seastar/net/posix-stack.hh>
#include <seastar/net/native-stack.hh>
#include <seastar/core/resource.hh>
#include <seastar/core/print.hh>
#include "core/scollectd-impl.hh"
#include <seastar/util/conversions.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/make_task.hh>
#include <seastar/core/systemwide_memory_barrier.hh>
#include <seastar/core/report_exception.hh>
#include <seastar/core/stall_sampler.hh>
#include <seastar/core/thread_cputime_clock.hh>
#include <seastar/core/abort_on_ebadf.hh>
#include <seastar/core/io_queue.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/core/internal/buffer_allocator.hh>
#include <seastar/core/scheduling_specific.hh>
#include <seastar/util/log.hh>
#include "core/file-impl.hh"
#include "core/reactor_backend.hh"
#include "core/syscall_result.hh"
#include "core/thread_pool.hh"
#include "syscall_work_queue.hh"
#include "cgroup.hh"
#include "uname.hh"
#include <cassert>
#include <unistd.h>
#include <fcntl.h>
#include <sys/eventfd.h>
#include <sys/poll.h>
#include <boost/lexical_cast.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <boost/range/numeric.hpp>
#include <boost/range/algorithm/sort.hpp>
#include <boost/range/algorithm/remove_if.hpp>
#include <boost/range/algorithm/find_if.hpp>
#include <boost/algorithm/clamp.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/version.hpp>
#include <atomic>
#include <dirent.h>
#include <linux/types.h> // for xfs, below
#include <sys/ioctl.h>
#include <xfs/linux.h>
#define min min /* prevent xfs.h from defining min() as a macro */
#include <xfs/xfs.h>
#undef min
#ifdef SEASTAR_HAVE_DPDK
#include <seastar/core/dpdk_rte.hh>
#include <rte_lcore.h>
#include <rte_launch.h>
#endif
#include <seastar/core/prefetch.hh>
#include <exception>
#include <regex>
#include <fstream>
#ifdef __GNUC__
#include <iostream>
#include <system_error>
#include <cxxabi.h>
#endif

#ifdef SEASTAR_SHUFFLE_TASK_QUEUE
#include <random>
#endif

#include <sys/mman.h>
#include <sys/utsname.h>
#include <linux/falloc.h>
#include <linux/magic.h>
#include <seastar/util/backtrace.hh>
#include <seastar/util/spinlock.hh>
#include <seastar/util/print_safe.hh>
#include <sys/sdt.h>

#ifdef HAVE_OSV
#include <osv/newpoll.hh>
#endif

#if defined(__x86_64__) || defined(__i386__)
#include <xmmintrin.h>
#endif

#include <seastar/util/defer.hh>
#include <seastar/core/alien.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/execution_stage.hh>
#include <seastar/core/exception_hacks.hh>
#include "stall_detector.hh"
#include <seastar/util/memory_diagnostics.hh>

#include <yaml-cpp/yaml.h>

#ifdef SEASTAR_TASK_HISTOGRAM
#include <typeinfo>
#endif

namespace seastar {

struct mountpoint_params {
    std::string mountpoint = "none";
    uint64_t read_bytes_rate = std::numeric_limits<uint64_t>::max();
    uint64_t write_bytes_rate = std::numeric_limits<uint64_t>::max();
    uint64_t read_req_rate = std::numeric_limits<uint64_t>::max();
    uint64_t write_req_rate = std::numeric_limits<uint64_t>::max();
    uint64_t num_io_queues = 0; // calculated
};

}

namespace YAML {
template<>
struct convert<seastar::mountpoint_params> {
    static bool decode(const Node& node, seastar::mountpoint_params& mp) {
        using namespace seastar;
        mp.mountpoint = node["mountpoint"].as<std::string>().c_str();
        mp.read_bytes_rate = parse_memory_size(node["read_bandwidth"].as<std::string>());
        mp.read_req_rate = parse_memory_size(node["read_iops"].as<std::string>());
        mp.write_bytes_rate = parse_memory_size(node["write_bandwidth"].as<std::string>());
        mp.write_req_rate = parse_memory_size(node["write_iops"].as<std::string>());
        return true;
    }
};
}
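
// Illustrative example (not part of the original source): a YAML fragment of
// the kind the converter above can decode, e.g. from an I/O-properties file.
// The mountpoint and rate values below are made up; bandwidths are parsed with
// parse_memory_size(), so size suffixes such as "500MB" are accepted.
//
//   mountpoint: /var/lib/data
//   read_bandwidth: 500MB
//   write_bandwidth: 350MB
//   read_iops: 100000
//   write_iops: 80000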

namespace seastar {

seastar::logger seastar_logger("seastar");
seastar::logger sched_logger("scheduler");

shard_id reactor::cpu_id() const {
    assert(_id == this_shard_id());
    return _id;
}

io_priority_class
reactor::register_one_priority_class(sstring name, uint32_t shares) {
    return io_queue::register_one_priority_class(std::move(name), shares);
}

future<>
reactor::update_shares_for_class(io_priority_class pc, uint32_t shares) {
    return parallel_for_each(_io_queues, [pc, shares] (auto& queue) {
        return queue.second->update_shares_for_class(pc, shares);
    });
}

future<>
reactor::rename_priority_class(io_priority_class pc, sstring new_name) noexcept {

    return futurize_invoke([pc, new_name = std::move(new_name)] () mutable {
        // Taking the lock here prevents newly registered classes from
        // registering under the old name (and prevents undefined behavior,
        // since this array is shared across shards). However, it doesn't
        // prevent the case where a newly registered class (that got
        // registered right after the lock release) will be unnecessarily
        // renamed. This is not a real problem and it is a lot better than
        // holding the lock until all cross-shard activity is over.

        try {
            if (!io_queue::rename_one_priority_class(pc, new_name)) {
                return make_ready_future<>();
            }
        } catch (...) {
            sched_logger.error("exception while trying to rename priority group with id {} to \"{}\" ({})",
                    pc.id(), new_name, std::current_exception());
            std::rethrow_exception(std::current_exception());
        }
        return smp::invoke_on_all([pc, new_name = std::move(new_name)] {
            for (auto&& queue : engine()._io_queues) {
                queue.second->rename_priority_class(pc, new_name);
            }
        });
    });
}

future<std::tuple<pollable_fd, socket_address>>
reactor::do_accept(pollable_fd_state& listenfd) {
    return readable_or_writeable(listenfd).then([this, &listenfd] () mutable {
        socket_address sa;
        listenfd.maybe_no_more_recv();
        auto maybe_fd = listenfd.fd.try_accept(sa, SOCK_NONBLOCK | SOCK_CLOEXEC);
        if (!maybe_fd) {
            // We speculated that we would have another connection, but got a false
            // positive. Try again without speculation.
            return do_accept(listenfd);
        }
        // Speculate that there is another connection on this listening socket, to avoid
        // a task-quota delay. Usually this will fail, but accept is a rare-enough operation
        // that it is worth the false positive in order to withstand a connection storm
        // without having to accept at a rate of 1 per task quota.
        listenfd.speculate_epoll(EPOLLIN);
        pollable_fd pfd(std::move(*maybe_fd), pollable_fd::speculation(EPOLLOUT));
        return make_ready_future<std::tuple<pollable_fd, socket_address>>(std::make_tuple(std::move(pfd), std::move(sa)));
    });
}

future<> reactor::do_connect(pollable_fd_state& pfd, socket_address& sa) {
    pfd.fd.connect(sa.u.sa, sa.length());
    return pfd.writeable().then([&pfd]() mutable {
        auto err = pfd.fd.getsockopt<int>(SOL_SOCKET, SO_ERROR);
        if (err != 0) {
            throw std::system_error(err, std::system_category());
        }
        return make_ready_future<>();
    });
}

future<size_t>
reactor::do_read_some(pollable_fd_state& fd, void* buffer, size_t len) {
    return readable(fd).then([this, &fd, buffer, len] () mutable {
        auto r = fd.fd.read(buffer, len);
        if (!r) {
            return do_read_some(fd, buffer, len);
        }
        if (size_t(*r) == len) {
            fd.speculate_epoll(EPOLLIN);
        }
        return make_ready_future<size_t>(*r);
    });
}

future<temporary_buffer<char>>
reactor::do_read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) {
    return fd.readable().then([this, &fd, ba] {
        auto buffer = ba->allocate_buffer();
        auto r = fd.fd.read(buffer.get_write(), buffer.size());
        if (!r) {
            // Speculation failure, try again with real polling this time
            // Note we release the buffer and will reallocate it when poll
            // completes.
            return do_read_some(fd, ba);
        }
        if (size_t(*r) == buffer.size()) {
            fd.speculate_epoll(EPOLLIN);
        }
        buffer.trim(*r);
        return make_ready_future<temporary_buffer<char>>(std::move(buffer));
    });
}

future<size_t>
reactor::do_read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) {
    return readable(fd).then([this, &fd, iov = iov] () mutable {
        ::msghdr mh = {};
        mh.msg_iov = &iov[0];
        mh.msg_iovlen = iov.size();
        auto r = fd.fd.recvmsg(&mh, 0);
        if (!r) {
            return do_read_some(fd, iov);
        }
        if (size_t(*r) == iovec_len(iov)) {
            fd.speculate_epoll(EPOLLIN);
        }
        return make_ready_future<size_t>(*r);
    });
}

future<size_t>
reactor::do_write_some(pollable_fd_state& fd, const void* buffer, size_t len) {
    return writeable(fd).then([this, &fd, buffer, len] () mutable {
        auto r = fd.fd.send(buffer, len, MSG_NOSIGNAL);
        if (!r) {
            return do_write_some(fd, buffer, len);
        }
        if (size_t(*r) == len) {
            fd.speculate_epoll(EPOLLOUT);
        }
        return make_ready_future<size_t>(*r);
    });
}

future<size_t>
reactor::do_write_some(pollable_fd_state& fd, net::packet& p) {
    return writeable(fd).then([this, &fd, &p] () mutable {
        static_assert(offsetof(iovec, iov_base) == offsetof(net::fragment, base) &&
            sizeof(iovec::iov_base) == sizeof(net::fragment::base) &&
            offsetof(iovec, iov_len) == offsetof(net::fragment, size) &&
            sizeof(iovec::iov_len) == sizeof(net::fragment::size) &&
            alignof(iovec) == alignof(net::fragment) &&
            sizeof(iovec) == sizeof(net::fragment)
            , "net::fragment and iovec should be equivalent");

        iovec* iov = reinterpret_cast<iovec*>(p.fragment_array());
        msghdr mh = {};
        mh.msg_iov = iov;
        mh.msg_iovlen = std::min<size_t>(p.nr_frags(), IOV_MAX);
        auto r = fd.fd.sendmsg(&mh, MSG_NOSIGNAL);
        if (!r) {
            return do_write_some(fd, p);
        }
        if (size_t(*r) == p.len()) {
            fd.speculate_epoll(EPOLLOUT);
        }
        return make_ready_future<size_t>(*r);
    });
}

future<>
reactor::write_all_part(pollable_fd_state& fd, const void* buffer, size_t len, size_t completed) {
    if (completed == len) {
        return make_ready_future<>();
    } else {
        return _backend->write_some(fd, static_cast<const char*>(buffer) + completed, len - completed).then(
                [&fd, buffer, len, completed, this] (size_t part) mutable {
            return write_all_part(fd, buffer, len, completed + part);
        });
    }
}

future<>
reactor::write_all(pollable_fd_state& fd, const void* buffer, size_t len) {
    assert(len);
    return write_all_part(fd, buffer, len, 0);
}

future<size_t> pollable_fd_state::read_some(char* buffer, size_t size) {
    return engine()._backend->read_some(*this, buffer, size);
}

future<size_t> pollable_fd_state::read_some(uint8_t* buffer, size_t size) {
    return engine()._backend->read_some(*this, buffer, size);
}

future<size_t> pollable_fd_state::read_some(const std::vector<iovec>& iov) {
    return engine()._backend->read_some(*this, iov);
}

future<temporary_buffer<char>> pollable_fd_state::read_some(internal::buffer_allocator* ba) {
    return engine()._backend->read_some(*this, ba);
}

future<size_t> pollable_fd_state::write_some(net::packet& p) {
    return engine()._backend->write_some(*this, p);
}

future<> pollable_fd_state::write_all(const char* buffer, size_t size) {
    return engine().write_all(*this, buffer, size);
}

future<> pollable_fd_state::write_all(const uint8_t* buffer, size_t size) {
    return engine().write_all(*this, buffer, size);
}

future<> pollable_fd_state::write_all(net::packet& p) {
    return write_some(p).then([this, &p] (size_t size) {
        if (p.len() == size) {
            return make_ready_future<>();
        }
        p.trim_front(size);
        return write_all(p);
    });
}

future<> pollable_fd_state::readable() {
    return engine().readable(*this);
}

future<> pollable_fd_state::writeable() {
    return engine().writeable(*this);
}

future<> pollable_fd_state::readable_or_writeable() {
    return engine().readable_or_writeable(*this);
}

void
pollable_fd_state::abort_reader() {
    engine().abort_reader(*this);
}

void
pollable_fd_state::abort_writer() {
    engine().abort_writer(*this);
}

future<std::tuple<pollable_fd, socket_address>> pollable_fd_state::accept() {
    return engine()._backend->accept(*this);
}

future<> pollable_fd_state::connect(socket_address& sa) {
    return engine()._backend->connect(*this, sa);
}

future<size_t> pollable_fd_state::recvmsg(struct msghdr *msg) {
    maybe_no_more_recv();
    return engine().readable(*this).then([this, msg] {
        auto r = fd.recvmsg(msg, 0);
        if (!r) {
            return recvmsg(msg);
        }
        // We always speculate here to optimize for throughput in a workload
        // with multiple outstanding requests. This way the caller can consume
        // all messages without resorting to epoll. However, this adds an extra
        // recvmsg() call when we hit the empty-queue condition, so it may
        // hurt request-response workloads in which the queue is empty when we
        // initially enter recvmsg(). If that turns out to be a problem, we can
        // improve speculation by using recvmmsg().
        speculate_epoll(EPOLLIN);
        return make_ready_future<size_t>(*r);
    });
};

future<size_t> pollable_fd_state::sendmsg(struct msghdr* msg) {
    maybe_no_more_send();
    return engine().writeable(*this).then([this, msg] () mutable {
        auto r = fd.sendmsg(msg, 0);
        if (!r) {
            return sendmsg(msg);
        }
        // For UDP this will always speculate. We can't know if there's room
        // or not, but most of the time there should be so the cost of mis-
        // speculation is amortized.
        if (size_t(*r) == iovec_len(msg->msg_iov, msg->msg_iovlen)) {
            speculate_epoll(EPOLLOUT);
        }
        return make_ready_future<size_t>(*r);
    });
}

future<size_t> pollable_fd_state::sendto(socket_address addr, const void* buf, size_t len) {
    maybe_no_more_send();
    return engine().writeable(*this).then([this, buf, len, addr] () mutable {
        auto r = fd.sendto(addr, buf, len, 0);
        if (!r) {
            return sendto(std::move(addr), buf, len);
        }
        // See the comment about speculation in sendmsg().
        if (size_t(*r) == len) {
            speculate_epoll(EPOLLOUT);
        }
        return make_ready_future<size_t>(*r);
    });
}

namespace internal {

#ifdef SEASTAR_TASK_HISTOGRAM

class task_histogram {
    static constexpr unsigned max_countdown = 1'000'000;
    std::unordered_map<std::type_index, uint64_t> _histogram;
    unsigned _countdown_to_print = max_countdown;
public:
    void add(const task& t) {
        ++_histogram[std::type_index(typeid(t))];
        if (!--_countdown_to_print) {
            print();
            _countdown_to_print = max_countdown;
            _histogram.clear();
        }
    }
    void print() const {
        seastar::fmt::print("task histogram, {:d} task types {:d} tasks\n", _histogram.size(), max_countdown - _countdown_to_print);
        for (auto&& type_count : _histogram) {
            auto&& type = type_count.first;
            auto&& count = type_count.second;
            seastar::fmt::print(" {:10d} {}\n", count, type.name());
        }
    }
};

thread_local task_histogram this_thread_task_histogram;

#endif

void task_histogram_add_task(const task& t) {
#ifdef SEASTAR_TASK_HISTOGRAM
    this_thread_task_histogram.add(t);
#endif
}

}

using namespace std::chrono_literals;
namespace fs = std::filesystem;

using namespace net;

using namespace internal;
using namespace internal::linux_abi;

std::atomic<lowres_clock_impl::steady_rep> lowres_clock_impl::counters::_steady_now;
std::atomic<lowres_clock_impl::system_rep> lowres_clock_impl::counters::_system_now;
std::atomic<manual_clock::rep> manual_clock::_now;
constexpr std::chrono::milliseconds lowres_clock_impl::_granularity;

constexpr unsigned reactor::max_queues;
constexpr unsigned reactor::max_aio_per_queue;

// Broken (returns spurious EIO). Cause/fix unknown.
bool aio_nowait_supported = false;

static bool sched_debug() {
    return false;
}

template <typename... Args>
void
sched_print(const char* fmt, Args&&... args) {
    if (sched_debug()) {
        sched_logger.trace(fmt, std::forward<Args>(args)...);
    }
}

static std::atomic<bool> abort_on_ebadf = { false };

void set_abort_on_ebadf(bool do_abort) {
    abort_on_ebadf.store(do_abort);
}

bool is_abort_on_ebadf_enabled() {
    return abort_on_ebadf.load();
}

timespec to_timespec(steady_clock_type::time_point t) {
    using ns = std::chrono::nanoseconds;
    auto n = std::chrono::duration_cast<ns>(t.time_since_epoch()).count();
    return { n / 1'000'000'000, n % 1'000'000'000 };
}
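
// Worked example (illustrative): for a time point 1.5s past the clock's epoch,
// n == 1'500'000'000 nanoseconds, so the result is
// { tv_sec = 1, tv_nsec = 500'000'000 }.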

lowres_clock_impl::lowres_clock_impl() {
    update();
    _timer.set_callback(&lowres_clock_impl::update);
    _timer.arm_periodic(_granularity);
}

void lowres_clock_impl::update() noexcept {
    auto const steady_count =
            std::chrono::duration_cast<steady_duration>(base_steady_clock::now().time_since_epoch()).count();

    auto const system_count =
            std::chrono::duration_cast<system_duration>(base_system_clock::now().time_since_epoch()).count();

    counters::_steady_now.store(steady_count, std::memory_order_relaxed);
    counters::_system_now.store(system_count, std::memory_order_relaxed);
}

template <typename Clock>
inline
timer<Clock>::~timer() {
    if (_queued) {
        engine().del_timer(this);
    }
}

template <typename Clock>
inline
void timer<Clock>::arm(time_point until, std::optional<duration> period) noexcept {
    arm_state(until, period);
    engine().add_timer(this);
}

template <typename Clock>
inline
void timer<Clock>::readd_periodic() noexcept {
    arm_state(Clock::now() + _period.value(), {_period.value()});
    engine().queue_timer(this);
}

template <typename Clock>
inline
bool timer<Clock>::cancel() noexcept {
    if (!_armed) {
        return false;
    }
    _armed = false;
    if (_queued) {
        engine().del_timer(this);
        _queued = false;
    }
    return true;
}

template class timer<steady_clock_type>;
template class timer<lowres_clock>;
template class timer<manual_clock>;
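
// Minimal usage sketch (illustrative, not from this file): arming a periodic
// lowres_clock timer on the current shard. This must run inside a reactor
// thread, since arm()/cancel() go through engine().
//
//   seastar::timer<seastar::lowres_clock> t;
//   t.set_callback([] { seastar_logger.info("tick"); });
//   t.arm_periodic(std::chrono::seconds(1));
//   // ... later: t.cancel();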

reactor::signals::signals() : _pending_signals(0) {
}

reactor::signals::~signals() {
    sigset_t mask;
    sigfillset(&mask);
    ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
}

reactor::signals::signal_handler::signal_handler(int signo, noncopyable_function<void ()>&& handler)
        : _handler(std::move(handler)) {
}

void
reactor::signals::handle_signal(int signo, noncopyable_function<void ()>&& handler) {
    _signal_handlers.emplace(std::piecewise_construct,
            std::make_tuple(signo), std::make_tuple(signo, std::move(handler)));

    struct sigaction sa;
    sa.sa_sigaction = [](int sig, siginfo_t *info, void *p) {
        engine()._backend->signal_received(sig, info, p);
    };
    sa.sa_mask = make_empty_sigset_mask();
    sa.sa_flags = SA_SIGINFO | SA_RESTART;
    auto r = ::sigaction(signo, &sa, nullptr);
    throw_system_error_on(r == -1);
    auto mask = make_sigset_mask(signo);
    r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
    throw_pthread_error(r);
}

void
reactor::signals::handle_signal_once(int signo, noncopyable_function<void ()>&& handler) {
    return handle_signal(signo, [fired = false, handler = std::move(handler)] () mutable {
        if (!fired) {
            fired = true;
            handler();
        }
    });
}
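
// Illustrative sketch (not from this file): registering a handler through the
// reactor; the call site is hypothetical. The handler runs in reactor context
// once poll_signal() below observes the corresponding pending-signal bit.
//
//   engine().handle_signal(SIGHUP, [] {
//       seastar_logger.info("got SIGHUP, reloading configuration");
//   });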

bool reactor::signals::poll_signal() {
    auto signals = _pending_signals.load(std::memory_order_relaxed);
    if (signals) {
        _pending_signals.fetch_and(~signals, std::memory_order_relaxed);
        for (size_t i = 0; i < sizeof(signals)*8; i++) {
            if (signals & (1ull << i)) {
                _signal_handlers.at(i)._handler();
            }
        }
    }
    return signals;
}
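
// Worked example (illustrative): if signals has bits 1 and 15 set (on Linux,
// SIGHUP and SIGTERM), the loop above invokes the handlers registered for
// signal numbers 1 and 15, and fetch_and() has already cleared exactly those
// bits so a concurrently raised signal is not lost.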

bool reactor::signals::pure_poll_signal() const {
    return _pending_signals.load(std::memory_order_relaxed);
}

void reactor::signals::action(int signo, siginfo_t* siginfo, void* ignore) {
    engine().start_handling_signal();
    engine()._signals._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed);
}

void reactor::signals::failed_to_handle(int signo) {
    char tname[64];
    pthread_getname_np(pthread_self(), tname, sizeof(tname));
    auto tid = syscall(SYS_gettid);
    seastar_logger.error("Failed to handle signal {} on thread {} ({}): engine not ready", signo, tid, tname);
}

void reactor::handle_signal(int signo, noncopyable_function<void ()>&& handler) {
    _signals.handle_signal(signo, std::move(handler));
}

// Accumulates an in-memory backtrace and flushes it to stderr eventually.
// Async-signal safe.
class backtrace_buffer {
    static constexpr unsigned _max_size = 8 << 10;
    unsigned _pos = 0;
    char _buf[_max_size];
public:
    void flush() noexcept {
        print_safe(_buf, _pos);
        _pos = 0;
    }

    void append(const char* str, size_t len) noexcept {
        if (_pos + len >= _max_size) {
            flush();
        }
        memcpy(_buf + _pos, str, len);
        _pos += len;
    }

    void append(const char* str) noexcept { append(str, strlen(str)); }

    template <typename Integral>
    void append_decimal(Integral n) noexcept {
        char buf[sizeof(n) * 3];
        auto len = convert_decimal_safe(buf, sizeof(buf), n);
        append(buf, len);
    }

    template <typename Integral>
    void append_hex(Integral ptr) noexcept {
        char buf[sizeof(ptr) * 2];
        convert_zero_padded_hex_safe(buf, sizeof(buf), ptr);
        append(buf, sizeof(buf));
    }

    void append_backtrace() noexcept {
        backtrace([this] (frame f) {
            append(" ");
            if (!f.so->name.empty()) {
                append(f.so->name.c_str(), f.so->name.size());
                append("+");
            }

            append("0x");
            append_hex(f.addr);
            append("\n");
        });
    }
};

static void print_with_backtrace(backtrace_buffer& buf) noexcept {
    if (local_engine) {
        buf.append(" on shard ");
        buf.append_decimal(this_shard_id());
    }

    buf.append(".\nBacktrace:\n");
    buf.append_backtrace();
    buf.flush();
}

static void print_with_backtrace(const char* cause) noexcept {
    backtrace_buffer buf;
    buf.append(cause);
    print_with_backtrace(buf);
}

// Installs a signal handler stack for the current thread.
// The stack remains installed as long as the returned object is kept alive.
// When it goes out of scope the previous handler is restored.
static decltype(auto) install_signal_handler_stack() {
    size_t size = SIGSTKSZ;
    auto mem = std::make_unique<char[]>(size);
    stack_t stack;
    stack_t prev_stack;
    stack.ss_sp = mem.get();
    stack.ss_flags = 0;
    stack.ss_size = size;
    auto r = sigaltstack(&stack, &prev_stack);
    throw_system_error_on(r == -1);
    return defer([mem = std::move(mem), prev_stack] () mutable {
        try {
            auto r = sigaltstack(&prev_stack, NULL);
            throw_system_error_on(r == -1);
        } catch (...) {
            mem.release(); // We failed to restore previous stack, must leak it.
            seastar_logger.error("Failed to restore signal stack: {}", std::current_exception());
        }
    });
}

reactor::task_queue::task_queue(unsigned id, sstring name, float shares)
        : _shares(std::max(shares, 1.0f))
        , _reciprocal_shares_times_2_power_32((uint64_t(1) << 32) / _shares)
        , _id(id)
        , _ts(std::chrono::steady_clock::now())
        , _name(name) {
    register_stats();
}

void
reactor::task_queue::register_stats() {
    seastar::metrics::metric_groups new_metrics;
    namespace sm = seastar::metrics;
    static auto group = sm::label("group");
    auto group_label = group(_name);
    new_metrics.add_group("scheduler", {
        sm::make_counter("runtime_ms", [this] {
            return std::chrono::duration_cast<std::chrono::milliseconds>(_runtime).count();
        }, sm::description("Accumulated runtime of this task queue; an increment rate of 1000ms per second indicates full utilization"),
                {group_label}),
        sm::make_counter("waittime_ms", [this] {
            return std::chrono::duration_cast<std::chrono::milliseconds>(_waittime).count();
        }, sm::description("Accumulated waittime of this task queue; an increment rate of 1000ms per second indicates queue is waiting for something (e.g. IO)"),
                {group_label}),
        sm::make_counter("starvetime_ms", [this] {
            return std::chrono::duration_cast<std::chrono::milliseconds>(_starvetime).count();
        }, sm::description("Accumulated starvation time of this task queue; an increment rate of 1000ms per second indicates the scheduler feels really bad"),
                {group_label}),
        sm::make_counter("tasks_processed", _tasks_processed,
                sm::description("Count of tasks executing on this queue; together with runtime_ms, indicates the length of tasks"),
                {group_label}),
        sm::make_gauge("queue_length", [this] { return _q.size(); },
                sm::description("Size of backlog on this queue, in tasks; indicates whether the queue is busy and/or contended"),
                {group_label}),
        sm::make_gauge("shares", [this] { return _shares; },
                sm::description("Shares allocated to this queue"),
                {group_label}),
        sm::make_derive("time_spent_on_task_quota_violations_ms", [this] {
            return _time_spent_on_task_quota_violations / 1ms;
        }, sm::description("Total amount in milliseconds we were in violation of the task quota"),
                {group_label}),
    });
    _metrics = std::exchange(new_metrics, {});
}

void
reactor::task_queue::rename(sstring new_name) {
    if (_name != new_name) {
        _name = new_name;
        register_stats();
    }
}

#ifdef __clang__
__attribute__((no_sanitize("undefined"))) // multiplication below may overflow; we check for that
#elif defined(__GNUC__)
[[gnu::no_sanitize_undefined]]
#endif
inline
int64_t
reactor::task_queue::to_vruntime(sched_clock::duration runtime) const {
    auto scaled = (runtime.count() * _reciprocal_shares_times_2_power_32) >> 32;
    // Prevent overflow from returning ridiculous values
    return std::max<int64_t>(scaled, 0);
}
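
// Worked example (illustrative): with shares == 1000,
// _reciprocal_shares_times_2_power_32 == 2^32 / 1000 ≈ 4'294'967. A runtime of
// 1ms (1'000'000 counts at nanosecond resolution) then scales to
// (1'000'000 * 4'294'967) >> 32 ≈ 1000, i.e. vruntime advances roughly as
// runtime / shares, so queues with more shares age more slowly and get
// scheduled more often.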

void
reactor::task_queue::set_shares(float shares) noexcept {
    _shares = std::max(shares, 1.0f);
    _reciprocal_shares_times_2_power_32 = (uint64_t(1) << 32) / _shares;
}

void
reactor::account_runtime(task_queue& tq, sched_clock::duration runtime) {
    if (runtime > (2 * _task_quota)) {
        tq._time_spent_on_task_quota_violations += runtime - _task_quota;
    }
    tq._vruntime += tq.to_vruntime(runtime);
    tq._runtime += runtime;
}

void
reactor::account_idle(sched_clock::duration runtime) {
    // anything to do here?
}

struct reactor::task_queue::indirect_compare {
    bool operator()(const task_queue* tq1, const task_queue* tq2) const {
        return tq1->_vruntime < tq2->_vruntime;
    }
};

reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
    : _cfg(cfg)
    , _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
    , _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
    , _id(id)
#ifdef HAVE_OSV
    , _timer_thread(
        [&] { timer_thread_func(); }, sched::thread::attr().stack(4096).name("timer_thread").pin(sched::cpu::current()))
    , _engine_thread(sched::thread::current())
#endif
    , _cpu_started(0)
    , _cpu_stall_detector(std::make_unique<cpu_stall_detector>())
    , _reuseport(posix_reuseport_detect())
    , _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) {
    /*
     * The _backend assignment is here, not on the initialization list as
     * the chosen backend constructor may want to handle signals and thus
     * needs the _signals._signal_handlers map to be initialized.
     */
    _backend = rbs.create(this);
    *internal::get_scheduling_group_specific_thread_local_data_ptr() = &_scheduling_group_specific_data;
    _task_queues.push_back(std::make_unique<task_queue>(0, "main", 1000));
    _task_queues.push_back(std::make_unique<task_queue>(1, "atexit", 1000));
    _at_destroy_tasks = _task_queues.back().get();
    g_need_preempt = &_preemption_monitor;
    seastar::thread_impl::init();
    _backend->start_tick();

#ifdef HAVE_OSV
    _timer_thread.start();
#else
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, cpu_stall_detector::signal_number());
    auto r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
    assert(r == 0);
#endif
    memory::set_reclaim_hook([this] (std::function<void ()> reclaim_fn) {
        add_high_priority_task(make_task(default_scheduling_group(), [fn = std::move(reclaim_fn)] {
            fn();
        }));
    });
}

reactor::~reactor() {
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, cpu_stall_detector::signal_number());
    auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
    assert(r == 0);

    _backend->stop_tick();
    auto eraser = [](auto& list) {
        while (!list.empty()) {
            auto& timer = *list.begin();
            timer.cancel();
        }
    };
    eraser(_expired_timers);
    eraser(_expired_lowres_timers);
    eraser(_expired_manual_timers);
    auto& sg_data = _scheduling_group_specific_data;
    for (auto&& tq : _task_queues) {
        if (tq) {
            auto& this_sg = sg_data.per_scheduling_group_data[tq->_id];
            // The following line will preserve the convention that constructor and destructor functions
            // for the per sg values are called in the context of the containing scheduling group.
            *internal::current_scheduling_group_ptr() = scheduling_group(tq->_id);
            for (size_t key : boost::irange<size_t>(0, sg_data.scheduling_group_key_configs.size())) {
                void* val = this_sg.specific_vals[key];
                if (val) {
                    if (sg_data.scheduling_group_key_configs[key].destructor) {
                        sg_data.scheduling_group_key_configs[key].destructor(val);
                    }
                    free(val);
                    this_sg.specific_vals[key] = nullptr;
                }
            }
        }
    }
}

future<> reactor::readable(pollable_fd_state& fd) {
    return _backend->readable(fd);
}

future<> reactor::writeable(pollable_fd_state& fd) {
    return _backend->writeable(fd);
}

future<> reactor::readable_or_writeable(pollable_fd_state& fd) {
    return _backend->readable_or_writeable(fd);
}

void reactor::abort_reader(pollable_fd_state& fd) {
    // TCP will respond to shutdown(SHUT_RD) by returning ECONNABORTED on the next read,
    // but UDP responds by returning EAGAIN. The no_more_recv flag tells us to convert
    // EAGAIN to ECONNABORTED in that case.
    fd.no_more_recv = true;
    return fd.fd.shutdown(SHUT_RD);
}

void reactor::abort_writer(pollable_fd_state& fd) {
    // TCP will respond to shutdown(SHUT_WR) by returning ECONNABORTED on the next write,
    // but UDP responds by returning EAGAIN. The no_more_send flag tells us to convert
    // EAGAIN to ECONNABORTED in that case.
    fd.no_more_send = true;
    return fd.fd.shutdown(SHUT_WR);
}

void reactor::set_strict_dma(bool value) {
    _strict_o_direct = value;
}

void reactor::set_bypass_fsync(bool value) {
    _bypass_fsync = value;
}

void
reactor::reset_preemption_monitor() {
    return _backend->reset_preemption_monitor();
}

void
reactor::request_preemption() {
    return _backend->request_preemption();
}

void reactor::start_handling_signal() {
    return _backend->start_handling_signal();
}

cpu_stall_detector::cpu_stall_detector(cpu_stall_detector_config cfg)
        : _shard_id(this_shard_id()) {
    // glibc's backtrace() calls dlopen("libgcc_s.so.1") once to resolve unwind related symbols.
    // If the first stall detector invocation happens during another dlopen() call the calling thread
    // will deadlock. The dummy call here makes sure that backtrace's initialization happens in
    // a safe place.
    backtrace([] (frame) {});
    update_config(cfg);
    struct sigevent sev = {};
    sev.sigev_notify = SIGEV_THREAD_ID;
    sev.sigev_signo = signal_number();
    sev._sigev_un._tid = syscall(SYS_gettid);
    int err = timer_create(CLOCK_THREAD_CPUTIME_ID, &sev, &_timer);
    if (err) {
        throw std::system_error(std::error_code(err, std::system_category()));
    }

    namespace sm = seastar::metrics;

    _metrics.add_group("stall_detector", {
        sm::make_derive("reported", _total_reported, sm::description("Total number of reported stalls, look in the traces for the exact reason"))});


    // note: if something is added here that can throw, it should take care to destroy _timer.
}

cpu_stall_detector::~cpu_stall_detector() {
    timer_delete(_timer);
}

cpu_stall_detector_config
cpu_stall_detector::get_config() const {
    return _config;
}

void cpu_stall_detector::update_config(cpu_stall_detector_config cfg) {
    _config = cfg;
    _threshold = std::chrono::duration_cast<std::chrono::steady_clock::duration>(cfg.threshold);
    _slack = std::chrono::duration_cast<std::chrono::steady_clock::duration>(cfg.threshold * cfg.slack);
    _stall_detector_reports_per_minute = cfg.stall_detector_reports_per_minute;
    _max_reports_per_minute = cfg.stall_detector_reports_per_minute;
    _rearm_timer_at = std::chrono::steady_clock::now();
}
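
// Worked example (illustrative values): with cfg.threshold = 25ms and
// cfg.slack = 0.2, _threshold becomes 25ms and _slack 5ms, so the first
// expiry armed by arm_timer() below fires after threshold * 1 + slack = 30ms
// of uninterrupted task run time.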

void cpu_stall_detector::maybe_report() {
    if (_reported++ < _max_reports_per_minute) {
        generate_trace();
    }
}
// We use a tick at every timer firing so we can report suppressed backtraces.
// Best case it's a correctly predicted branch. If a backtrace had happened in
// the near past it's an increment and two branches.
//
// We could do it cheaper if we didn't report suppressed backtraces.
void cpu_stall_detector::on_signal() {
    auto tasks_processed = engine().tasks_processed();
    auto last_seen = _last_tasks_processed_seen.load(std::memory_order_relaxed);
    if (!last_seen) {
        return; // stall detector is not active
    } else if (last_seen == tasks_processed) { // no task was processed - report
        maybe_report();
        _report_at <<= 1;
    } else {
        _last_tasks_processed_seen.store(tasks_processed, std::memory_order_relaxed);
    }
    arm_timer();
}

void cpu_stall_detector::report_suppressions(std::chrono::steady_clock::time_point now) {
    if (now > _minute_mark + 60s) {
        if (_reported > _max_reports_per_minute) {
            auto suppressed = _reported - _max_reports_per_minute;
            backtrace_buffer buf;
            // Reuse backtrace buffer infrastructure so we don't have to allocate here
            buf.append("Rate-limit: suppressed ");
            buf.append_decimal(suppressed);
            suppressed == 1 ? buf.append(" backtrace") : buf.append(" backtraces");
            buf.append(" on shard ");
            buf.append_decimal(_shard_id);
            buf.append("\n");
            buf.flush();
        }
        _reported = 0;
        _minute_mark = now;
    }
}

void cpu_stall_detector::arm_timer() {
    auto its = posix::to_relative_itimerspec(_threshold * _report_at + _slack, 0s);
    timer_settime(_timer, 0, &its, nullptr);
}
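
// Because on_signal() doubles _report_at after each report, successive
// re-arms back off exponentially within a single long task run: with a 25ms
// threshold (illustrative) the timer fires after roughly 25ms, then 50ms,
// 100ms, ... of additional CPU time, so a stuck task produces a handful of
// traces rather than a flood.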

void cpu_stall_detector::start_task_run(std::chrono::steady_clock::time_point now) {
    if (now > _rearm_timer_at) {
        report_suppressions(now);
        _report_at = 1;
        _run_started_at = now;
        _rearm_timer_at = now + _threshold * _report_at;
        arm_timer();
    }
    _last_tasks_processed_seen.store(engine().tasks_processed(), std::memory_order_relaxed);
    std::atomic_signal_fence(std::memory_order_release); // Don't delay this write, so the signal handler can see it
}

void cpu_stall_detector::end_task_run(std::chrono::steady_clock::time_point now) {
    std::atomic_signal_fence(std::memory_order_acquire); // Don't hoist this write, so the signal handler can see it
    _last_tasks_processed_seen.store(0, std::memory_order_relaxed);
}

void cpu_stall_detector::start_sleep() {
    auto its = posix::to_relative_itimerspec(0s, 0s);
    timer_settime(_timer, 0, &its, nullptr);
    _rearm_timer_at = std::chrono::steady_clock::now();
}

void cpu_stall_detector::end_sleep() {
}

void
reactor::task_quota_timer_thread_fn() {
    auto thread_name = seastar::format("timer-{}", _id);
    pthread_setname_np(pthread_self(), thread_name.c_str());

    sigset_t mask;
    sigfillset(&mask);
    for (auto sig : { SIGSEGV }) {
        sigdelset(&mask, sig);
    }
    auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
    if (r) {
        seastar_logger.error("Thread {}: failed to block signals. Aborting.", thread_name.c_str());
        abort();
    }

    // We need to wait until task quota is set before we can calculate how many ticks are to
    // a minute. Technically task_quota is used from many threads, but since it is read-only here
    // and only used during initialization we will avoid complicating the code.
    {
        uint64_t events;
        _task_quota_timer.read(&events, 8);
        request_preemption();
    }

    while (!_dying.load(std::memory_order_relaxed)) {
        uint64_t events;
        _task_quota_timer.read(&events, 8);
        request_preemption();

        // We're in a different thread, but guaranteed to be on the same core, so even
        // a signal fence is overdoing it
        std::atomic_signal_fence(std::memory_order_seq_cst);
    }
}
void
reactor::update_blocked_reactor_notify_ms(std::chrono::milliseconds ms) {
    auto cfg = _cpu_stall_detector->get_config();
    if (ms != cfg.threshold) {
        cfg.threshold = ms;
        _cpu_stall_detector->update_config(cfg);
        seastar_logger.info("updated: blocked-reactor-notify-ms={}", ms.count());
    }
}

std::chrono::milliseconds
reactor::get_blocked_reactor_notify_ms() const {
    auto d = _cpu_stall_detector->get_config().threshold;
    return std::chrono::duration_cast<std::chrono::milliseconds>(d);
}

void
reactor::set_stall_detector_report_function(std::function<void ()> report) {
    auto cfg = _cpu_stall_detector->get_config();
    cfg.report = std::move(report);
    _cpu_stall_detector->update_config(std::move(cfg));
}

std::function<void ()>
reactor::get_stall_detector_report_function() const {
    return _cpu_stall_detector->get_config().report;
}

void
reactor::block_notifier(int) {
    engine()._cpu_stall_detector->on_signal();
}

void
cpu_stall_detector::generate_trace() {
    auto delta = std::chrono::steady_clock::now() - _run_started_at;

    _total_reported++;
    if (_config.report) {
        _config.report();
        return;
    }

    backtrace_buffer buf;
    buf.append("Reactor stalled for ");
    buf.append_decimal(uint64_t(delta / 1ms));
    buf.append(" ms");
    print_with_backtrace(buf);
}

template <typename T, typename E, typename EnableFunc>
void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn())) {
    expired_timers = timers.expire(timers.now());
    for (auto& t : expired_timers) {
        t._expired = true;
    }
    const auto prev_sg = current_scheduling_group();
    while (!expired_timers.empty()) {
        auto t = &*expired_timers.begin();
        expired_timers.pop_front();
        t->_queued = false;
        if (t->_armed) {
            t->_armed = false;
            if (t->_period) {
                t->readd_periodic();
            }
            try {
                *internal::current_scheduling_group_ptr() = t->_sg;
                t->_callback();
            } catch (...) {
                seastar_logger.error("Timer callback failed: {}", std::current_exception());
            }
        }
    }
    // complete_timers() can be called from the context of run_tasks()
    // as well so we need to restore the previous scheduling group (set by run_tasks()).
    *internal::current_scheduling_group_ptr() = prev_sg;
    enable_fn();
}

#ifdef HAVE_OSV
void reactor::timer_thread_func() {
    sched::timer tmr(*sched::thread::current());
    WITH_LOCK(_timer_mutex) {
        while (!_stopped) {
            if (_timer_due != 0) {
                set_timer(tmr, _timer_due);
                _timer_cond.wait(_timer_mutex, &tmr);
                if (tmr.expired()) {
                    _timer_due = 0;
                    _engine_thread->unsafe_stop();
                    _pending_tasks.push_front(make_task(default_scheduling_group(), [this] {
                        complete_timers(_timers, _expired_timers, [this] {
                            if (!_timers.empty()) {
                                enable_timer(_timers.get_next_timeout());
                            }
                        });
                    }));
                    _engine_thread->wake();
                } else {
                    tmr.cancel();
                }
            } else {
                _timer_cond.wait(_timer_mutex);
            }
        }
    }
}

void reactor::set_timer(sched::timer &tmr, s64 t) {
    using namespace osv::clock;
    tmr.set(wall::time_point(std::chrono::nanoseconds(t)));
}
#endif

class network_stack_registry {
public:
    using options = boost::program_options::variables_map;
private:
    static std::unordered_map<sstring,
            noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)>>& _map() {
        static std::unordered_map<sstring,
                noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)>> map;
        return map;
    }
    static sstring& _default() {
        static sstring def;
        return def;
    }
public:
    static boost::program_options::options_description& options_description() {
        static boost::program_options::options_description opts;
        return opts;
    }
    static void register_stack(sstring name, boost::program_options::options_description opts,
            noncopyable_function<future<std::unique_ptr<network_stack>>(options opts)> create,
            bool make_default);
    static sstring default_stack();
    static std::vector<sstring> list();
    static future<std::unique_ptr<network_stack>> create(options opts);
    static future<std::unique_ptr<network_stack>> create(sstring name, options opts);
};

void reactor::configure(boost::program_options::variables_map vm) {
    _network_stack_ready = vm.count("network-stack")
        ? network_stack_registry::create(sstring(vm["network-stack"].as<std::string>()), vm)
        : network_stack_registry::create(vm);

    _handle_sigint = !vm.count("no-handle-interrupt");
    auto task_quota = vm["task-quota-ms"].as<double>() * 1ms;
    _task_quota = std::chrono::duration_cast<sched_clock::duration>(task_quota);

    auto blocked_time = vm["blocked-reactor-notify-ms"].as<unsigned>() * 1ms;
    cpu_stall_detector_config csdc;
    csdc.threshold = blocked_time;
    csdc.stall_detector_reports_per_minute = vm["blocked-reactor-reports-per-minute"].as<unsigned>();
    _cpu_stall_detector->update_config(csdc);

    _max_task_backlog = vm["max-task-backlog"].as<unsigned>();
    _max_poll_time = vm["idle-poll-time-us"].as<unsigned>() * 1us;
    if (vm.count("poll-mode")) {
        _max_poll_time = std::chrono::nanoseconds::max();
    }
    if (vm.count("overprovisioned")
            && vm["idle-poll-time-us"].defaulted()
            && !vm.count("poll-mode")) {
        _max_poll_time = 0us;
    }
    set_strict_dma(!vm.count("relaxed-dma"));
    if (!vm["poll-aio"].as<bool>()
            || (vm["poll-aio"].defaulted() && vm.count("overprovisioned"))) {
        _aio_eventfd = pollable_fd(file_desc::eventfd(0, 0));
    }
    set_bypass_fsync(vm["unsafe-bypass-fsync"].as<bool>());
    _force_io_getevents_syscall = vm["force-aio-syscalls"].as<bool>();
    aio_nowait_supported = vm["linux-aio-nowait"].as<bool>();
    _have_aio_fsync = vm["aio-fsync"].as<bool>();
}
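
// Illustrative invocation (values are examples only): the options consumed
// above map to command-line flags of a Seastar application, e.g.
//
//   ./my_app --task-quota-ms 0.5 --blocked-reactor-notify-ms 25 \
//            --max-task-backlog 1000 --idle-poll-time-us 200
//
// Passing --poll-mode or --overprovisioned flips the polling behaviour as
// handled in reactor::configure() above.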

pollable_fd
reactor::posix_listen(socket_address sa, listen_options opts) {
    auto specific_protocol = (int)(opts.proto);
    if (sa.is_af_unix()) {
        // no type-safe way to create listen_opts with proto=0
        specific_protocol = 0;
    }
    static auto somaxconn = [] {
        std::optional<int> result;
        std::ifstream ifs("/proc/sys/net/core/somaxconn");
        if (ifs) {
            result = 0;
            ifs >> *result;
        }
        return result;
    }();
    if (somaxconn && *somaxconn < opts.listen_backlog) {
        fmt::print(
            "Warning: /proc/sys/net/core/somaxconn is set to {:d} "
            "which is lower than the backlog parameter {:d} used for listen(), "
            "please change it with `sysctl -w net.core.somaxconn={:d}`\n",
            *somaxconn, opts.listen_backlog, opts.listen_backlog);
    }

    file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, specific_protocol);
    if (opts.reuse_address) {
        fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1);
    }
    if (_reuseport && !sa.is_af_unix())
        fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);

    try {
        fd.bind(sa.u.sa, sa.length());
        fd.listen(opts.listen_backlog);
    } catch (const std::system_error& s) {
        throw std::system_error(s.code(), fmt::format("posix_listen failed for address {}", sa));
    }

    return pollable_fd(std::move(fd));
}

bool
reactor::posix_reuseport_detect() {
    return false; // FIXME: reuseport currently leads to heavy load imbalance. Until we fix that, just
                  // disable it unconditionally.
    try {
        file_desc fd = file_desc::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
        fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
        return true;
    } catch(std::system_error& e) {
        return false;
    }
}

void pollable_fd_state::maybe_no_more_recv() {
    if (no_more_recv) {
        throw std::system_error(std::error_code(ECONNABORTED, std::system_category()));
    }
}

void pollable_fd_state::maybe_no_more_send() {
    if (no_more_send) {
        throw std::system_error(std::error_code(ECONNABORTED, std::system_category()));
    }
}

void pollable_fd_state::forget() {
    engine()._backend->forget(*this);
}

void intrusive_ptr_release(pollable_fd_state* fd) {
    if (!--fd->_refs) {
        fd->forget();
    }
}

pollable_fd::pollable_fd(file_desc fd, pollable_fd::speculation speculate)
    : _s(engine()._backend->make_pollable_fd_state(std::move(fd), speculate))
{}

void pollable_fd::shutdown(int how) {
    engine()._backend->shutdown(*_s, how);
}

pollable_fd
reactor::make_pollable_fd(socket_address sa, int proto) {
    file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, proto);
    return pollable_fd(std::move(fd));
}

future<>
reactor::posix_connect(pollable_fd pfd, socket_address sa, socket_address local) {
#ifdef IP_BIND_ADDRESS_NO_PORT
    if (!sa.is_af_unix()) {
        try {
            // Do not reserve an ephemeral port when using bind() with port number 0.
            // connect() will handle it later. The reason is that bind() may fail
            // to allocate a port while connect() will succeed: bind() does not
            // know the destination address and has to find a globally unique local port.
            pfd.get_file_desc().setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
        } catch (std::system_error& err) {
            if (err.code() != std::error_code(ENOPROTOOPT, std::system_category())) {
                throw;
            }
        }
    }
#endif
    if (!local.is_wildcard()) {
        // call bind() only if local address is not wildcard
        pfd.get_file_desc().bind(local.u.sa, local.length());
    }
    return pfd.connect(sa).finally([pfd] {});
}

server_socket
reactor::listen(socket_address sa, listen_options opt) {
    return server_socket(_network_stack->listen(sa, opt));
}

future<connected_socket>
reactor::connect(socket_address sa) {
    return _network_stack->connect(sa);
}

future<connected_socket>
reactor::connect(socket_address sa, socket_address local, transport proto) {
    return _network_stack->connect(sa, local, proto);
}

sstring io_request::opname() const {
    switch (_op) {
    case io_request::operation::fdatasync:
        return "fdatasync";
    case io_request::operation::write:
        return "write";
    case io_request::operation::writev:
        return "vectored write";
    case io_request::operation::read:
        return "read";
    case io_request::operation::readv:
        return "vectored read";
    case io_request::operation::recv:
        return "recv";
    case io_request::operation::recvmsg:
        return "recvmsg";
    case io_request::operation::send:
        return "send";
    case io_request::operation::sendmsg:
        return "sendmsg";
    case io_request::operation::accept:
        return "accept";
    case io_request::operation::connect:
        return "connect";
    case io_request::operation::poll_add:
        return "poll add";
    case io_request::operation::poll_remove:
        return "poll remove";
    case io_request::operation::cancel:
        return "cancel";
    }
    std::abort();
}

void io_completion::complete_with(ssize_t res) {
    if (res >= 0) {
        complete(res);
        return;
    }

    ++engine()._io_stats.aio_errors;
    try {
        throw_kernel_error(res);
    } catch (...) {
        set_exception(std::current_exception());
    }
}

void
reactor::submit_io(io_completion* desc, io_request req) noexcept {
    req.attach_kernel_completion(desc);
    try {
        _pending_io.push_back(std::move(req));
    } catch (...) {
        desc->set_exception(std::current_exception());
    }
}

bool
reactor::flush_pending_aio() {
    for (auto& ioq : my_io_queues) {
        ioq->poll_io_queue();
    }
    return false;
}

bool
reactor::reap_kernel_completions() {
    return _backend->reap_kernel_completions();
}

const io_priority_class& default_priority_class() {
    static thread_local auto shard_default_class = [] {
        return engine().register_one_priority_class("default", 1);
    }();
    return shard_default_class;
}

future<size_t>
reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {
    ++_io_stats.aio_reads;
    _io_stats.aio_read_bytes += len;
    return ioq->queue_request(pc, len, std::move(req));
}

future<size_t>
reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {
    ++_io_stats.aio_writes;
    _io_stats.aio_write_bytes += len;
    return ioq->queue_request(pc, len, std::move(req));
}

namespace internal {

size_t sanitize_iovecs(std::vector<iovec>& iov, size_t disk_alignment) noexcept {
    if (iov.size() > IOV_MAX) {
        iov.resize(IOV_MAX);
    }
    auto length = boost::accumulate(iov | boost::adaptors::transformed(std::mem_fn(&iovec::iov_len)), size_t(0));
    while (auto rest = length & (disk_alignment - 1)) {
        if (iov.back().iov_len <= rest) {
            length -= iov.back().iov_len;
            iov.pop_back();
        } else {
            iov.back().iov_len -= rest;
            length -= rest;
        }
    }
    return length;
}
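
// Worked example (illustrative): with disk_alignment = 512 and iovec lengths
// {4096, 700}, length starts at 4796; rest = 4796 % 512 = 188, and since the
// last iovec (700) is larger than 188 it is trimmed to 512, leaving
// length = 4608, which is aligned, so 4608 is returned.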
1582
11fdf7f2
TL
1583}
1584
1585future<file>
f67539c2
TL
1586reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_options options) noexcept {
1587 return do_with(static_cast<int>(flags), std::move(options), [this, nameref] (auto& open_flags, file_open_options& options) {
1588 sstring name(nameref);
1589 return _thread_pool->submit<syscall_result<int>>([name, &open_flags, &options, strict_o_direct = _strict_o_direct, bypass_fsync = _bypass_fsync] () mutable {
1590 // We want O_DIRECT, except in two cases:
1591 // - tmpfs (which doesn't support it, but works fine anyway)
1592 // - strict_o_direct == false (where we forgive it not being supported)
1593 // Because open() with O_DIRECT will fail, we open it without O_DIRECT, try
1594 // to update it to O_DIRECT with fcntl(), and if that fails, see if we
1595 // can forgive it.
1596 auto is_tmpfs = [] (int fd) {
1597 struct ::statfs buf;
1598 auto r = ::fstatfs(fd, &buf);
1599 if (r == -1) {
1600 return false;
1601 }
1602 return buf.f_type == 0x01021994; // TMPFS_MAGIC
1603 };
1604 open_flags |= O_CLOEXEC;
1605 if (bypass_fsync) {
1606 open_flags &= ~O_DSYNC;
11fdf7f2 1607 }
f67539c2
TL
1608 auto mode = static_cast<mode_t>(options.create_permissions);
1609 int fd = ::open(name.c_str(), open_flags, mode);
1610 if (fd == -1) {
1611 return wrap_syscall<int>(fd);
11fdf7f2 1612 }
f67539c2
TL
1613 int r = ::fcntl(fd, F_SETFL, open_flags | O_DIRECT);
1614 auto maybe_ret = wrap_syscall<int>(r); // capture errno (should be EINVAL)
1615 if (r == -1 && strict_o_direct && !is_tmpfs(fd)) {
1616 ::close(fd);
1617 return maybe_ret;
1618 }
1619 if (fd != -1) {
1620 fsxattr attr = {};
1621 if (options.extent_allocation_size_hint) {
1622 attr.fsx_xflags |= XFS_XFLAG_EXTSIZE;
1623 attr.fsx_extsize = options.extent_allocation_size_hint;
1624 }
1625 // Ignore error; may be !xfs, and just a hint anyway
1626 ::ioctl(fd, XFS_IOC_FSSETXATTR, &attr);
1627 }
1628 return wrap_syscall<int>(fd);
1629 }).then([&options, name = std::move(name), &open_flags] (syscall_result<int> sr) {
1630 sr.throw_fs_exception_if_error("open failed", name);
1631 return make_file_impl(sr.result, options, open_flags);
1632 }).then([] (shared_ptr<file_impl> impl) {
1633 return make_ready_future<file>(std::move(impl));
1634 });
11fdf7f2
TL
1635 });
1636}
1637
1638future<>
f67539c2
TL
1639reactor::remove_file(std::string_view pathname) noexcept {
1640 // Allocating memory for a sstring can throw, hence the futurize_invoke
1641 return futurize_invoke([pathname] {
1642 return engine()._thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname)] {
1643 return wrap_syscall<int>(::remove(pathname.c_str()));
1644 }).then([pathname = sstring(pathname)] (syscall_result<int> sr) {
1645 sr.throw_fs_exception_if_error("remove failed", pathname);
1646 return make_ready_future<>();
1647 });
11fdf7f2
TL
1648 });
1649}
1650
1651future<>
f67539c2
TL
1652reactor::rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept {
1653 // Allocating memory for a sstring can throw, hence the futurize_invoke
1654 return futurize_invoke([old_pathname, new_pathname] {
1655 return engine()._thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] {
1656 return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
1657 }).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
1658 sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname);
1659 return make_ready_future<>();
1660 });
11fdf7f2
TL
1661 });
1662}
1663
1664future<>
f67539c2
TL
1665reactor::link_file(std::string_view oldpath, std::string_view newpath) noexcept {
1666 // Allocating memory for a sstring can throw, hence the futurize_invoke
1667 return futurize_invoke([oldpath, newpath] {
1668 return engine()._thread_pool->submit<syscall_result<int>>([oldpath = sstring(oldpath), newpath = sstring(newpath)] {
1669 return wrap_syscall<int>(::link(oldpath.c_str(), newpath.c_str()));
1670 }).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
1671 sr.throw_fs_exception_if_error("link failed", oldpath, newpath);
1672 return make_ready_future<>();
1673 });
11fdf7f2
TL
1674 });
1675}
1676
9f95a23c 1677future<>
f67539c2 1678reactor::chmod(std::string_view name, file_permissions permissions) noexcept {
9f95a23c 1679 auto mode = static_cast<mode_t>(permissions);
f67539c2
TL
1680 // Allocating memory for a sstring can throw, hence the futurize_invoke
1681 return futurize_invoke([name, mode, this] {
1682 return _thread_pool->submit<syscall_result<int>>([name = sstring(name), mode] {
1683 return wrap_syscall<int>(::chmod(name.c_str(), mode));
1684 }).then([name = sstring(name), mode] (syscall_result<int> sr) {
1685 if (sr.result == -1) {
1686 auto reason = format("chmod(0{:o}) failed", mode);
1687 sr.throw_fs_exception(reason, fs::path(name));
1688 }
1689 return make_ready_future<>();
1690 });
9f95a23c
TL
1691 });
1692}
1693
11fdf7f2
TL
1694directory_entry_type stat_to_entry_type(__mode_t type) {
1695 if (S_ISDIR(type)) {
1696 return directory_entry_type::directory;
1697 }
1698 if (S_ISBLK(type)) {
1699 return directory_entry_type::block_device;
1700 }
1701 if (S_ISCHR(type)) {
9f95a23c 1702 return directory_entry_type::char_device;
11fdf7f2
TL
1703 }
1704 if (S_ISFIFO(type)) {
1705 return directory_entry_type::fifo;
1706 }
1707 if (S_ISLNK(type)) {
1708 return directory_entry_type::link;
1709 }
9f95a23c
TL
1710 if (S_ISSOCK(type)) {
1711 return directory_entry_type::socket;
1712 }
1713 if (S_ISREG(type)) {
1714 return directory_entry_type::regular;
1715 }
1716 return directory_entry_type::unknown;
11fdf7f2
TL
1717}
1718
f67539c2
TL
1719future<std::optional<directory_entry_type>>
1720reactor::file_type(std::string_view name, follow_symlink follow) noexcept {
1721 // Allocating memory for a sstring can throw, hence the futurize_invoke
1722 return futurize_invoke([name, follow, this] {
1723 return _thread_pool->submit<syscall_result_extra<struct stat>>([name = sstring(name), follow] {
1724 struct stat st;
1725 auto stat_syscall = follow ? stat : lstat;
1726 auto ret = stat_syscall(name.c_str(), &st);
1727 return wrap_syscall(ret, st);
1728 }).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
1729 if (long(sr.result) == -1) {
1730 if (sr.error != ENOENT && sr.error != ENOTDIR) {
1731 sr.throw_fs_exception_if_error("stat failed", name);
1732 }
1733 return make_ready_future<std::optional<directory_entry_type> >
1734 (std::optional<directory_entry_type>() );
11fdf7f2 1735 }
f67539c2
TL
1736 return make_ready_future<std::optional<directory_entry_type> >
1737 (std::optional<directory_entry_type>(stat_to_entry_type(sr.extra.st_mode)) );
1738 });
11fdf7f2
TL
1739 });
1740}
1741
f67539c2
TL
1742future<std::optional<directory_entry_type>>
1743file_type(std::string_view name, follow_symlink follow) noexcept {
1744 return engine().file_type(name, follow);
1745}
1746
9f95a23c
TL
1747static std::chrono::system_clock::time_point
1748timespec_to_time_point(const timespec& ts) {
1749 auto d = std::chrono::duration_cast<std::chrono::system_clock::duration>(
1750 ts.tv_sec * 1s + ts.tv_nsec * 1ns);
1751 return std::chrono::system_clock::time_point(d);
1752}
1753
f67539c2
TL
1754future<struct stat>
1755reactor::fstat(int fd) noexcept {
1756 return _thread_pool->submit<syscall_result_extra<struct stat>>([fd] {
11fdf7f2 1757 struct stat st;
f67539c2 1758 auto ret = ::fstat(fd, &st);
11fdf7f2 1759 return wrap_syscall(ret, st);
f67539c2
TL
1760 }).then([] (syscall_result_extra<struct stat> ret) {
1761 ret.throw_if_error();
1762 return make_ready_future<struct stat>(ret.extra);
1763 });
1764}
1765
1766future<int>
1767reactor::inotify_add_watch(int fd, std::string_view path, uint32_t flags) {
1768 // Allocating memory for a sstring can throw, hence the futurize_invoke
1769 return futurize_invoke([path, fd, flags, this] {
1770 return _thread_pool->submit<syscall_result<int>>([fd, path = sstring(path), flags] {
1771 auto ret = ::inotify_add_watch(fd, path.c_str(), flags);
1772 return wrap_syscall(ret);
1773 }).then([] (syscall_result<int> ret) {
1774 ret.throw_if_error();
1775 return make_ready_future<int>(ret.result);
1776 });
1777 });
1778}
1779
1780future<stat_data>
1781reactor::file_stat(std::string_view pathname, follow_symlink follow) noexcept {
1782 // Allocating memory for a sstring can throw, hence the futurize_invoke
1783 return futurize_invoke([pathname, follow, this] {
1784 return _thread_pool->submit<syscall_result_extra<struct stat>>([pathname = sstring(pathname), follow] {
1785 struct stat st;
1786 auto stat_syscall = follow ? stat : lstat;
1787 auto ret = stat_syscall(pathname.c_str(), &st);
1788 return wrap_syscall(ret, st);
1789 }).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
1790 sr.throw_fs_exception_if_error("stat failed", pathname);
1791 struct stat& st = sr.extra;
1792 stat_data sd;
1793 sd.device_id = st.st_dev;
1794 sd.inode_number = st.st_ino;
1795 sd.mode = st.st_mode;
1796 sd.type = stat_to_entry_type(st.st_mode);
1797 sd.number_of_links = st.st_nlink;
1798 sd.uid = st.st_uid;
1799 sd.gid = st.st_gid;
1800 sd.rdev = st.st_rdev;
1801 sd.size = st.st_size;
1802 sd.block_size = st.st_blksize;
1803 sd.allocated_size = st.st_blocks * 512UL;
1804 sd.time_accessed = timespec_to_time_point(st.st_atim);
1805 sd.time_modified = timespec_to_time_point(st.st_mtim);
1806 sd.time_changed = timespec_to_time_point(st.st_ctim);
1807 return make_ready_future<stat_data>(std::move(sd));
1808 });
9f95a23c
TL
1809 });
1810}
1811
1812future<uint64_t>
f67539c2 1813reactor::file_size(std::string_view pathname) noexcept {
9f95a23c
TL
1814 return file_stat(pathname, follow_symlink::yes).then([] (stat_data sd) {
1815 return make_ready_future<uint64_t>(sd.size);
11fdf7f2
TL
1816 });
1817}
1818
1819future<bool>
f67539c2
TL
1820reactor::file_accessible(std::string_view pathname, access_flags flags) noexcept {
1821 // Allocating memory for a sstring can throw, hence the futurize_invoke
1822 return futurize_invoke([pathname, flags, this] {
1823 return _thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname), flags] {
1824 auto aflags = std::underlying_type_t<access_flags>(flags);
1825 auto ret = ::access(pathname.c_str(), aflags);
1826 return wrap_syscall(ret);
1827 }).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
1828 if (sr.result < 0) {
1829 if ((sr.error == ENOENT && flags == access_flags::exists) ||
1830 (sr.error == EACCES && flags != access_flags::exists)) {
1831 return make_ready_future<bool>(false);
1832 }
1833 sr.throw_fs_exception("access failed", fs::path(pathname));
9f95a23c 1834 }
9f95a23c 1835
f67539c2
TL
1836 return make_ready_future<bool>(true);
1837 });
11fdf7f2
TL
1838 });
1839}
1840
1841future<fs_type>
f67539c2
TL
1842reactor::file_system_at(std::string_view pathname) noexcept {
1843 // Allocating memory for a sstring can throw, hence the futurize_invoke
1844 return futurize_invoke([pathname, this] {
1845 return _thread_pool->submit<syscall_result_extra<struct statfs>>([pathname = sstring(pathname)] {
1846 struct statfs st;
1847 auto ret = statfs(pathname.c_str(), &st);
1848 return wrap_syscall(ret, st);
1849 }).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
1850 static std::unordered_map<long int, fs_type> type_mapper = {
1851 { 0x58465342, fs_type::xfs },
1852 { EXT2_SUPER_MAGIC, fs_type::ext2 },
1853 { EXT3_SUPER_MAGIC, fs_type::ext3 },
1854 { EXT4_SUPER_MAGIC, fs_type::ext4 },
1855 { BTRFS_SUPER_MAGIC, fs_type::btrfs },
1856 { 0x4244, fs_type::hfs },
1857 { TMPFS_MAGIC, fs_type::tmpfs },
1858 };
1859 sr.throw_fs_exception_if_error("statfs failed", pathname);
1860
1861 fs_type ret = fs_type::other;
1862 if (type_mapper.count(sr.extra.f_type) != 0) {
1863 ret = type_mapper.at(sr.extra.f_type);
1864 }
1865 return make_ready_future<fs_type>(ret);
1866 });
1867 });
1868}
1869
1870future<struct statfs>
1871reactor::fstatfs(int fd) noexcept {
1872 return _thread_pool->submit<syscall_result_extra<struct statfs>>([fd] {
11fdf7f2 1873 struct statfs st;
f67539c2 1874 auto ret = ::fstatfs(fd, &st);
11fdf7f2 1875 return wrap_syscall(ret, st);
f67539c2
TL
1876 }).then([] (syscall_result_extra<struct statfs> sr) {
1877 sr.throw_if_error();
1878 struct statfs st = sr.extra;
1879 return make_ready_future<struct statfs>(std::move(st));
11fdf7f2
TL
1880 });
1881}
1882
1883future<struct statvfs>
f67539c2
TL
1884reactor::statvfs(std::string_view pathname) noexcept {
1885 // Allocating memory for a sstring can throw, hence the futurize_invoke
1886 return futurize_invoke([pathname, this] {
1887 return _thread_pool->submit<syscall_result_extra<struct statvfs>>([pathname = sstring(pathname)] {
1888 struct statvfs st;
1889 auto ret = ::statvfs(pathname.c_str(), &st);
1890 return wrap_syscall(ret, st);
1891 }).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
1892 sr.throw_fs_exception_if_error("statvfs failed", pathname);
1893 struct statvfs st = sr.extra;
1894 return make_ready_future<struct statvfs>(std::move(st));
1895 });
11fdf7f2
TL
1896 });
1897}
1898
1899future<file>
f67539c2
TL
1900reactor::open_directory(std::string_view name) noexcept {
1901 // Allocating memory for a sstring can throw, hence the futurize_invoke
1902 return futurize_invoke([name, this] {
1903 auto oflags = O_DIRECTORY | O_CLOEXEC | O_RDONLY;
1904 return _thread_pool->submit<syscall_result<int>>([name = sstring(name), oflags] {
1905 return wrap_syscall<int>(::open(name.c_str(), oflags));
1906 }).then([name = sstring(name), oflags] (syscall_result<int> sr) {
1907 sr.throw_fs_exception_if_error("open failed", name);
1908 return make_file_impl(sr.result, file_open_options(), oflags);
1909 }).then([] (shared_ptr<file_impl> file_impl) {
1910 return make_ready_future<file>(std::move(file_impl));
1911 });
11fdf7f2
TL
1912 });
1913}
1914
1915future<>
f67539c2
TL
1916reactor::make_directory(std::string_view name, file_permissions permissions) noexcept {
1917 // Allocating memory for a sstring can throw, hence the futurize_invoke
1918 return futurize_invoke([name, permissions, this] {
1919 return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
1920 auto mode = static_cast<mode_t>(permissions);
1921 return wrap_syscall<int>(::mkdir(name.c_str(), mode));
1922 }).then([name = sstring(name)] (syscall_result<int> sr) {
1923 sr.throw_fs_exception_if_error("mkdir failed", name);
1924 });
11fdf7f2
TL
1925 });
1926}
1927
1928future<>
f67539c2
TL
1929reactor::touch_directory(std::string_view name, file_permissions permissions) noexcept {
1930 // Allocating memory for a sstring can throw, hence the futurize_invoke
1931 return futurize_invoke([name, permissions] {
1932 return engine()._thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
1933 auto mode = static_cast<mode_t>(permissions);
1934 return wrap_syscall<int>(::mkdir(name.c_str(), mode));
1935 }).then([name = sstring(name)] (syscall_result<int> sr) {
1936 if (sr.result == -1 && sr.error != EEXIST) {
1937 sr.throw_fs_exception("mkdir failed", fs::path(name));
1938 }
1939 return make_ready_future<>();
1940 });
11fdf7f2 1941 });
11fdf7f2
TL
1942}
1943
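// Issues fdatasync for the given fd: returns immediately when _bypass_fsync is
// set, uses the aio fsync path when _have_aio_fsync is set (the completion
// object deletes itself once it fires), and otherwise falls back to the
// syscall thread pool.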
1944future<>
f67539c2 1945reactor::fdatasync(int fd) noexcept {
9f95a23c
TL
1946 ++_fsyncs;
1947 if (_bypass_fsync) {
11fdf7f2 1948 return make_ready_future<>();
9f95a23c
TL
1949 }
1950 if (_have_aio_fsync) {
f67539c2
TL
1951 // Does not go through the I/O queue, but still has to delete itself when done
1952 struct fsync_io_desc final : public io_completion {
1953 promise<> _pr;
1954 public:
1955 virtual void complete(size_t res) noexcept override {
1956 _pr.set_value();
1957 delete this;
1958 }
11fdf7f2 1959
f67539c2
TL
1960 virtual void set_exception(std::exception_ptr eptr) noexcept override {
1961 _pr.set_exception(std::move(eptr));
1962 delete this;
1963 }
11fdf7f2 1964
f67539c2
TL
1965 future<> get_future() {
1966 return _pr.get_future();
1967 }
1968 };
11fdf7f2 1969
f67539c2
TL
1970 return futurize_invoke([this, fd] {
1971 auto desc = new fsync_io_desc;
1972 auto fut = desc->get_future();
9f95a23c 1973 auto req = io_request::make_fdatasync(fd);
f67539c2 1974 submit_io(desc, std::move(req));
9f95a23c 1975 return fut;
f67539c2 1976 });
9f95a23c
TL
1977 }
1978 return _thread_pool->submit<syscall_result<int>>([fd] {
1979 return wrap_syscall<int>(::fdatasync(fd));
1980 }).then([] (syscall_result<int> sr) {
11fdf7f2 1981 sr.throw_if_error();
9f95a23c 1982 return make_ready_future<>();
11fdf7f2
TL
1983 });
1984}
1985
f67539c2
TL
1986// Note: terminate if arm_highres_timer throws
1987// `when` should always be valid
1988void reactor::enable_timer(steady_clock_type::time_point when) noexcept
11fdf7f2
TL
1989{
1990#ifndef HAVE_OSV
1991 itimerspec its;
1992 its.it_interval = {};
1993 its.it_value = to_timespec(when);
1994 _backend->arm_highres_timer(its);
1995#else
1996 using ns = std::chrono::nanoseconds;
1997 WITH_LOCK(_timer_mutex) {
1998 _timer_due = std::chrono::duration_cast<ns>(when.time_since_epoch()).count();
1999 _timer_cond.wake_one();
2000 }
2001#endif
2002}
2003
f67539c2 2004void reactor::add_timer(timer<steady_clock_type>* tmr) noexcept {
11fdf7f2
TL
2005 if (queue_timer(tmr)) {
2006 enable_timer(_timers.get_next_timeout());
2007 }
2008}
2009
f67539c2 2010bool reactor::queue_timer(timer<steady_clock_type>* tmr) noexcept {
11fdf7f2
TL
2011 return _timers.insert(*tmr);
2012}
2013
f67539c2 2014void reactor::del_timer(timer<steady_clock_type>* tmr) noexcept {
11fdf7f2
TL
2015 if (tmr->_expired) {
2016 _expired_timers.erase(_expired_timers.iterator_to(*tmr));
2017 tmr->_expired = false;
2018 } else {
2019 _timers.remove(*tmr);
2020 }
2021}
2022
f67539c2 2023void reactor::add_timer(timer<lowres_clock>* tmr) noexcept {
11fdf7f2
TL
2024 if (queue_timer(tmr)) {
2025 _lowres_next_timeout = _lowres_timers.get_next_timeout();
2026 }
2027}
2028
f67539c2 2029bool reactor::queue_timer(timer<lowres_clock>* tmr) noexcept {
11fdf7f2
TL
2030 return _lowres_timers.insert(*tmr);
2031}
2032
f67539c2 2033void reactor::del_timer(timer<lowres_clock>* tmr) noexcept {
11fdf7f2
TL
2034 if (tmr->_expired) {
2035 _expired_lowres_timers.erase(_expired_lowres_timers.iterator_to(*tmr));
2036 tmr->_expired = false;
2037 } else {
2038 _lowres_timers.remove(*tmr);
2039 }
2040}
2041
f67539c2 2042void reactor::add_timer(timer<manual_clock>* tmr) noexcept {
11fdf7f2
TL
2043 queue_timer(tmr);
2044}
2045
f67539c2 2046bool reactor::queue_timer(timer<manual_clock>* tmr) noexcept {
11fdf7f2
TL
2047 return _manual_timers.insert(*tmr);
2048}
2049
f67539c2 2050void reactor::del_timer(timer<manual_clock>* tmr) noexcept {
11fdf7f2
TL
2051 if (tmr->_expired) {
2052 _expired_manual_timers.erase(_expired_manual_timers.iterator_to(*tmr));
2053 tmr->_expired = false;
2054 } else {
2055 _manual_timers.remove(*tmr);
2056 }
2057}
2058
9f95a23c 2059void reactor::at_exit(noncopyable_function<future<> ()> func) {
11fdf7f2
TL
2060 assert(!_stopping);
2061 _exit_funcs.push_back(std::move(func));
2062}
2063
2064future<> reactor::run_exit_tasks() {
2065 _stop_requested.broadcast();
2066 _stopping = true;
2067 stop_aio_eventfd_loop();
2068 return do_for_each(_exit_funcs.rbegin(), _exit_funcs.rend(), [] (auto& func) {
2069 return func();
2070 });
2071}
2072
2073void reactor::stop() {
f67539c2 2074 assert(_id == 0);
11fdf7f2
TL
2075 smp::cleanup_cpu();
2076 if (!_stopping) {
9f95a23c
TL
2077 // Run exit tasks locally and then stop all other engines
2078 // in the background and wait on semaphore for all to complete.
2079 // Finally, set _stopped on cpu 0.
2080 (void)run_exit_tasks().then([this] {
2081 return do_with(semaphore(0), [this] (semaphore& sem) {
2082 // Stop other cpus asynchronously, signal when done.
2083 (void)smp::invoke_on_others(0, [] {
2084 smp::cleanup_cpu();
2085 return engine().run_exit_tasks().then([] {
2086 engine()._stopped = true;
11fdf7f2 2087 });
9f95a23c
TL
2088 }).then([&sem]() {
2089 sem.signal();
2090 });
2091 return sem.wait().then([this] {
11fdf7f2
TL
2092 _stopped = true;
2093 });
2094 });
2095 });
2096 }
2097}
2098
2099void reactor::exit(int ret) {
9f95a23c
TL
2100 // Run stop() asynchronously on cpu 0.
2101 (void)smp::submit_to(0, [this, ret] { _return = ret; stop(); });
11fdf7f2
TL
2102}
2103
2104uint64_t
2105reactor::pending_task_count() const {
2106 uint64_t ret = 0;
2107 for (auto&& tq : _task_queues) {
2108 ret += tq->_q.size();
2109 }
2110 return ret;
2111}
2112
2113uint64_t
2114reactor::tasks_processed() const {
9f95a23c 2115 return _global_tasks_processed;
11fdf7f2
TL
2116}
2117
2118void reactor::register_metrics() {
2119
2120 namespace sm = seastar::metrics;
2121
2122 _metric_groups.add_group("reactor", {
2123 sm::make_gauge("tasks_pending", std::bind(&reactor::pending_task_count, this), sm::description("Number of pending tasks in the queue")),
2124 // total_operations value:DERIVE:0:U
2125 sm::make_derive("tasks_processed", std::bind(&reactor::tasks_processed, this), sm::description("Total tasks processed")),
9f95a23c 2126 sm::make_derive("polls", _polls, sm::description("Number of times pollers were executed")),
11fdf7f2
TL
2127 sm::make_derive("timers_pending", std::bind(&decltype(_timers)::size, &_timers), sm::description("Number of timers in the timer-pending queue")),
2128 sm::make_gauge("utilization", [this] { return (1-_load) * 100; }, sm::description("CPU utilization")),
2129 sm::make_derive("cpu_busy_ms", [this] () -> int64_t { return total_busy_time() / 1ms; },
2130 sm::description("Total cpu busy time in milliseconds")),
2131 sm::make_derive("cpu_steal_time_ms", [this] () -> int64_t { return total_steal_time() / 1ms; },
2132 sm::description("Total steal time, the time in which some other process was running while Seastar was not trying to run (not sleeping)."
2133 " Because this is in userspace, some time that could legitimately be thought of as steal time is not accounted as such; for example, if we are sleeping and can wake up but the kernel hasn't woken us up yet.")),
2134 // total_operations value:DERIVE:0:U
2135 sm::make_derive("aio_reads", _io_stats.aio_reads, sm::description("Total aio-reads operations")),
2136
2137 sm::make_total_bytes("aio_bytes_read", _io_stats.aio_read_bytes, sm::description("Total aio-reads bytes")),
2138 // total_operations value:DERIVE:0:U
2139 sm::make_derive("aio_writes", _io_stats.aio_writes, sm::description("Total aio-writes operations")),
2140 sm::make_total_bytes("aio_bytes_write", _io_stats.aio_write_bytes, sm::description("Total aio-writes bytes")),
2141 sm::make_derive("aio_errors", _io_stats.aio_errors, sm::description("Total aio errors")),
2142 // total_operations value:DERIVE:0:U
2143 sm::make_derive("fsyncs", _fsyncs, sm::description("Total number of fsync operations")),
2144 // total_operations value:DERIVE:0:U
2145 sm::make_derive("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()),
2146 sm::description("Total number of io-threaded-fallbacks operations")),
2147
2148 });
2149
2150 _metric_groups.add_group("memory", {
2151 sm::make_derive("malloc_operations", [] { return memory::stats().mallocs(); },
2152 sm::description("Total number of malloc operations")),
2153 sm::make_derive("free_operations", [] { return memory::stats().frees(); }, sm::description("Total number of free operations")),
2154 sm::make_derive("cross_cpu_free_operations", [] { return memory::stats().cross_cpu_frees(); }, sm::description("Total number of cross cpu free")),
2155 sm::make_gauge("malloc_live_objects", [] { return memory::stats().live_objects(); }, sm::description("Number of live objects")),
2156 sm::make_current_bytes("free_memory", [] { return memory::stats().free_memory(); }, sm::description("Free memory size in bytes")),
2157 sm::make_current_bytes("total_memory", [] { return memory::stats().total_memory(); }, sm::description("Total memory size in bytes")),
2158 sm::make_current_bytes("allocated_memory", [] { return memory::stats().allocated_memory(); }, sm::description("Allocated memory size in bytes")),
2159 sm::make_derive("reclaims_operations", [] { return memory::stats().reclaims(); }, sm::description("Total reclaims operations"))
2160 });
2161
2162 _metric_groups.add_group("reactor", {
2163 sm::make_derive("logging_failures", [] { return logging_failures; }, sm::description("Total number of logging failures")),
2164 // total_operations value:DERIVE:0:U
2165 sm::make_derive("cpp_exceptions", _cxx_exceptions, sm::description("Total number of C++ exceptions")),
9f95a23c 2166 sm::make_derive("abandoned_failed_futures", _abandoned_failed_futures, sm::description("Total number of abandoned failed futures, futures destroyed while still containing an exception")),
11fdf7f2
TL
2167 });
2168
11fdf7f2
TL
2169 using namespace seastar::metrics;
2170 _metric_groups.add_group("reactor", {
2171 make_counter("fstream_reads", _io_stats.fstream_reads,
2172 description(
2173 "Counts reads from disk file streams. A high rate indicates high disk activity."
2174 " Contrast with other fstream_read* counters to locate bottlenecks.")),
2175 make_derive("fstream_read_bytes", _io_stats.fstream_read_bytes,
2176 description(
2177 "Counts bytes read from disk file streams. A high rate indicates high disk activity."
2178 " Divide by fstream_reads to determine average read size.")),
2179 make_counter("fstream_reads_blocked", _io_stats.fstream_reads_blocked,
2180 description(
2181 "Counts the number of times a disk read could not be satisfied from read-ahead buffers, and had to block."
2182 " Indicates short streams, or incorrect read ahead configuration.")),
2183 make_derive("fstream_read_bytes_blocked", _io_stats.fstream_read_bytes_blocked,
2184 description(
2185 "Counts the number of bytes read from disk that could not be satisfied from read-ahead buffers, and had to block."
2186 " Indicates short streams, or incorrect read ahead configuration.")),
2187 make_counter("fstream_reads_aheads_discarded", _io_stats.fstream_read_aheads_discarded,
2188 description(
2189 "Counts the number of times a buffer that was read ahead of time and was discarded because it was not needed, wasting disk bandwidth."
2190 " Indicates over-eager read ahead configuration.")),
2191 make_derive("fstream_reads_ahead_bytes_discarded", _io_stats.fstream_read_ahead_discarded_bytes,
2192 description(
2193 "Counts the number of buffered bytes that were read ahead of time and were discarded because they were not needed, wasting disk bandwidth."
2194 " Indicates over-eager read ahead configuration.")),
2195 });
2196}
2197
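// Runs tasks from the given queue until it is empty or preemption is requested,
// making new tasks inherit the queue's scheduling group. When preempted with a
// backlog larger than _max_task_backlog, the preemption request is reset and
// execution continues (see #302).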
2198void reactor::run_tasks(task_queue& tq) {
2199 // Make sure new tasks will inherit our scheduling group
2200 *internal::current_scheduling_group_ptr() = scheduling_group(tq._id);
2201 auto& tasks = tq._q;
2202 while (!tasks.empty()) {
9f95a23c 2203 auto tsk = tasks.front();
11fdf7f2
TL
2204 tasks.pop_front();
2205 STAP_PROBE(seastar, reactor_run_tasks_single_start);
2206 task_histogram_add_task(*tsk);
f67539c2 2207 _current_task = tsk;
11fdf7f2 2208 tsk->run_and_dispose();
f67539c2 2209 _current_task = nullptr;
11fdf7f2
TL
2210 STAP_PROBE(seastar, reactor_run_tasks_single_end);
2211 ++tq._tasks_processed;
9f95a23c 2212 ++_global_tasks_processed;
11fdf7f2
TL
2213 // check at end of loop, to allow at least one task to run
2214 if (need_preempt()) {
2215 if (tasks.size() <= _max_task_backlog) {
2216 break;
2217 } else {
2218 // While need_preempt() is set, task execution is inefficient due to
2219 // need_preempt() checks breaking out of loops and .then() calls. See
2220 // #302.
2221 reset_preemption_monitor();
2222 }
2223 }
2224 }
2225}
2226
2227#ifdef SEASTAR_SHUFFLE_TASK_QUEUE
9f95a23c 2228void reactor::shuffle(task*& t, task_queue& q) {
11fdf7f2
TL
2229 static thread_local std::mt19937 gen = std::mt19937(std::default_random_engine()());
2230 std::uniform_int_distribution<size_t> tasks_dist{0, q._q.size() - 1};
2231 auto& to_swap = q._q[tasks_dist(gen)];
2232 std::swap(to_swap, t);
2233}
2234#endif
2235
2236void reactor::force_poll() {
2237 request_preemption();
2238}
2239
2240bool
2241reactor::flush_tcp_batches() {
2242 bool work = !_flush_batching.empty();
2243 while (!_flush_batching.empty()) {
2244 auto os = std::move(_flush_batching.front());
2245 _flush_batching.pop_front();
2246 os->poll_flush();
2247 }
2248 return work;
2249}
2250
2251bool
f67539c2 2252reactor::do_expire_lowres_timers() noexcept {
11fdf7f2
TL
2253 if (_lowres_next_timeout == lowres_clock::time_point()) {
2254 return false;
2255 }
2256 auto now = lowres_clock::now();
2257 if (now >= _lowres_next_timeout) {
f67539c2 2258 complete_timers(_lowres_timers, _expired_lowres_timers, [this] () noexcept {
11fdf7f2
TL
2259 if (!_lowres_timers.empty()) {
2260 _lowres_next_timeout = _lowres_timers.get_next_timeout();
2261 } else {
2262 _lowres_next_timeout = lowres_clock::time_point();
2263 }
2264 });
2265 return true;
2266 }
2267 return false;
2268}
2269
2270void
f67539c2
TL
2271reactor::expire_manual_timers() noexcept {
2272 complete_timers(_manual_timers, _expired_manual_timers, [] () noexcept {});
11fdf7f2
TL
2273}
2274
2275void
f67539c2 2276manual_clock::expire_timers() noexcept {
11fdf7f2
TL
2277 local_engine->expire_manual_timers();
2278}
2279
2280void
f67539c2 2281manual_clock::advance(manual_clock::duration d) noexcept {
11fdf7f2
TL
2282 _now.fetch_add(d.count());
2283 if (local_engine) {
2284 schedule_urgent(make_task(default_scheduling_group(), &manual_clock::expire_timers));
9f95a23c
TL
2285 // Expire timers on all cores in the background.
2286 (void)smp::invoke_on_all(&manual_clock::expire_timers);
11fdf7f2
TL
2287 }
2288}
2289
2290bool
f67539c2 2291reactor::do_check_lowres_timers() const noexcept {
11fdf7f2
TL
2292 if (_lowres_next_timeout == lowres_clock::time_point()) {
2293 return false;
2294 }
2295 return lowres_clock::now() > _lowres_next_timeout;
2296}
2297
2298#ifndef HAVE_OSV
2299
f67539c2 2300class reactor::kernel_submit_work_pollfn final : public simple_pollfn<true> {
11fdf7f2
TL
2301 reactor& _r;
2302public:
9f95a23c 2303 kernel_submit_work_pollfn(reactor& r) : _r(r) {}
11fdf7f2 2304 virtual bool poll() override final {
9f95a23c 2305 return _r._backend->kernel_submit_work();
11fdf7f2 2306 }
11fdf7f2
TL
2307};
2308
2309#endif
2310
2311class reactor::signal_pollfn final : public reactor::pollfn {
2312 reactor& _r;
2313public:
2314 signal_pollfn(reactor& r) : _r(r) {}
2315 virtual bool poll() final override {
2316 return _r._signals.poll_signal();
2317 }
2318 virtual bool pure_poll() override final {
2319 return _r._signals.pure_poll_signal();
2320 }
2321 virtual bool try_enter_interrupt_mode() override {
2322 // Signals will interrupt our epoll_pwait() call, but
2323 // disable them now to avoid a signal between this point
2324 // and epoll_pwait()
2325 sigset_t block_all;
2326 sigfillset(&block_all);
2327 ::pthread_sigmask(SIG_SETMASK, &block_all, &_r._active_sigmask);
2328 if (poll()) {
2329 // raced already, and lost
2330 exit_interrupt_mode();
2331 return false;
2332 }
2333 return true;
2334 }
2335 virtual void exit_interrupt_mode() override final {
2336 ::pthread_sigmask(SIG_SETMASK, &_r._active_sigmask, nullptr);
2337 }
2338};
2339
f67539c2 2340class reactor::batch_flush_pollfn final : public simple_pollfn<true> {
11fdf7f2
TL
2341 reactor& _r;
2342public:
2343 batch_flush_pollfn(reactor& r) : _r(r) {}
2344 virtual bool poll() final override {
2345 return _r.flush_tcp_batches();
2346 }
11fdf7f2
TL
2347};
2348
9f95a23c
TL
2349class reactor::reap_kernel_completions_pollfn final : public reactor::pollfn {
2350 reactor& _r;
2351public:
2352 reap_kernel_completions_pollfn(reactor& r) : _r(r) {}
2353 virtual bool poll() final override {
f67539c2 2354 return _r.reap_kernel_completions();
9f95a23c
TL
2355 }
2356 virtual bool pure_poll() override final {
2357 return poll(); // actually performs work, but triggers no user continuations, so okay
2358 }
2359 virtual bool try_enter_interrupt_mode() override {
2360 return _r._backend->kernel_events_can_sleep();
2361 }
2362 virtual void exit_interrupt_mode() override final {
2363 }
2364};
2365
f67539c2 2366class reactor::io_queue_submission_pollfn final : public simple_pollfn<true> {
11fdf7f2
TL
2367 reactor& _r;
2368public:
9f95a23c 2369 io_queue_submission_pollfn(reactor& r) : _r(r) {}
11fdf7f2
TL
2370 virtual bool poll() final override {
2371 return _r.flush_pending_aio();
2372 }
11fdf7f2
TL
2373};
2374
f67539c2
TL
2375// Other cpus can queue items for us to free, and they won't notify
2376// us about them. But it's okay to ignore those items; freeing them
2377// doesn't have any side effects.
2378//
2379// We'll take care of those items when we wake up for another reason.
2380class reactor::drain_cross_cpu_freelist_pollfn final : public simple_pollfn<true> {
11fdf7f2
TL
2381public:
2382 virtual bool poll() final override {
2383 return memory::drain_cross_cpu_freelist();
2384 }
11fdf7f2
TL
2385};
2386
2387class reactor::lowres_timer_pollfn final : public reactor::pollfn {
2388 reactor& _r;
2389 // A highres timer is implemented as a waking signal, so
2390 // we arm one when a lowres timer is pending during sleep,
2391 // so that it can wake us up.
2392 timer<> _nearest_wakeup { [this] { _armed = false; } };
2393 bool _armed = false;
2394public:
2395 lowres_timer_pollfn(reactor& r) : _r(r) {}
2396 virtual bool poll() final override {
2397 return _r.do_expire_lowres_timers();
2398 }
2399 virtual bool pure_poll() final override {
2400 return _r.do_check_lowres_timers();
2401 }
2402 virtual bool try_enter_interrupt_mode() override {
2403 // arm our highres timer so a signal will wake us up
2404 auto next = _r._lowres_next_timeout;
2405 if (next == lowres_clock::time_point()) {
2406 // no pending timers
2407 return true;
2408 }
2409 auto now = lowres_clock::now();
2410 if (next <= now) {
2411 // whoops, go back
2412 return false;
2413 }
2414 _nearest_wakeup.arm(next - now);
2415 _armed = true;
2416 return true;
2417 }
2418 virtual void exit_interrupt_mode() override final {
2419 if (_armed) {
2420 _nearest_wakeup.cancel();
2421 _armed = false;
2422 }
2423 }
2424};
2425
2426class reactor::smp_pollfn final : public reactor::pollfn {
2427 reactor& _r;
2428public:
2429 smp_pollfn(reactor& r) : _r(r) {}
2430 virtual bool poll() final override {
2431 return (smp::poll_queues() |
2432 alien::smp::poll_queues());
2433 }
2434 virtual bool pure_poll() final override {
2435 return (smp::pure_poll_queues() ||
2436 alien::smp::pure_poll_queues());
2437 }
2438 virtual bool try_enter_interrupt_mode() override {
2439 // systemwide_memory_barrier() is very slow if run concurrently,
2440 // so don't go to sleep if it is running now.
2441 _r._sleeping.store(true, std::memory_order_relaxed);
2442 bool barrier_done = try_systemwide_memory_barrier();
2443 if (!barrier_done) {
2444 _r._sleeping.store(false, std::memory_order_relaxed);
2445 return false;
2446 }
2447 if (poll()) {
2448 // raced
2449 _r._sleeping.store(false, std::memory_order_relaxed);
2450 return false;
2451 }
2452 return true;
2453 }
2454 virtual void exit_interrupt_mode() override final {
2455 _r._sleeping.store(false, std::memory_order_relaxed);
2456 }
2457};
2458
2459class reactor::execution_stage_pollfn final : public reactor::pollfn {
2460 internal::execution_stage_manager& _esm;
2461public:
2462 execution_stage_pollfn() : _esm(internal::execution_stage_manager::get()) { }
2463
2464 virtual bool poll() override {
2465 return _esm.flush();
2466 }
2467 virtual bool pure_poll() override {
2468 return _esm.poll();
2469 }
2470 virtual bool try_enter_interrupt_mode() override {
2471 // This is a passive poller, so if a previous poll
2472 // returned false (idle), there's no more work to do.
2473 return true;
2474 }
2475 virtual void exit_interrupt_mode() override { }
2476};
2477
2478class reactor::syscall_pollfn final : public reactor::pollfn {
2479 reactor& _r;
2480public:
2481 syscall_pollfn(reactor& r) : _r(r) {}
2482 virtual bool poll() final override {
2483 return _r._thread_pool->complete();
2484 }
2485 virtual bool pure_poll() override final {
2486 return poll(); // actually performs work, but triggers no user continuations, so okay
2487 }
2488 virtual bool try_enter_interrupt_mode() override {
2489 _r._thread_pool->enter_interrupt_mode();
2490 if (poll()) {
2491 // raced
2492 _r._thread_pool->exit_interrupt_mode();
2493 return false;
2494 }
2495 return true;
2496 }
2497 virtual void exit_interrupt_mode() override final {
2498 _r._thread_pool->exit_interrupt_mode();
2499 }
2500};
2501
11fdf7f2
TL
2502void
2503reactor::wakeup() {
2504 uint64_t one = 1;
2505 ::write(_notify_eventfd.get(), &one, sizeof(one));
2506}
2507
2508void reactor::start_aio_eventfd_loop() {
2509 if (!_aio_eventfd) {
2510 return;
2511 }
2512 future<> loop_done = repeat([this] {
2513 return _aio_eventfd->readable().then([this] {
2514 char garbage[8];
2515 ::read(_aio_eventfd->get_fd(), garbage, 8); // totally uninteresting
2516 return _stopping ? stop_iteration::yes : stop_iteration::no;
2517 });
2518 });
2519 // must use make_lw_shared, because at_exit expects a copyable function
2520 at_exit([loop_done = make_lw_shared(std::move(loop_done))] {
2521 return std::move(*loop_done);
2522 });
2523}
2524
2525void reactor::stop_aio_eventfd_loop() {
2526 if (!_aio_eventfd) {
2527 return;
2528 }
2529 uint64_t one = 1;
2530 ::write(_aio_eventfd->get_fd(), &one, 8);
2531}
2532
2533inline
2534bool
2535reactor::have_more_tasks() const {
2536 return _active_task_queues.size() + _activating_task_queues.size();
2537}
2538
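// Inserts a task queue into _active_task_queues while keeping the deque sorted
// (via task_queue::indirect_compare). The two common cases, appending at the
// back and preempting at the front, are handled directly; otherwise the queue
// is bubbled into position.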
2539void reactor::insert_active_task_queue(task_queue* tq) {
2540 tq->_active = true;
2541 auto& atq = _active_task_queues;
2542 auto less = task_queue::indirect_compare();
2543 if (atq.empty() || less(atq.back(), tq)) {
2544 // Common case: idle->working
2545 // Common case: CPU intensive task queue going to the back
2546 atq.push_back(tq);
2547 } else {
2548 // Common case: newly activated queue preempting everything else
2549 atq.push_front(tq);
2550 // Less common case: newly activated queue behind something already active
2551 size_t i = 0;
2552 while (i + 1 != atq.size() && !less(atq[i], atq[i+1])) {
2553 std::swap(atq[i], atq[i+1]);
2554 ++i;
2555 }
2556 }
2557}
2558
f67539c2
TL
2559reactor::task_queue* reactor::pop_active_task_queue(sched_clock::time_point now) {
2560 task_queue* tq = _active_task_queues.front();
2561 _active_task_queues.pop_front();
2562 tq->_starvetime += now - tq->_ts;
2563 return tq;
2564}
2565
11fdf7f2
TL
2566void
2567reactor::insert_activating_task_queues() {
2568 // Quadratic, but since we expect the common cases in insert_active_task_queue() to dominate, faster in practice than sorting
2569 for (auto&& tq : _activating_task_queues) {
2570 insert_active_task_queue(tq);
2571 }
2572 _activating_task_queues.clear();
2573}
2574
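// One scheduler slice: repeatedly pop the task queue at the front of the active
// list, run its tasks, account its runtime and vruntime, and re-insert it if it
// still has work, until there are no more tasks or preemption is requested.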
2575void
2576reactor::run_some_tasks() {
2577 if (!have_more_tasks()) {
2578 return;
2579 }
2580 sched_print("run_some_tasks: start");
2581 reset_preemption_monitor();
2582
2583 sched_clock::time_point t_run_completed = std::chrono::steady_clock::now();
2584 STAP_PROBE(seastar, reactor_run_tasks_start);
2585 _cpu_stall_detector->start_task_run(t_run_completed);
2586 do {
2587 auto t_run_started = t_run_completed;
2588 insert_activating_task_queues();
f67539c2 2589 task_queue* tq = pop_active_task_queue(t_run_started);
11fdf7f2
TL
2590 sched_print("running tq {} {}", (void*)tq, tq->_name);
2591 tq->_current = true;
2592 _last_vruntime = std::max(tq->_vruntime, _last_vruntime);
2593 run_tasks(*tq);
2594 tq->_current = false;
2595 t_run_completed = std::chrono::steady_clock::now();
2596 auto delta = t_run_completed - t_run_started;
2597 account_runtime(*tq, delta);
2598 sched_print("run complete ({} {}); time consumed {} usec; final vruntime {} empty {}",
2599 (void*)tq, tq->_name, delta / 1us, tq->_vruntime, tq->_q.empty());
f67539c2 2600 tq->_ts = t_run_completed;
11fdf7f2
TL
2601 if (!tq->_q.empty()) {
2602 insert_active_task_queue(tq);
2603 } else {
2604 tq->_active = false;
2605 }
2606 } while (have_more_tasks() && !need_preempt());
2607 _cpu_stall_detector->end_task_run(t_run_completed);
2608 STAP_PROBE(seastar, reactor_run_tasks_end);
2609 *internal::current_scheduling_group_ptr() = default_scheduling_group(); // Prevent inheritance from last group run
2610 sched_print("run_some_tasks: end");
2611}
2612
2613void
2614reactor::activate(task_queue& tq) {
2615 if (tq._active) {
2616 return;
2617 }
2618 sched_print("activating {} {}", (void*)&tq, tq._name);
2619 // If activate() was called, the task queue is likely network-bound or I/O bound, not CPU-bound. As
2620 // such its vruntime will be low, and it will have a large advantage over other task queues. Limit
2621 // the advantage so it doesn't dominate scheduling for a long time, in case it _does_ become CPU
2622 // bound later.
2623 //
2624 // FIXME: different scheduling groups have different sensitivity to jitter, take advantage
2625 if (_last_vruntime > tq._vruntime) {
2626 sched_print("tq {} {} losing vruntime {} due to sleep", (void*)&tq, tq._name, _last_vruntime - tq._vruntime);
2627 }
2628 tq._vruntime = std::max(_last_vruntime, tq._vruntime);
f67539c2
TL
2629 auto now = std::chrono::steady_clock::now();
2630 tq._waittime += now - tq._ts;
2631 tq._ts = now;
11fdf7f2
TL
2632 _activating_task_queues.push_back(&tq);
2633}
2634
f67539c2
TL
2635void reactor::service_highres_timer() noexcept {
2636 complete_timers(_timers, _expired_timers, [this] () noexcept {
11fdf7f2
TL
2637 if (!_timers.empty()) {
2638 enable_timer(_timers.get_next_timeout());
2639 }
2640 });
2641}
2642
2643int reactor::run() {
9f95a23c
TL
2644#ifndef SEASTAR_ASAN_ENABLED
2645 // SIGSTKSZ is too small when using asan. We also don't need to
2646 // handle SIGSEGV ourselves when using asan, so just don't install
2647 // a signal handler stack.
11fdf7f2 2648 auto signal_stack = install_signal_handler_stack();
9f95a23c
TL
2649#else
2650 (void)install_signal_handler_stack;
2651#endif
11fdf7f2
TL
2652
2653 register_metrics();
2654
9f95a23c
TL
2655 // The order in which we execute the pollers is very important for performance.
2656 //
2657 // This is because events that are generated in one poller may feed work into others. If
2658 // they were reversed, we'd only be able to do that work in the next task quota.
2659 //
2660 // One example is the relationship between the smp poller and the I/O submission poller:
2661 // If the smp poller runs first, requests from remote I/O queues can be dispatched right away
2662 //
2663 // We will run the pollers in the following order:
2664 //
2665 // 1. SMP: any remote event arrives before anything else
2666 // 2. reap kernel events completion: storage related completions may free up space in the I/O
2667 // queue.
2668 // 3. I/O queue: must be after reap, to free up events. If new slots are freed, we may submit I/O
2669 // 4. kernel submission: for I/O, will submit what was generated from the previous step.
2670 // 5. reap kernel events completion: some of the submissions from the previous step may return immediately.
2671 // For example if we are dealing with poll() on a fd that has events.
f67539c2 2672 poller smp_poller(std::make_unique<smp_pollfn>(*this));
9f95a23c
TL
2673
2674 poller reap_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));
2675 poller io_queue_submission_poller(std::make_unique<io_queue_submission_pollfn>(*this));
2676 poller kernel_submit_work_poller(std::make_unique<kernel_submit_work_pollfn>(*this));
2677 poller final_real_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));
11fdf7f2
TL
2678
2679 poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this));
2680 poller execution_stage_poller(std::make_unique<execution_stage_pollfn>());
2681
2682 start_aio_eventfd_loop();
2683
9f95a23c 2684 if (_id == 0 && _cfg.auto_handle_sigint_sigterm) {
11fdf7f2
TL
2685 if (_handle_sigint) {
2686 _signals.handle_signal_once(SIGINT, [this] { stop(); });
2687 }
2688 _signals.handle_signal_once(SIGTERM, [this] { stop(); });
2689 }
2690
9f95a23c
TL
2691 // Start initialization in the background.
2692 // Communicate when done using _start_promise.
2693 (void)_cpu_started.wait(smp::count).then([this] {
2694 (void)_network_stack->initialize().then([this] {
11fdf7f2
TL
2695 _start_promise.set_value();
2696 });
2697 });
9f95a23c
TL
2698 // Wait for network stack in the background and then signal all cpus.
2699 (void)_network_stack_ready->then([this] (std::unique_ptr<network_stack> stack) {
11fdf7f2 2700 _network_stack = std::move(stack);
9f95a23c
TL
2701 return smp::invoke_on_all([] {
2702 engine()._cpu_started.signal();
2703 });
11fdf7f2
TL
2704 });
2705
2706 poller syscall_poller(std::make_unique<syscall_pollfn>(*this));
2707
2708 poller drain_cross_cpu_freelist(std::make_unique<drain_cross_cpu_freelist_pollfn>());
2709
11fdf7f2
TL
2710 poller expire_lowres_timers(std::make_unique<lowres_timer_pollfn>(*this));
2711 poller sig_poller(std::make_unique<signal_pollfn>(*this));
2712
2713 using namespace std::chrono_literals;
2714 timer<lowres_clock> load_timer;
2715 auto last_idle = _total_idle;
2716 auto idle_start = sched_clock::now(), idle_end = idle_start;
2717 load_timer.set_callback([this, &last_idle, &idle_start, &idle_end] () mutable {
2718 _total_idle += idle_end - idle_start;
2719 auto load = double((_total_idle - last_idle).count()) / double(std::chrono::duration_cast<sched_clock::duration>(1s).count());
2720 last_idle = _total_idle;
2721 load = std::min(load, 1.0);
2722 idle_start = idle_end;
2723 _loads.push_front(load);
2724 if (_loads.size() > 5) {
2725 auto drop = _loads.back();
2726 _loads.pop_back();
2727 _load -= (drop/5);
2728 }
2729 _load += (load/5);
2730 });
2731 load_timer.arm_periodic(1s);
2732
2733 itimerspec its = seastar::posix::to_relative_itimerspec(_task_quota, _task_quota);
2734 _task_quota_timer.timerfd_settime(0, its);
2735 auto& task_quota_itimerspec = its;
2736
2737 struct sigaction sa_block_notifier = {};
2738 sa_block_notifier.sa_handler = &reactor::block_notifier;
2739 sa_block_notifier.sa_flags = SA_RESTART;
2740 auto r = sigaction(cpu_stall_detector::signal_number(), &sa_block_notifier, nullptr);
2741 assert(r == 0);
2742
2743 bool idle = false;
2744
2745 std::function<bool()> check_for_work = [this] () {
9f95a23c 2746 return poll_once() || have_more_tasks();
11fdf7f2
TL
2747 };
2748 std::function<bool()> pure_check_for_work = [this] () {
9f95a23c 2749 return pure_poll_once() || have_more_tasks();
11fdf7f2
TL
2750 };
2751 while (true) {
2752 run_some_tasks();
2753 if (_stopped) {
2754 load_timer.cancel();
2755 // Final tasks may include sending the last response to cpu 0, so run them
2756 while (have_more_tasks()) {
2757 run_some_tasks();
2758 }
2759 while (!_at_destroy_tasks->_q.empty()) {
2760 run_tasks(*_at_destroy_tasks);
2761 }
9f95a23c 2762 _finished_running_tasks = true;
11fdf7f2
TL
2763 smp::arrive_at_event_loop_end();
2764 if (_id == 0) {
2765 smp::join_all();
2766 }
2767 break;
2768 }
2769
9f95a23c 2770 _polls++;
11fdf7f2
TL
2771
2772 if (check_for_work()) {
2773 if (idle) {
2774 _total_idle += idle_end - idle_start;
2775 account_idle(idle_end - idle_start);
2776 idle_start = idle_end;
2777 idle = false;
2778 }
2779 } else {
2780 idle_end = sched_clock::now();
2781 if (!idle) {
2782 idle_start = idle_end;
2783 idle = true;
2784 }
2785 bool go_to_sleep = true;
2786 try {
2787 // we can't run check_for_work(), because that can run tasks in the context
2788 // of the idle handler which change its state, without the idle handler expecting
2789 // it. So run pure_check_for_work() instead.
2790 auto handler_result = _idle_cpu_handler(pure_check_for_work);
2791 go_to_sleep = handler_result == idle_cpu_handler_result::no_more_work;
2792 } catch (...) {
2793 report_exception("Exception while running idle cpu handler", std::current_exception());
2794 }
2795 if (go_to_sleep) {
2796 internal::cpu_relax();
2797 if (idle_end - idle_start > _max_poll_time) {
2798 // Turn off the task quota timer to avoid spurious wakeups
2799 struct itimerspec zero_itimerspec = {};
2800 _task_quota_timer.timerfd_settime(0, zero_itimerspec);
2801 auto start_sleep = sched_clock::now();
2802 _cpu_stall_detector->start_sleep();
2803 sleep();
2804 _cpu_stall_detector->end_sleep();
2805 // We may have slept for a while, so freshen idle_end
2806 idle_end = sched_clock::now();
2807 _total_sleep += idle_end - start_sleep;
2808 _task_quota_timer.timerfd_settime(0, task_quota_itimerspec);
2809 }
2810 } else {
2811 // We previously ran pure_check_for_work(), which might not actually have performed
2812 // any work.
2813 check_for_work();
2814 }
2815 }
2816 }
2817 // To prevent ordering issues from arising, destroy the I/O queue explicitly at this point.
2818 // This is needed because the reactor is destroyed from the thread_local destructors. If
2819 // the I/O queue happens to use any other infrastructure that is also kept this way (for
2820 // instance, collectd), we will not have any way to guarantee who is destroyed first.
2821 my_io_queues.clear();
2822 return _return;
2823}
2824
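// Puts the reactor to sleep: asks every poller to enter interrupt mode, and if
// any refuses, unwinds the ones already entered and returns without sleeping.
// Otherwise blocks in the backend until an event arrives, then restores all
// pollers to polling mode.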
2825void
2826reactor::sleep() {
2827 for (auto i = _pollers.begin(); i != _pollers.end(); ++i) {
2828 auto ok = (*i)->try_enter_interrupt_mode();
2829 if (!ok) {
2830 while (i != _pollers.begin()) {
2831 (*--i)->exit_interrupt_mode();
2832 }
2833 return;
2834 }
2835 }
9f95a23c
TL
2836
2837 _backend->wait_and_process_events(&_active_sigmask);
2838
11fdf7f2
TL
2839 for (auto i = _pollers.rbegin(); i != _pollers.rend(); ++i) {
2840 (*i)->exit_interrupt_mode();
2841 }
2842}
2843
11fdf7f2
TL
2844bool
2845reactor::poll_once() {
2846 bool work = false;
2847 for (auto c : _pollers) {
2848 work |= c->poll();
2849 }
2850
2851 return work;
2852}
2853
2854bool
2855reactor::pure_poll_once() {
2856 for (auto c : _pollers) {
2857 if (c->pure_poll()) {
2858 return true;
2859 }
2860 }
2861 return false;
2862}
2863
f67539c2
TL
2864namespace internal {
2865
2866class poller::registration_task final : public task {
11fdf7f2
TL
2867private:
2868 poller* _p;
2869public:
2870 explicit registration_task(poller* p) : _p(p) {}
2871 virtual void run_and_dispose() noexcept override {
2872 if (_p) {
2873 engine().register_poller(_p->_pollfn.get());
2874 _p->_registration_task = nullptr;
2875 }
2876 delete this;
2877 }
f67539c2 2878 task* waiting_task() noexcept override { return nullptr; }
11fdf7f2
TL
2879 void cancel() {
2880 _p = nullptr;
2881 }
2882 void moved(poller* p) {
2883 _p = p;
2884 }
2885};
2886
f67539c2 2887class poller::deregistration_task final : public task {
11fdf7f2
TL
2888private:
2889 std::unique_ptr<pollfn> _p;
2890public:
2891 explicit deregistration_task(std::unique_ptr<pollfn>&& p) : _p(std::move(p)) {}
2892 virtual void run_and_dispose() noexcept override {
2893 engine().unregister_poller(_p.get());
2894 delete this;
2895 }
f67539c2 2896 task* waiting_task() noexcept override { return nullptr; }
11fdf7f2
TL
2897};
2898
f67539c2
TL
2899}
2900
11fdf7f2
TL
2901void reactor::register_poller(pollfn* p) {
2902 _pollers.push_back(p);
2903}
2904
2905void reactor::unregister_poller(pollfn* p) {
2906 _pollers.erase(std::find(_pollers.begin(), _pollers.end(), p));
2907}
2908
2909void reactor::replace_poller(pollfn* old, pollfn* neww) {
2910 std::replace(_pollers.begin(), _pollers.end(), old, neww);
2911}
2912
f67539c2
TL
2913namespace internal {
2914
2915poller::poller(poller&& x) noexcept
9f95a23c 2916 : _pollfn(std::move(x._pollfn)), _registration_task(std::exchange(x._registration_task, nullptr)) {
11fdf7f2
TL
2917 if (_pollfn && _registration_task) {
2918 _registration_task->moved(this);
2919 }
2920}
2921
f67539c2
TL
2922poller&
2923poller::operator=(poller&& x) noexcept {
11fdf7f2
TL
2924 if (this != &x) {
2925 this->~poller();
2926 new (this) poller(std::move(x));
2927 }
2928 return *this;
2929}
2930
2931void
f67539c2 2932poller::do_register() noexcept {
11fdf7f2
TL
2933 // We can't just insert a poller into reactor::_pollers, because we
2934 // may be running inside a poller ourselves, and so in the middle of
2935 // iterating reactor::_pollers itself. So we schedule a task to add
2936 // the poller instead.
9f95a23c
TL
2937 auto task = new registration_task(this);
2938 engine().add_task(task);
2939 _registration_task = task;
11fdf7f2
TL
2940}
2941
f67539c2 2942poller::~poller() {
11fdf7f2
TL
2943 // We can't just remove the poller from reactor::_pollers, because we
2944 // may be running inside a poller ourselves, and so in the middle of
2945 // iterating reactor::_pollers itself. So we schedule a task to remove
2946 // the poller instead.
2947 //
2948 // Since we don't want to call the poller after we exit the destructor,
2949 // we replace it atomically with another one, and schedule a task to
2950 // delete the replacement.
2951 if (_pollfn) {
2952 if (_registration_task) {
2953 // not added yet, so don't do it at all.
2954 _registration_task->cancel();
9f95a23c
TL
2955 delete _registration_task;
2956 } else if (!engine()._finished_running_tasks) {
2957 // If _finished_running_tasks, the call to add_task() below will just
2958 // leak it, since no one will call task::run_and_dispose(). Just leave
2959 // the poller there, the reactor will never use it.
11fdf7f2
TL
2960 auto dummy = make_pollfn([] { return false; });
2961 auto dummy_p = dummy.get();
9f95a23c
TL
2962 auto task = new deregistration_task(std::move(dummy));
2963 engine().add_task(task);
11fdf7f2
TL
2964 engine().replace_poller(_pollfn.get(), dummy_p);
2965 }
2966 }
2967}
2968
f67539c2
TL
2969}
2970
11fdf7f2
TL
2971syscall_work_queue::syscall_work_queue()
2972 : _pending()
2973 , _completed()
2974 , _start_eventfd(0) {
2975}
2976
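// Queues a work item for the syscall thread: waits for a free slot in the
// bounded queue, then pushes the item and signals _start_eventfd. If the wait
// itself fails, the failure is reported through the work item.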
2977void syscall_work_queue::submit_item(std::unique_ptr<syscall_work_queue::work_item> item) {
f67539c2
TL
2978 (void)_queue_has_room.wait().then_wrapped([this, item = std::move(item)] (future<> f) mutable {
2979 // propagate wait failure via work_item
2980 if (f.failed()) {
2981 item->set_exception(f.get_exception());
2982 return;
2983 }
11fdf7f2
TL
2984 _pending.push(item.release());
2985 _start_eventfd.signal(1);
2986 });
2987}
2988
2989unsigned syscall_work_queue::complete() {
2990 std::array<work_item*, queue_length> tmp_buf;
2991 auto end = tmp_buf.data();
2992 auto nr = _completed.consume_all([&] (work_item* wi) {
2993 *end++ = wi;
2994 });
2995 for (auto p = tmp_buf.data(); p != end; ++p) {
2996 auto wi = *p;
2997 wi->complete();
2998 delete wi;
2999 }
3000 _queue_has_room.signal(nr);
3001 return nr;
3002}
3003
9f95a23c 3004
11fdf7f2
TL
3005smp_message_queue::smp_message_queue(reactor* from, reactor* to)
3006 : _pending(to)
3007 , _completed(from)
3008{
3009}
3010
3011smp_message_queue::~smp_message_queue()
3012{
3013 if (_pending.remote != _completed.remote) {
3014 _tx.a.~aa();
3015 }
3016}
3017
3018void smp_message_queue::stop() {
3019 _metrics.clear();
3020}
3021
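// Moves the locally batched work items onto the cross-shard lock-free queue,
// waking the destination shard if it is sleeping, and updates the sent/queue
// length counters. Items that did not fit remain in the local fifo.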
3022void smp_message_queue::move_pending() {
3023 auto begin = _tx.a.pending_fifo.cbegin();
3024 auto end = _tx.a.pending_fifo.cend();
3025 end = _pending.push(begin, end);
3026 if (begin == end) {
3027 return;
3028 }
3029 auto nr = end - begin;
3030 _pending.maybe_wakeup();
3031 _tx.a.pending_fifo.erase(begin, end);
3032 _current_queue_length += nr;
3033 _last_snt_batch = nr;
3034 _sent += nr;
3035}
3036
3037bool smp_message_queue::pure_poll_tx() const {
3038 // can't use read_available(), not available on older boost
3039 // empty() is not const, so need const_cast.
3040 return !const_cast<lf_queue&>(_completed).empty();
3041}
3042
9f95a23c
TL
3043void smp_message_queue::submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<smp_message_queue::work_item> item) {
3044 // matching signal() in process_completions()
3045 auto ssg_id = internal::smp_service_group_id(item->ssg);
3046 auto& sem = get_smp_service_groups_semaphore(ssg_id, t);
3047 // Future indirectly forwarded to `item`.
3048 (void)get_units(sem, 1, timeout).then_wrapped([this, item = std::move(item)] (future<smp_service_group_semaphore_units> units_fut) mutable {
3049 if (units_fut.failed()) {
3050 item->fail_with(units_fut.get_exception());
3051 ++_compl;
3052 ++_last_cmpl_batch;
3053 return;
3054 }
11fdf7f2 3055 _tx.a.pending_fifo.push_back(item.get());
9f95a23c 3056 // no exceptions from this point
11fdf7f2 3057 item.release();
9f95a23c 3058 units_fut.get0().release();
11fdf7f2
TL
3059 if (_tx.a.pending_fifo.size() >= batch_size) {
3060 move_pending();
3061 }
9f95a23c 3062 });
11fdf7f2
TL
3063}
3064
3065void smp_message_queue::respond(work_item* item) {
3066 _completed_fifo.push_back(item);
3067 if (_completed_fifo.size() >= batch_size || engine()._stopped) {
3068 flush_response_batch();
3069 }
3070}
3071
3072void smp_message_queue::flush_response_batch() {
3073 if (!_completed_fifo.empty()) {
3074 auto begin = _completed_fifo.cbegin();
3075 auto end = _completed_fifo.cend();
3076 end = _completed.push(begin, end);
3077 if (begin == end) {
3078 return;
3079 }
3080 _completed.maybe_wakeup();
3081 _completed_fifo.erase(begin, end);
3082 }
3083}
3084
3085bool smp_message_queue::has_unflushed_responses() const {
3086 return !_completed_fifo.empty();
3087}
3088
3089bool smp_message_queue::pure_poll_rx() const {
3090 // can't use read_available(), not available on older boost
3091 // empty() is not const, so need const_cast.
3092 return !const_cast<lf_queue&>(_pending).empty();
3093}
3094
3095void
3096smp_message_queue::lf_queue::maybe_wakeup() {
3097 // Called after lf_queue_base::push().
3098 //
3099 // This is read-after-write, which wants memory_order_seq_cst,
3100 // but we insert that barrier using systemwide_memory_barrier()
3101 // because seq_cst is so expensive.
3102 //
3103 // However, we do need a compiler barrier:
3104 std::atomic_signal_fence(std::memory_order_seq_cst);
3105 if (remote->_sleeping.load(std::memory_order_relaxed)) {
3106 // We are free to clear it, because we're sending a signal now
3107 remote->_sleeping.store(false, std::memory_order_relaxed);
3108 remote->wakeup();
3109 }
3110}
3111
9f95a23c
TL
3112smp_message_queue::lf_queue::~lf_queue() {
3113 consume_all([] (work_item* ptr) {
3114 delete ptr;
3115 });
3116}
3117
3118
11fdf7f2
TL
3119template<size_t PrefetchCnt, typename Func>
3120size_t smp_message_queue::process_queue(lf_queue& q, Func process) {
3121 // copy batch to local memory in order to minimize
3122 // time in which cross-cpu data is accessed
3123 work_item* items[queue_length + PrefetchCnt];
3124 work_item* wi;
3125 if (!q.pop(wi))
3126 return 0;
3127 // start prefetching first item before popping the rest to overlap memory
3128 // access with the potential cache miss that the second pop may cause
3129 prefetch<2>(wi);
3130 auto nr = q.pop(items);
3131 std::fill(std::begin(items) + nr, std::begin(items) + nr + PrefetchCnt, nr ? items[nr - 1] : wi);
3132 unsigned i = 0;
3133 do {
3134 prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + PrefetchCnt);
3135 process(wi);
3136 wi = items[i++];
3137 } while(i <= nr);
3138
3139 return nr + 1;
3140}
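// process_queue() above handles nr + 1 items: the one popped individually plus
// the nr items popped into the local batch. The PrefetchCnt slots past the end
// of the batch are filled with a duplicate of the last valid pointer so that
// prefetch_n() never touches an uninitialized entry, and prefetching always
// runs one step ahead of process() to hide cross-CPU cache misses on work
// items written by the remote shard.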
3141
9f95a23c
TL
3142size_t smp_message_queue::process_completions(shard_id t) {
3143 auto nr = process_queue<prefetch_cnt*2>(_completed, [t] (work_item* wi) {
11fdf7f2 3144 wi->complete();
9f95a23c
TL
3145 auto ssg_id = smp_service_group_id(wi->ssg);
3146 get_smp_service_groups_semaphore(ssg_id, t).signal();
11fdf7f2
TL
3147 delete wi;
3148 });
3149 _current_queue_length -= nr;
3150 _compl += nr;
3151 _last_cmpl_batch = nr;
3152
3153 return nr;
3154}
3155
3156void smp_message_queue::flush_request_batch() {
3157 if (!_tx.a.pending_fifo.empty()) {
3158 move_pending();
3159 }
3160}
3161
3162size_t smp_message_queue::process_incoming() {
3163 auto nr = process_queue<prefetch_cnt>(_pending, [] (work_item* wi) {
3164 wi->process();
3165 });
3166 _received += nr;
3167 _last_rcv_batch = nr;
3168 return nr;
3169}
3170
3171void smp_message_queue::start(unsigned cpuid) {
3172 _tx.init();
3173 namespace sm = seastar::metrics;
3174 char instance[10];
f67539c2 3175 std::snprintf(instance, sizeof(instance), "%u-%u", this_shard_id(), cpuid);
11fdf7f2
TL
3176 _metrics.add_group("smp", {
3177 // queue_length value:GAUGE:0:U
3178 // Absolute value of num packets in last tx batch.
3179 sm::make_queue_length("send_batch_queue_length", _last_snt_batch, sm::description("Current send batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
3180 sm::make_queue_length("receive_batch_queue_length", _last_rcv_batch, sm::description("Current receive batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
3181 sm::make_queue_length("complete_batch_queue_length", _last_cmpl_batch, sm::description("Current complete batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
3182 sm::make_queue_length("send_queue_length", _current_queue_length, sm::description("Current send queue length"), {sm::shard_label(instance)})(sm::metric_disabled),
3183 // total_operations value:DERIVE:0:U
3184 sm::make_derive("total_received_messages", _received, sm::description("Total number of received messages"), {sm::shard_label(instance)})(sm::metric_disabled),
3185 // total_operations value:DERIVE:0:U
3186 sm::make_derive("total_sent_messages", _sent, sm::description("Total number of sent messages"), {sm::shard_label(instance)})(sm::metric_disabled),
3187 // total_operations value:DERIVE:0:U
3188 sm::make_derive("total_completed_messages", _compl, sm::description("Total number of messages completed"), {sm::shard_label(instance)})(sm::metric_disabled)
3189 });
3190}
3191
11fdf7f2
TL
3192readable_eventfd writeable_eventfd::read_side() {
3193 return readable_eventfd(_fd.dup());
3194}
3195
3196file_desc writeable_eventfd::try_create_eventfd(size_t initial) {
3197 assert(size_t(int(initial)) == initial);
3198 return file_desc::eventfd(initial, EFD_CLOEXEC);
3199}
3200
3201void writeable_eventfd::signal(size_t count) {
3202 uint64_t c = count;
3203 auto r = _fd.write(&c, sizeof(c));
3204 assert(r == sizeof(c));
3205}
3206
3207writeable_eventfd readable_eventfd::write_side() {
3208 return writeable_eventfd(_fd.get_file_desc().dup());
3209}
3210
3211file_desc readable_eventfd::try_create_eventfd(size_t initial) {
3212 assert(size_t(int(initial)) == initial);
3213 return file_desc::eventfd(initial, EFD_CLOEXEC | EFD_NONBLOCK);
3214}
3215
3216future<size_t> readable_eventfd::wait() {
3217 return engine().readable(*_fd._s).then([this] {
3218 uint64_t count;
3219 int r = ::read(_fd.get_fd(), &count, sizeof(count));
3220 assert(r == sizeof(count));
3221 return make_ready_future<size_t>(count);
3222 });
3223}
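// Minimal sketch of wiring the eventfd pair above together (illustrative only;
// it assumes writeable_eventfd is default-constructible, as suggested by
// try_create_eventfd()):
//
//   writeable_eventfd w;                  // blocking write side
//   readable_eventfd r = w.read_side();   // non-blocking read side (dup()ed fd)
//   w.signal(3);                          // add 3 to the counter
//   (void)r.wait().then([] (size_t n) {   // resolves with the accumulated count
//       assert(n == 3);
//   });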
3224
9f95a23c
TL
3225void schedule(task* t) noexcept {
3226 engine().add_task(t);
11fdf7f2
TL
3227}
3228
9f95a23c
TL
3229void schedule_urgent(task* t) noexcept {
3230 engine().add_urgent_task(t);
11fdf7f2
TL
3231}
3232
3233}
3234
3235bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) {
3236 return (a.sin_addr.s_addr == b.sin_addr.s_addr) && (a.sin_port == b.sin_port);
3237}
3238
3239namespace seastar {
3240
3241void network_stack_registry::register_stack(sstring name,
3242 boost::program_options::options_description opts,
9f95a23c
TL
3243 noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)> create, bool make_default) {
3244 if (_map().count(name)) {
3245 return;
3246 }
11fdf7f2
TL
3247 _map()[name] = std::move(create);
3248 options_description().add(opts);
3249 if (make_default) {
3250 _default() = name;
3251 }
3252}
3253
3254void register_network_stack(sstring name, boost::program_options::options_description opts,
9f95a23c 3255 noncopyable_function<future<std::unique_ptr<network_stack>>(boost::program_options::variables_map)>
11fdf7f2
TL
3256 create,
3257 bool make_default) {
3258 return network_stack_registry::register_stack(
3259 std::move(name), std::move(opts), std::move(create), make_default);
3260}
3261
3262sstring network_stack_registry::default_stack() {
3263 return _default();
3264}
3265
3266std::vector<sstring> network_stack_registry::list() {
3267 std::vector<sstring> ret;
3268 for (auto&& ns : _map()) {
3269 ret.push_back(ns.first);
3270 }
3271 return ret;
3272}
3273
3274future<std::unique_ptr<network_stack>>
3275network_stack_registry::create(options opts) {
3276 return create(_default(), opts);
3277}
3278
3279future<std::unique_ptr<network_stack>>
3280network_stack_registry::create(sstring name, options opts) {
3281 if (!_map().count(name)) {
3282 throw std::runtime_error(format("network stack {} not registered", name));
3283 }
3284 return _map()[name](opts);
3285}
3286
9f95a23c
TL
3287static bool kernel_supports_aio_fsync() {
3288 return kernel_uname().whitelisted({"4.18"});
3289}
3290
11fdf7f2 3291boost::program_options::options_description
9f95a23c 3292reactor::get_options_description(reactor_config cfg) {
11fdf7f2
TL
3293 namespace bpo = boost::program_options;
3294 bpo::options_description opts("Core options");
3295 auto net_stack_names = network_stack_registry::list();
3296 opts.add_options()
3297 ("network-stack", bpo::value<std::string>(),
3298 format("select network stack (valid values: {})",
3299 format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")).c_str())
11fdf7f2
TL
3300 ("poll-mode", "poll continuously (100% cpu use)")
3301 ("idle-poll-time-us", bpo::value<unsigned>()->default_value(calculate_poll_time() / 1us),
3302 "idle polling time in microseconds (reduce for overprovisioned environments or laptops)")
3303 ("poll-aio", bpo::value<bool>()->default_value(true),
3304 "busy-poll for disk I/O (reduces latency and increases throughput)")
9f95a23c 3305 ("task-quota-ms", bpo::value<double>()->default_value(cfg.task_quota / 1ms), "Max time (ms) between polls")
11fdf7f2 3306 ("max-task-backlog", bpo::value<unsigned>()->default_value(1000), "Maximum number of task backlog to allow; above this we ignore I/O")
9f95a23c 3307 ("blocked-reactor-notify-ms", bpo::value<unsigned>()->default_value(200), "threshold in milliseconds over which the reactor is considered blocked if no progress is made")
11fdf7f2
TL
3308 ("blocked-reactor-reports-per-minute", bpo::value<unsigned>()->default_value(5), "Maximum number of backtraces reported by stall detector per minute")
3309 ("relaxed-dma", "allow using buffered I/O if DMA is not available (reduces performance)")
9f95a23c
TL
3310 ("linux-aio-nowait",
3311 bpo::value<bool>()->default_value(aio_nowait_supported),
3312 "use the Linux NOWAIT AIO feature, which reduces reactor stalls due to aio (autodetected)")
11fdf7f2
TL
3313 ("unsafe-bypass-fsync", bpo::value<bool>()->default_value(false), "Bypass fsync(), may result in data loss. Use for testing on consumer drives")
3314 ("overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0")
3315 ("abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory")
3316 ("force-aio-syscalls", bpo::value<bool>()->default_value(false),
3317 "Force io_getevents(2) to issue a system call, instead of bypassing the kernel when possible."
3318 " This makes strace output more useful, but slows down the application")
f67539c2
TL
3319 ("dump-memory-diagnostics-on-alloc-failure-kind", bpo::value<std::string>()->default_value("critical"), "Dump diagnostics of the seastar allocator state on allocation failure."
3320 " Accepted values: never, critical (default), always. When set to critical, only allocations marked as critical will trigger diagnostics dump."
3321 " The diagnostics will be written to the seastar_memory logger, with error level."
3322 " Note that if the seastar_memory logger is set to debug or trace level, the diagnostics will be logged irrespective of this setting.")
11fdf7f2
TL
3323 ("reactor-backend", bpo::value<reactor_backend_selector>()->default_value(reactor_backend_selector::default_backend()),
3324 format("Internal reactor implementation ({})", reactor_backend_selector::available()).c_str())
9f95a23c
TL
3325 ("aio-fsync", bpo::value<bool>()->default_value(kernel_supports_aio_fsync()),
3326 "Use Linux aio for fsync() calls. This reduces latency; requires Linux 4.18 or later.")
11fdf7f2
TL
3327#ifdef SEASTAR_HEAPPROF
3328 ("heapprof", "enable seastar heap profiling")
3329#endif
3330 ;
9f95a23c
TL
3331 if (cfg.auto_handle_sigint_sigterm) {
3332 opts.add_options()
3333 ("no-handle-interrupt", "ignore SIGINT (for gdb)")
3334 ;
3335 }
11fdf7f2
TL
3336 opts.add(network_stack_registry::options_description());
3337 return opts;
3338}
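// Illustrative sketch of consuming the description above with plain
// boost::program_options (in real programs this is normally handled by
// seastar's app_template; reactor_config{} assumes its members carry sane
// defaults):
//
//   namespace bpo = boost::program_options;
//   bpo::variables_map vm;
//   bpo::store(bpo::parse_command_line(argc, argv,
//       reactor::get_options_description(reactor_config{})), vm);
//   bpo::notify(vm);
//   bool poll_mode = vm.count("poll-mode");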
3339
3340boost::program_options::options_description
3341smp::get_options_description()
3342{
3343 namespace bpo = boost::program_options;
3344 bpo::options_description opts("SMP options");
3345 opts.add_options()
3346 ("smp,c", bpo::value<unsigned>(), "number of threads (default: one per CPU)")
3347 ("cpuset", bpo::value<cpuset_bpo_wrapper>(), "CPUs to use (in cpuset(7) format; default: all)")
3348 ("memory,m", bpo::value<std::string>(), "memory to use, in bytes (ex: 4G) (default: all)")
3349 ("reserve-memory", bpo::value<std::string>(), "memory reserved to OS (if --memory not specified)")
3350 ("hugepages", bpo::value<std::string>(), "path to accessible hugetlbfs mount (typically /dev/hugepages/something)")
3351 ("lock-memory", bpo::value<bool>(), "lock all memory (prevents swapping)")
3352 ("thread-affinity", bpo::value<bool>()->default_value(true), "pin threads to their cpus (disable for overprovisioning)")
3353#ifdef SEASTAR_HAVE_HWLOC
3354 ("num-io-queues", bpo::value<unsigned>(), "Number of IO queues. Each IO unit will be responsible for a fraction of the IO requests. Defaults to the number of threads")
3355 ("max-io-requests", bpo::value<unsigned>(), "Maximum number of concurrent requests to be sent to the disk. Defaults to 128 times the number of IO queues")
3356#else
3357 ("max-io-requests", bpo::value<unsigned>(), "Maximum number of concurrent requests to be sent to the disk. Defaults to 128 times the number of processors")
3358#endif
3359 ("io-properties-file", bpo::value<std::string>(), "path to a YAML file describing the characteristics of the I/O Subsystem")
3360 ("io-properties", bpo::value<std::string>(), "a YAML string describing the characteristics of the I/O Subsystem")
3361 ("mbind", bpo::value<bool>()->default_value(true), "enable mbind")
3362#ifndef SEASTAR_NO_EXCEPTION_HACK
3363 ("enable-glibc-exception-scaling-workaround", bpo::value<bool>()->default_value(true), "enable workaround for glibc/gcc c++ exception scalability problem")
f67539c2
TL
3364#endif
3365#ifdef SEASTAR_HAVE_HWLOC
3366 ("allow-cpus-in-remote-numa-nodes", bpo::value<bool>()->default_value(true), "if some CPUs are found not to have any local NUMA nodes, allow assigning them to remote ones")
11fdf7f2
TL
3367#endif
3368 ;
3369 return opts;
3370}
3371
3372thread_local scollectd::impl scollectd_impl;
3373
3374scollectd::impl & scollectd::get_impl() {
3375 return scollectd_impl;
3376}
3377
3378struct reactor_deleter {
3379 void operator()(reactor* p) {
3380 p->~reactor();
3381 free(p);
3382 }
3383};
3384
3385thread_local std::unique_ptr<reactor, reactor_deleter> reactor_holder;
3386
3387std::vector<posix_thread> smp::_threads;
3388std::vector<std::function<void ()>> smp::_thread_loops;
f67539c2 3389std::optional<boost::barrier> smp::_all_event_loops_done;
11fdf7f2
TL
3390std::vector<reactor*> smp::_reactors;
3391std::unique_ptr<smp_message_queue*[], smp::qs_deleter> smp::_qs;
3392std::thread::id smp::_tmain;
3393unsigned smp::count = 1;
3394bool smp::_using_dpdk;
3395
3396void smp::start_all_queues()
3397{
3398 for (unsigned c = 0; c < count; c++) {
f67539c2
TL
3399 if (c != this_shard_id()) {
3400 _qs[c][this_shard_id()].start(c);
11fdf7f2
TL
3401 }
3402 }
f67539c2 3403 alien::smp::_qs[this_shard_id()].start();
11fdf7f2
TL
3404}
3405
3406#ifdef SEASTAR_HAVE_DPDK
3407
3408int dpdk_thread_adaptor(void* f)
3409{
3410 (*static_cast<std::function<void ()>*>(f))();
3411 return 0;
3412}
3413
3414#endif
3415
3416void smp::join_all()
3417{
3418#ifdef SEASTAR_HAVE_DPDK
3419 if (_using_dpdk) {
3420 rte_eal_mp_wait_lcore();
3421 return;
3422 }
3423#endif
3424 for (auto&& t: smp::_threads) {
3425 t.join();
3426 }
3427}
3428
3429void smp::pin(unsigned cpu_id) {
3430 if (_using_dpdk) {
3431 // dpdk does its own pinning
3432 return;
3433 }
3434 pin_this_thread(cpu_id);
3435}
3436
3437void smp::arrive_at_event_loop_end() {
3438 if (_all_event_loops_done) {
3439 _all_event_loops_done->wait();
3440 }
3441}
3442
9f95a23c 3443void smp::allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg) {
11fdf7f2
TL
3444 assert(!reactor_holder);
3445
3446 // we cannot just write "local_engine = new reactor" since reactor's constructor
3447 // uses local_engine
3448 void *buf;
3449 int r = posix_memalign(&buf, cache_line_size, sizeof(reactor));
3450 assert(r == 0);
3451 local_engine = reinterpret_cast<reactor*>(buf);
f67539c2 3452 *internal::this_shard_id_ptr() = id;
9f95a23c 3453 new (buf) reactor(id, std::move(rbs), cfg);
11fdf7f2
TL
3454 reactor_holder.reset(local_engine);
3455}
3456
3457void smp::cleanup() {
3458 smp::_threads = std::vector<posix_thread>();
3459 _thread_loops.clear();
3460}
3461
3462void smp::cleanup_cpu() {
f67539c2 3463 size_t cpuid = this_shard_id();
11fdf7f2
TL
3464
3465 if (_qs) {
3466 for(unsigned i = 0; i < smp::count; i++) {
3467 _qs[i][cpuid].stop();
3468 }
3469 }
3470 if (alien::smp::_qs) {
3471 alien::smp::_qs[cpuid].stop();
3472 }
3473}
3474
3475void smp::create_thread(std::function<void ()> thread_loop) {
3476 if (_using_dpdk) {
3477 _thread_loops.push_back(std::move(thread_loop));
3478 } else {
3479 _threads.emplace_back(std::move(thread_loop));
3480 }
3481}
3482
3483// Installs handler for Signal which ensures that Func is invoked only once
3484// in the whole program and that after it is invoked the default handler is restored.
3485template<int Signal, void(*Func)()>
3486void install_oneshot_signal_handler() {
3487 static bool handled = false;
3488 static util::spinlock lock;
3489
3490 struct sigaction sa;
3491 sa.sa_sigaction = [](int sig, siginfo_t *info, void *p) {
3492 std::lock_guard<util::spinlock> g(lock);
3493 if (!handled) {
3494 handled = true;
3495 Func();
3496 signal(sig, SIG_DFL);
3497 }
3498 };
3499 sigfillset(&sa.sa_mask);
3500 sa.sa_flags = SA_SIGINFO | SA_RESTART;
3501 if (Signal == SIGSEGV) {
3502 sa.sa_flags |= SA_ONSTACK;
3503 }
3504 auto r = ::sigaction(Signal, &sa, nullptr);
3505 throw_system_error_on(r == -1);
3506}
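// The static flag plus spinlock above make the handler one-shot: the first
// delivery runs Func and resets the disposition to SIG_DFL, so a repeated
// fault (or a fault inside Func) terminates the process via the default
// action instead of recursing. It is instantiated below for SIGSEGV and
// SIGABRT in smp::configure().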
3507
3508static void sigsegv_action() noexcept {
3509 print_with_backtrace("Segmentation fault");
3510}
3511
3512static void sigabrt_action() noexcept {
3513 print_with_backtrace("Aborting");
3514}
3515
3516void smp::qs_deleter::operator()(smp_message_queue** qs) const {
3517 for (unsigned i = 0; i < smp::count; i++) {
3518 for (unsigned j = 0; j < smp::count; j++) {
3519 qs[i][j].~smp_message_queue();
3520 }
3521 ::operator delete[](qs[i]);
3522 }
3523 delete[](qs);
3524}
3525
3526class disk_config_params {
3527private:
3528 unsigned _num_io_queues = 0;
f67539c2 3529 std::optional<unsigned> _capacity;
11fdf7f2
TL
3530 std::unordered_map<dev_t, mountpoint_params> _mountpoints;
3531 std::chrono::duration<double> _latency_goal;
3532
3533public:
3534 uint64_t per_io_queue(uint64_t qty, dev_t devid) const {
3535 const mountpoint_params& p = _mountpoints.at(devid);
3536 return std::max(qty / p.num_io_queues, 1ul);
3537 }
3538
3539 unsigned num_io_queues(dev_t devid) const {
3540 const mountpoint_params& p = _mountpoints.at(devid);
3541 return p.num_io_queues;
3542 }
3543
3544 std::chrono::duration<double> latency_goal() const {
3545 return _latency_goal;
3546 }
3547
3548 void parse_config(boost::program_options::variables_map& configuration) {
3549 seastar_logger.debug("smp::count: {}", smp::count);
3550 _latency_goal = std::chrono::duration_cast<std::chrono::duration<double>>(configuration["task-quota-ms"].as<double>() * 1.5 * 1ms);
3551 seastar_logger.debug("latency_goal: {}", latency_goal().count());
3552
3553 if (configuration.count("max-io-requests")) {
3554 _capacity = configuration["max-io-requests"].as<unsigned>();
3555 }
3556
3557 if (configuration.count("num-io-queues")) {
3558 _num_io_queues = configuration["num-io-queues"].as<unsigned>();
3559 if (!_num_io_queues) {
3560 throw std::runtime_error("num-io-queues must be greater than zero");
3561 }
3562 }
3563 if (configuration.count("io-properties-file") && configuration.count("io-properties")) {
3564 throw std::runtime_error("Both io-properties and io-properties-file specified. Don't know which to trust!");
3565 }
3566
f67539c2 3567 std::optional<YAML::Node> doc;
11fdf7f2
TL
3568 if (configuration.count("io-properties-file")) {
3569 doc = YAML::LoadFile(configuration["io-properties-file"].as<std::string>());
3570 } else if (configuration.count("io-properties")) {
3571 doc = YAML::Load(configuration["io-properties"].as<std::string>());
3572 }
3573
3574 if (doc) {
3575 static constexpr unsigned task_quotas_in_default_latency_goal = 3;
3576 unsigned auto_num_io_queues = smp::count;
3577
3578 for (auto&& section : *doc) {
3579 auto sec_name = section.first.as<std::string>();
3580 if (sec_name != "disks") {
3581 throw std::runtime_error(fmt::format("While parsing I/O options: section {} currently unsupported.", sec_name));
3582 }
3583 auto disks = section.second.as<std::vector<mountpoint_params>>();
3584 for (auto& d : disks) {
3585 struct ::stat buf;
3586 auto ret = stat(d.mountpoint.c_str(), &buf);
3587 if (ret < 0) {
3588 throw std::runtime_error(fmt::format("Couldn't stat {}", d.mountpoint));
3589 }
3590 if (_mountpoints.count(buf.st_dev)) {
3591 throw std::runtime_error(fmt::format("Mountpoint {} already configured", d.mountpoint));
3592 }
3593 if (_mountpoints.size() >= reactor::max_queues) {
3594 throw std::runtime_error(fmt::format("Configured number of queues {} is larger than the maximum {}",
3595 _mountpoints.size(), reactor::max_queues));
3596 }
9f95a23c
TL
3597 if (d.read_bytes_rate == 0 || d.write_bytes_rate == 0 ||
3598 d.read_req_rate == 0 || d.write_req_rate == 0) {
3599 throw std::runtime_error(fmt::format("R/W bytes and req rates must not be zero"));
3600 }
11fdf7f2
TL
3601
3602 // Ideally we wouldn't have I/O Queues and would dispatch from every shard (https://github.com/scylladb/seastar/issues/485)
3603 // While we don't do that, we'll just be conservative and try to recommend values of I/O Queues that are close to what we
3604 // suggested before the I/O Scheduler rework. The I/O Scheduler has traditionally tried to make sure that each queue would have
3605 // at least 4 requests in depth, and all its requests were 4kB in size. Therefore, try to arrange the I/O Queues so that we would
3606 // end up in the same situation here (that's where the 4 comes from).
3607 //
3608 // For the bandwidth limit, we want that to be 4 * 4096, so each I/O Queue has the same bandwidth as before.
3609 if (!_num_io_queues) {
3610 unsigned dev_io_queues = smp::count;
3611 dev_io_queues = std::min(dev_io_queues, unsigned((task_quotas_in_default_latency_goal * d.write_req_rate * latency_goal().count()) / 4));
3612 dev_io_queues = std::min(dev_io_queues, unsigned((task_quotas_in_default_latency_goal * d.write_bytes_rate * latency_goal().count()) / (4 * 4096)));
3613 dev_io_queues = std::max(dev_io_queues, 1u);
3614 seastar_logger.debug("dev_io_queues: {}", dev_io_queues);
3615 d.num_io_queues = dev_io_queues;
3616 auto_num_io_queues = std::min(auto_num_io_queues, dev_io_queues);
3617 } else {
3618 d.num_io_queues = _num_io_queues;
3619 }
3620
3621 seastar_logger.debug("dev_id: {} mountpoint: {}", buf.st_dev, d.mountpoint);
3622 _mountpoints.emplace(buf.st_dev, d);
3623 }
3624 }
3625 if (!_num_io_queues) {
3626 _num_io_queues = auto_num_io_queues;
3627 }
3628 } else if (!_num_io_queues) {
3629 _num_io_queues = smp::count;
3630 }
3631
3632 // Placeholder for unconfigured disks.
3633 mountpoint_params d = {};
3634 d.num_io_queues = _num_io_queues;
3635 seastar_logger.debug("num_io_queues: {}", d.num_io_queues);
3636 _mountpoints.emplace(0, d);
3637 }
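    // Illustrative arithmetic for the auto-tuning above (hypothetical numbers):
    // with a task quota of 0.5 ms, latency_goal() is 0.75 ms, so a disk with
    // write_req_rate = 100000 req/s and write_bytes_rate = 1 GB/s yields
    //   request-based limit: 3 * 100000 * 0.00075 / 4          = 56 queues
    //   bytes-based limit:   3 * 1e9    * 0.00075 / (4 * 4096) = 137 queues
    // and the device gets min(smp::count, 56, 137) I/O queues, but at least 1.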
3638
3639 struct io_queue::config generate_config(dev_t devid) const {
3640 seastar_logger.debug("generate_config dev_id: {}", devid);
3641 const mountpoint_params& p = _mountpoints.at(devid);
3642 struct io_queue::config cfg;
3643 uint64_t max_bandwidth = std::max(p.read_bytes_rate, p.write_bytes_rate);
3644 uint64_t max_iops = std::max(p.read_req_rate, p.write_req_rate);
3645
f67539c2
TL
3646 cfg.devid = devid;
3647 cfg.disk_bytes_write_to_read_multiplier = io_queue::read_request_base_count;
3648 cfg.disk_req_write_to_read_multiplier = io_queue::read_request_base_count;
3649
11fdf7f2 3650 if (!_capacity) {
11fdf7f2 3651 if (max_bandwidth != std::numeric_limits<uint64_t>::max()) {
f67539c2 3652 cfg.disk_bytes_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_bytes_rate) / p.write_bytes_rate;
11fdf7f2
TL
3653 cfg.max_bytes_count = io_queue::read_request_base_count * per_io_queue(max_bandwidth * latency_goal().count(), devid);
3654 }
3655 if (max_iops != std::numeric_limits<uint64_t>::max()) {
3656 cfg.max_req_count = io_queue::read_request_base_count * per_io_queue(max_iops * latency_goal().count(), devid);
f67539c2 3657 cfg.disk_req_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_req_rate) / p.write_req_rate;
11fdf7f2
TL
3658 }
3659 cfg.mountpoint = p.mountpoint;
3660 } else {
f67539c2
TL
3661 // For backwards compatibility
3662 cfg.capacity = *_capacity;
3663 // Legacy configuration when only concurrency is specified.
3664 cfg.max_req_count = io_queue::read_request_base_count * std::min(*_capacity, reactor::max_aio_per_queue);
3665 // specify size in terms of 16kB IOPS.
3666 cfg.max_bytes_count = io_queue::read_request_base_count * (cfg.max_req_count << 14);
11fdf7f2
TL
3667 }
3668 return cfg;
3669 }
3670
3671 auto device_ids() {
3672 return boost::adaptors::keys(_mountpoints);
3673 }
3674};
3675
9f95a23c 3676void smp::register_network_stacks() {
11fdf7f2
TL
3677 register_posix_stack();
3678 register_native_stack();
3679}
3680
9f95a23c 3681void smp::configure(boost::program_options::variables_map configuration, reactor_config reactor_cfg)
11fdf7f2 3682{
11fdf7f2
TL
3683#ifndef SEASTAR_NO_EXCEPTION_HACK
3684 if (configuration["enable-glibc-exception-scaling-workaround"].as<bool>()) {
3685 init_phdr_cache();
3686 }
3687#endif
3688
3689 // Mask most, to prevent threads (esp. dpdk helper threads)
3690 // from servicing a signal. Individual reactors will unmask signals
3691 // as they become prepared to handle them.
3692 //
3693 // We leave some signals unmasked since we don't handle them ourself.
3694 sigset_t sigs;
3695 sigfillset(&sigs);
3696 for (auto sig : {SIGHUP, SIGQUIT, SIGILL, SIGABRT, SIGFPE, SIGSEGV,
3697 SIGALRM, SIGCONT, SIGSTOP, SIGTSTP, SIGTTIN, SIGTTOU}) {
3698 sigdelset(&sigs, sig);
3699 }
9f95a23c
TL
3700 if (!reactor_cfg.auto_handle_sigint_sigterm) {
3701 sigdelset(&sigs, SIGINT);
3702 sigdelset(&sigs, SIGTERM);
3703 }
11fdf7f2
TL
3704 pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
3705
9f95a23c
TL
3706#ifndef SEASTAR_ASAN_ENABLED
3707 // We don't need to handle SIGSEGV when asan is enabled.
11fdf7f2 3708 install_oneshot_signal_handler<SIGSEGV, sigsegv_action>();
9f95a23c
TL
3709#else
3710 (void)sigsegv_action;
3711#endif
11fdf7f2
TL
3712 install_oneshot_signal_handler<SIGABRT, sigabrt_action>();
3713
3714#ifdef SEASTAR_HAVE_DPDK
3715 _using_dpdk = configuration.count("dpdk-pmd");
3716#endif
3717 auto thread_affinity = configuration["thread-affinity"].as<bool>();
3718 if (configuration.count("overprovisioned")
3719 && configuration["thread-affinity"].defaulted()) {
3720 thread_affinity = false;
3721 }
3722 if (!thread_affinity && _using_dpdk) {
3723 fmt::print("warning: --thread-affinity 0 ignored in dpdk mode\n");
3724 }
3725 auto mbind = configuration["mbind"].as<bool>();
3726 if (!thread_affinity) {
3727 mbind = false;
3728 }
3729
3730 smp::count = 1;
3731 smp::_tmain = std::this_thread::get_id();
3732 auto nr_cpus = resource::nr_processing_units();
3733 resource::cpuset cpu_set;
9f95a23c
TL
3734 auto cgroup_cpu_set = cgroup::cpu_set();
3735
11fdf7f2
TL
3736 std::copy(boost::counting_iterator<unsigned>(0), boost::counting_iterator<unsigned>(nr_cpus),
3737 std::inserter(cpu_set, cpu_set.end()));
9f95a23c 3738
11fdf7f2
TL
3739 if (configuration.count("cpuset")) {
3740 cpu_set = configuration["cpuset"].as<cpuset_bpo_wrapper>().value;
9f95a23c
TL
3741 if (cgroup_cpu_set && *cgroup_cpu_set != cpu_set) {
3742 // CPUs that are not available are those pinned by
3743 // --cpuset but not by cgroups, if mounted.
3744 std::set<unsigned int> not_available_cpus;
3745 std::set_difference(cpu_set.begin(), cpu_set.end(),
3746 cgroup_cpu_set->begin(), cgroup_cpu_set->end(),
3747 std::inserter(not_available_cpus, not_available_cpus.end()));
3748
3749 if (!not_available_cpus.empty()) {
3750 std::ostringstream not_available_cpus_list;
3751 for (auto cpu_id : not_available_cpus) {
3752 not_available_cpus_list << " " << cpu_id;
3753 }
3754 seastar_logger.error("Bad value for --cpuset:{} not allowed. Shutting down.", not_available_cpus_list.str());
3755 exit(1);
3756 }
3757 }
3758 } else if (cgroup_cpu_set) {
3759 cpu_set = *cgroup_cpu_set;
11fdf7f2 3760 }
9f95a23c 3761
11fdf7f2
TL
3762 if (configuration.count("smp")) {
3763 nr_cpus = configuration["smp"].as<unsigned>();
3764 } else {
3765 nr_cpus = cpu_set.size();
3766 }
3767 smp::count = nr_cpus;
3768 _reactors.resize(nr_cpus);
3769 resource::configuration rc;
3770 if (configuration.count("memory")) {
3771 rc.total_memory = parse_memory_size(configuration["memory"].as<std::string>());
3772#ifdef SEASTAR_HAVE_DPDK
3773 if (configuration.count("hugepages") &&
3774 !configuration["network-stack"].as<std::string>().compare("native") &&
3775 _using_dpdk) {
3776 size_t dpdk_memory = dpdk::eal::mem_size(smp::count);
3777
3778 if (dpdk_memory >= rc.total_memory) {
3779 std::cerr<<"Can't run with the given amount of memory: ";
3780 std::cerr<<configuration["memory"].as<std::string>();
3781 std::cerr<<". Consider giving more."<<std::endl;
3782 exit(1);
3783 }
3784
3785 //
3786 // Subtract the memory we are about to give to DPDK from the total
3787 // amount of memory we are allowed to use.
3788 //
3789 rc.total_memory.value() -= dpdk_memory;
3790 }
3791#endif
3792 }
3793 if (configuration.count("reserve-memory")) {
3794 rc.reserve_memory = parse_memory_size(configuration["reserve-memory"].as<std::string>());
3795 }
f67539c2 3796 std::optional<std::string> hugepages_path;
11fdf7f2
TL
3797 if (configuration.count("hugepages")) {
3798 hugepages_path = configuration["hugepages"].as<std::string>();
3799 }
3800 auto mlock = false;
3801 if (configuration.count("lock-memory")) {
3802 mlock = configuration["lock-memory"].as<bool>();
3803 }
3804 if (mlock) {
f67539c2
TL
3805 auto extra_flags = 0;
3806#ifdef MCL_ONFAULT
3807 // Linux will serialize faulting in anonymous memory, and also
3808 // serialize marking them as locked. This can take many minutes on
3809 // terabyte class machines, so fault them in the future to spread
3810 // out the cost. This isn't good since we'll see contention if
3811 // multiple shards fault in memory at once, but at least that work can
3812 // proceed in parallel with regular reactor work on other shards.
3813 extra_flags |= MCL_ONFAULT; // Linux 4.4+
3814#endif
3815 auto r = mlockall(MCL_CURRENT | MCL_FUTURE | extra_flags);
11fdf7f2
TL
3816 if (r) {
3817 // Don't hard fail for now, it's hard to get the configuration right
3818 fmt::print("warning: failed to mlockall: {}\n", strerror(errno));
3819 }
3820 }
3821
3822 rc.cpus = smp::count;
3823 rc.cpu_set = std::move(cpu_set);
3824
3825 disk_config_params disk_config;
3826 disk_config.parse_config(configuration);
3827 for (auto& id : disk_config.device_ids()) {
3828 rc.num_io_queues.emplace(id, disk_config.num_io_queues(id));
3829 }
3830
f67539c2
TL
3831#ifdef SEASTAR_HAVE_HWLOC
3832 if (configuration["allow-cpus-in-remote-numa-nodes"].as<bool>()) {
3833 rc.assign_orphan_cpus = true;
3834 }
3835#endif
3836
11fdf7f2
TL
3837 auto resources = resource::allocate(rc);
3838 std::vector<resource::cpu> allocations = std::move(resources.cpus);
3839 if (thread_affinity) {
3840 smp::pin(allocations[0].cpu_id);
3841 }
3842 memory::configure(allocations[0].mem, mbind, hugepages_path);
3843
3844 if (configuration.count("abort-on-seastar-bad-alloc")) {
3845 memory::enable_abort_on_allocation_failure();
3846 }
3847
f67539c2
TL
3848 if (configuration.count("dump-memory-diagnostics-on-alloc-failure-kind")) {
3849 memory::set_dump_memory_diagnostics_on_alloc_failure_kind(configuration["dump-memory-diagnostics-on-alloc-failure-kind"].as<std::string>());
3850 }
3851
11fdf7f2 3852 bool heapprof_enabled = configuration.count("heapprof");
9f95a23c
TL
3853 if (heapprof_enabled) {
3854 memory::set_heap_profiling_enabled(heapprof_enabled);
3855 }
11fdf7f2
TL
3856
3857#ifdef SEASTAR_HAVE_DPDK
3858 if (smp::_using_dpdk) {
3859 dpdk::eal::cpuset cpus;
3860 for (auto&& a : allocations) {
3861 cpus[a.cpu_id] = true;
3862 }
3863 dpdk::eal::init(cpus, configuration);
3864 }
3865#endif
3866
3867 // This would be better placed in the smp class, but the correct
3868 // smp::count is not known at smp construction time.
3869 static boost::barrier reactors_registered(smp::count);
3870 static boost::barrier smp_queues_constructed(smp::count);
3871 static boost::barrier inited(smp::count);
3872
3873 auto ioq_topology = std::move(resources.ioq_topology);
3874
3875 std::unordered_map<dev_t, std::vector<io_queue*>> all_io_queues;
11fdf7f2
TL
3876
3877 for (auto& id : disk_config.device_ids()) {
3878 auto io_info = ioq_topology.at(id);
f67539c2 3879 all_io_queues.emplace(id, io_info.nr_coordinators);
11fdf7f2
TL
3880 }
3881
3882 auto alloc_io_queue = [&ioq_topology, &all_io_queues, &disk_config] (unsigned shard, dev_t id) {
3883 auto io_info = ioq_topology.at(id);
3884 auto cid = io_info.shard_to_coordinator[shard];
3885 auto vec_idx = io_info.coordinator_to_idx[cid];
3886 assert(io_info.coordinator_to_idx_valid[cid]);
3887 if (shard == cid) {
3888 struct io_queue::config cfg = disk_config.generate_config(id);
3889 cfg.coordinator = cid;
11fdf7f2
TL
3890 assert(vec_idx < all_io_queues[id].size());
3891 assert(!all_io_queues[id][vec_idx]);
3892 all_io_queues[id][vec_idx] = new io_queue(std::move(cfg));
3893 }
3894 };
3895
9f95a23c 3896 auto assign_io_queue = [&ioq_topology, &all_io_queues] (shard_id shard_id, dev_t dev_id) {
11fdf7f2
TL
3897 auto io_info = ioq_topology.at(dev_id);
3898 auto cid = io_info.shard_to_coordinator[shard_id];
3899 auto queue_idx = io_info.coordinator_to_idx[cid];
3900 if (all_io_queues[dev_id][queue_idx]->coordinator() == shard_id) {
3901 engine().my_io_queues.emplace_back(all_io_queues[dev_id][queue_idx]);
3902 }
3903 engine()._io_queues.emplace(dev_id, all_io_queues[dev_id][queue_idx]);
3904 };
3905
3906 _all_event_loops_done.emplace(smp::count);
3907
3908 auto backend_selector = configuration["reactor-backend"].as<reactor_backend_selector>();
3909
3910 unsigned i;
3911 for (i = 1; i < smp::count; i++) {
3912 auto allocation = allocations[i];
9f95a23c
TL
3913 create_thread([configuration, &disk_config, hugepages_path, i, allocation, assign_io_queue, alloc_io_queue, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] {
3914 try {
11fdf7f2
TL
3915 auto thread_name = seastar::format("reactor-{}", i);
3916 pthread_setname_np(pthread_self(), thread_name.c_str());
3917 if (thread_affinity) {
3918 smp::pin(allocation.cpu_id);
3919 }
3920 memory::configure(allocation.mem, mbind, hugepages_path);
9f95a23c
TL
3921 if (heapprof_enabled) {
3922 memory::set_heap_profiling_enabled(heapprof_enabled);
3923 }
11fdf7f2
TL
3924 sigset_t mask;
3925 sigfillset(&mask);
3926 for (auto sig : { SIGSEGV }) {
3927 sigdelset(&mask, sig);
3928 }
3929 auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
3930 throw_pthread_error(r);
9f95a23c
TL
3931 init_default_smp_service_group(i);
3932 allocate_reactor(i, backend_selector, reactor_cfg);
11fdf7f2
TL
3933 _reactors[i] = &engine();
3934 for (auto& dev_id : disk_config.device_ids()) {
3935 alloc_io_queue(i, dev_id);
3936 }
3937 reactors_registered.wait();
3938 smp_queues_constructed.wait();
3939 start_all_queues();
3940 for (auto& dev_id : disk_config.device_ids()) {
3941 assign_io_queue(i, dev_id);
3942 }
3943 inited.wait();
3944 engine().configure(configuration);
3945 engine().run();
9f95a23c
TL
3946 } catch (const std::exception& e) {
3947 seastar_logger.error(e.what());
3948 _exit(1);
3949 }
11fdf7f2
TL
3950 });
3951 }
3952
9f95a23c
TL
3953 init_default_smp_service_group(0);
3954 try {
3955 allocate_reactor(0, backend_selector, reactor_cfg);
3956 } catch (const std::exception& e) {
3957 seastar_logger.error(e.what());
3958 _exit(1);
3959 }
3960
11fdf7f2
TL
3961 _reactors[0] = &engine();
3962 for (auto& dev_id : disk_config.device_ids()) {
3963 alloc_io_queue(0, dev_id);
3964 }
3965
3966#ifdef SEASTAR_HAVE_DPDK
3967 if (_using_dpdk) {
3968 auto it = _thread_loops.begin();
3969 RTE_LCORE_FOREACH_SLAVE(i) {
3970 rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&*(it++)), i);
3971 }
3972 }
3973#endif
3974
3975 reactors_registered.wait();
3976 smp::_qs = decltype(smp::_qs){new smp_message_queue* [smp::count], qs_deleter{}};
3977 for(unsigned i = 0; i < smp::count; i++) {
3978 smp::_qs[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
3979 for (unsigned j = 0; j < smp::count; ++j) {
3980 new (&smp::_qs[i][j]) smp_message_queue(_reactors[j], _reactors[i]);
3981 }
3982 }
3983 alien::smp::_qs = alien::smp::create_qs(_reactors);
3984 smp_queues_constructed.wait();
3985 start_all_queues();
3986 for (auto& dev_id : disk_config.device_ids()) {
3987 assign_io_queue(0, dev_id);
3988 }
3989 inited.wait();
3990
3991 engine().configure(configuration);
3992 // The raw `new` is necessary because of the private constructor of `lowres_clock_impl`.
3993 engine()._lowres_clock_impl = std::unique_ptr<lowres_clock_impl>(new lowres_clock_impl);
3994}
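// Overall shape of smp::configure() above: signals are masked process-wide,
// the CPU set and per-shard memory layout come from resource::allocate(), and
// one reactor thread is created per additional shard. Three barriers sequence
// startup: reactors_registered (every reactor constructed), then
// smp_queues_constructed (the count x count smp_message_queue matrix exists),
// then inited (every shard has started its queues and claimed its I/O queues);
// after that the worker shards enter engine().run(), while shard 0 returns to
// its caller to run the engine.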
3995
3996bool smp::poll_queues() {
3997 size_t got = 0;
3998 for (unsigned i = 0; i < count; i++) {
f67539c2
TL
3999 if (this_shard_id() != i) {
4000 auto& rxq = _qs[this_shard_id()][i];
11fdf7f2
TL
4001 rxq.flush_response_batch();
4002 got += rxq.has_unflushed_responses();
4003 got += rxq.process_incoming();
f67539c2 4004 auto& txq = _qs[i][this_shard_id()];
11fdf7f2 4005 txq.flush_request_batch();
9f95a23c 4006 got += txq.process_completions(i);
11fdf7f2
TL
4007 }
4008 }
4009 return got != 0;
4010}
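// Queue indexing convention (from the construction in smp::configure() above):
// _qs[to][from] carries requests from shard `from` to shard `to`. Hence in
// poll_queues(), _qs[this_shard_id()][i] is the receive side (requests arriving
// from shard i) and _qs[i][this_shard_id()] is the transmit side (our requests
// to shard i, whose completions are reaped with process_completions(i)).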
4011
4012bool smp::pure_poll_queues() {
4013 for (unsigned i = 0; i < count; i++) {
f67539c2
TL
4014 if (this_shard_id() != i) {
4015 auto& rxq = _qs[this_shard_id()][i];
11fdf7f2 4016 rxq.flush_response_batch();
f67539c2 4017 auto& txq = _qs[i][this_shard_id()];
11fdf7f2
TL
4018 txq.flush_request_batch();
4019 if (rxq.pure_poll_rx() || txq.pure_poll_tx() || rxq.has_unflushed_responses()) {
4020 return true;
4021 }
4022 }
4023 }
4024 return false;
4025}
4026
4027internal::preemption_monitor bootstrap_preemption_monitor{};
4028__thread const internal::preemption_monitor* g_need_preempt = &bootstrap_preemption_monitor;
4029
4030__thread reactor* local_engine;
4031
f67539c2 4032void report_exception(std::string_view message, std::exception_ptr eptr) noexcept {
11fdf7f2
TL
4033 seastar_logger.error("{}: {}", message, eptr);
4034}
4035
f67539c2 4036future<> check_direct_io_support(std::string_view path) noexcept {
11fdf7f2
TL
4037 struct w {
4038 sstring path;
4039 open_flags flags;
4040 std::function<future<>()> cleanup;
4041
f67539c2 4042 static w parse(sstring path, std::optional<directory_entry_type> type) {
11fdf7f2
TL
4043 if (!type) {
4044 throw std::invalid_argument(format("Could not open file at {}. Make sure it exists", path));
4045 }
4046
4047 if (type == directory_entry_type::directory) {
4048 auto fpath = path + "/.o_direct_test";
4049 return w{fpath, open_flags::wo | open_flags::create | open_flags::truncate, [fpath] { return remove_file(fpath); }};
4050 } else if ((type == directory_entry_type::regular) || (type == directory_entry_type::link)) {
4051 return w{path, open_flags::ro, [] { return make_ready_future<>(); }};
4052 } else {
4053 throw std::invalid_argument(format("{} neither a directory nor file. Can't be opened with O_DIRECT", path));
4054 }
4055 };
4056 };
4057
f67539c2
TL
4058 // Allocating memory for a sstring can throw, hence the futurize_invoke
4059 return futurize_invoke([path] {
4060 return engine().file_type(path).then([path = sstring(path)] (auto type) {
4061 auto w = w::parse(path, type);
4062 return open_file_dma(w.path, w.flags).then_wrapped([path = w.path, cleanup = std::move(w.cleanup)] (future<file> f) {
4063 try {
4064 auto fd = f.get0();
4065 return cleanup().finally([fd = std::move(fd)] () mutable {
4066 return fd.close();
4067 });
4068 } catch (std::system_error& e) {
4069 if (e.code() == std::error_code(EINVAL, std::system_category())) {
4070 report_exception(format("Could not open file at {}. Does your filesystem support O_DIRECT?", path), std::current_exception());
4071 }
4072 throw;
11fdf7f2 4073 }
11fdf7f2
TL
4074 });
4075 });
4076 });
4077}
4078
11fdf7f2
TL
4079server_socket listen(socket_address sa) {
4080 return engine().listen(sa);
4081}
4082
4083server_socket listen(socket_address sa, listen_options opts) {
4084 return engine().listen(sa, opts);
4085}
4086
4087future<connected_socket> connect(socket_address sa) {
4088 return engine().connect(sa);
4089}
4090
4091future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) {
4092 return engine().connect(sa, local, proto);
4093}
4094
f67539c2
TL
4095socket make_socket() {
4096 return engine().net().socket();
4097}
4098
4099net::udp_channel make_udp_channel() {
4100 return engine().net().make_udp_channel();
4101}
4102
4103net::udp_channel make_udp_channel(const socket_address& local) {
4104 return engine().net().make_udp_channel(local);
4105}
4106
9f95a23c
TL
4107void reactor::add_high_priority_task(task* t) noexcept {
4108 add_urgent_task(t);
11fdf7f2
TL
4109 // break .then() chains
4110 request_preemption();
4111}
4112
f67539c2
TL
4113
4114void set_idle_cpu_handler(idle_cpu_handler&& handler) {
4115 engine().set_idle_cpu_handler(std::move(handler));
4116}
4117
11fdf7f2
TL
4118static
4119bool
4120virtualized() {
9f95a23c 4121 return fs::exists("/sys/hypervisor/type");
11fdf7f2
TL
4122}
4123
4124std::chrono::nanoseconds
4125reactor::calculate_poll_time() {
4126 // In a non-virtualized environment, select a poll time
4127 // that is competitive with halt/unhalt.
4128 //
4129 // In a virtualized environment, IPIs are slow and dominate sleep/wake
4130 // (mprotect/tgkill), so increase the poll time so that we don't sleep
4131 // in a request/reply workload
4132 return virtualized() ? 2000us : 200us;
4133}
4134
f67539c2
TL
4135future<> later() noexcept {
4136 memory::scoped_critical_alloc_section _;
11fdf7f2 4137 engine().force_poll();
f67539c2
TL
4138 auto tsk = make_task(default_scheduling_group(), [] {});
4139 schedule(tsk);
4140 return tsk->get_future();
11fdf7f2
TL
4141}
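// later() above queues an empty task on the default scheduling group and
// forces a poll, so a continuation chained after it runs only once the reactor
// has had a chance to service I/O and cross-shard messages. Illustrative use
// for yielding inside a long computation (process_next_chunk() is a
// hypothetical caller-side function):
//
//   return later().then([] { return process_next_chunk(); });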
4142
4143void add_to_flush_poller(output_stream<char>* os) {
4144 engine()._flush_batching.emplace_back(os);
4145}
4146
4147reactor::sched_clock::duration reactor::total_idle_time() {
4148 return _total_idle;
4149}
4150
4151reactor::sched_clock::duration reactor::total_busy_time() {
4152 return sched_clock::now() - _start_time - _total_idle;
4153}
4154
4155std::chrono::nanoseconds reactor::total_steal_time() {
4156 // Steal time: this mimics the concept some Hypervisors have about Steal time.
4157 // That is the time in which a VM has something to run, but is not running because some other
4158 // process (another VM or the hypervisor itself) is in control.
4159 //
4160 // For us, we notice that during the time in which we were not sleeping (either running or busy
4161 // polling while idle), we should be accumulating thread runtime. If we are not, that's because
4162 // someone stole it from us.
4163 //
4164 // Because this is totally in userspace we can miss some events. For instance, if the seastar
4165 // process is ready to run but the kernel hasn't scheduled us yet, that would be technically
4166 // steal time but we have no ways to account it.
4167 //
4168 // But what we have here should be good enough and at least has a well defined meaning.
4169 return std::chrono::duration_cast<std::chrono::nanoseconds>(sched_clock::now() - _start_time - _total_sleep) -
4170 std::chrono::duration_cast<std::chrono::nanoseconds>(thread_cputime_clock::now().time_since_epoch());
4171}
4172
4173static std::atomic<unsigned long> s_used_scheduling_group_ids_bitmap{3}; // 0=main, 1=atexit
9f95a23c 4174static std::atomic<unsigned long> s_next_scheduling_group_specific_key{0};
11fdf7f2
TL
4175
4176static
f67539c2
TL
4177int
4178allocate_scheduling_group_id() noexcept {
9f95a23c 4179 static_assert(max_scheduling_groups() <= std::numeric_limits<unsigned long>::digits, "more scheduling groups than available bits");
11fdf7f2
TL
4180 auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed);
4181 auto nb = b;
4182 unsigned i = 0;
4183 do {
4184 if (__builtin_popcountl(b) == max_scheduling_groups()) {
f67539c2 4185 return -1;
11fdf7f2
TL
4186 }
4187 i = count_trailing_zeros(~b);
4188 nb = b | (1ul << i);
4189 } while (!s_used_scheduling_group_ids_bitmap.compare_exchange_weak(b, nb, std::memory_order_relaxed));
4190 return i;
4191}
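// The bitmap above is shared by all shards: bit i set means scheduling group
// id i is in use (bits 0 and 1 are pre-set for the main and atexit groups).
// The compare_exchange_weak loop claims the lowest clear bit and retries on a
// race; -1 is returned once max_scheduling_groups() ids are taken, which
// create_scheduling_group() below turns into an exceptional future.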
4192
9f95a23c
TL
4193static
4194unsigned long
f67539c2 4195allocate_scheduling_group_specific_key() noexcept {
9f95a23c
TL
4196 return s_next_scheduling_group_specific_key.fetch_add(1, std::memory_order_relaxed);
4197}
4198
11fdf7f2
TL
4199static
4200void
f67539c2 4201deallocate_scheduling_group_id(unsigned id) noexcept {
11fdf7f2
TL
4202 s_used_scheduling_group_ids_bitmap.fetch_and(~(1ul << id), std::memory_order_relaxed);
4203}
4204
4205void
9f95a23c 4206reactor::allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key) {
f67539c2
TL
4207 auto& sg_data = _scheduling_group_specific_data;
4208 auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
4209 this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key.id()+1));
4210 this_sg.specific_vals[key.id()] =
4211 aligned_alloc(sg_data.scheduling_group_key_configs[key.id()].alignment,
4212 sg_data.scheduling_group_key_configs[key.id()].allocation_size);
4213 if (!this_sg.specific_vals[key.id()]) {
9f95a23c
TL
4214 std::abort();
4215 }
f67539c2
TL
4216 if (sg_data.scheduling_group_key_configs[key.id()].constructor) {
4217 sg_data.scheduling_group_key_configs[key.id()].constructor(this_sg.specific_vals[key.id()]);
9f95a23c
TL
4218 }
4219}
4220
4221future<>
11fdf7f2 4222reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, float shares) {
f67539c2
TL
4223 auto& sg_data = _scheduling_group_specific_data;
4224 auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
4225 this_sg.queue_is_initialized = true;
11fdf7f2
TL
4226 _task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1));
4227 _task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shares);
9f95a23c
TL
4228 unsigned long num_keys = s_next_scheduling_group_specific_key.load(std::memory_order_relaxed);
4229
4230 return with_scheduling_group(sg, [this, num_keys, sg] () {
4231 for (unsigned long key_id = 0; key_id < num_keys; key_id++) {
4232 allocate_scheduling_group_specific_data(sg, scheduling_group_key(key_id));
4233 }
4234 });
11fdf7f2
TL
4235}
4236
9f95a23c
TL
4237future<>
4238reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
f67539c2
TL
4239 auto& sg_data = _scheduling_group_specific_data;
4240 sg_data.scheduling_group_key_configs.resize(std::max<size_t>(sg_data.scheduling_group_key_configs.size(), key.id() + 1));
4241 sg_data.scheduling_group_key_configs[key.id()] = cfg;
9f95a23c
TL
4242 return parallel_for_each(_task_queues, [this, cfg, key] (std::unique_ptr<task_queue>& tq) {
4243 if (tq) {
4244 scheduling_group sg = scheduling_group(tq->_id);
4245 return with_scheduling_group(sg, [this, key, sg] () {
4246 allocate_scheduling_group_specific_data(sg, key);
4247 });
4248 }
4249 return make_ready_future();
4250 });
4251}
4252
4253future<>
11fdf7f2 4254reactor::destroy_scheduling_group(scheduling_group sg) {
9f95a23c 4255 return with_scheduling_group(sg, [this, sg] () {
f67539c2
TL
4256 auto& sg_data = _scheduling_group_specific_data;
4257 auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
4258 for (unsigned long key_id = 0; key_id < sg_data.scheduling_group_key_configs.size(); key_id++) {
4259 void* val = this_sg.specific_vals[key_id];
9f95a23c 4260 if (val) {
f67539c2
TL
4261 if (sg_data.scheduling_group_key_configs[key_id].destructor) {
4262 sg_data.scheduling_group_key_configs[key_id].destructor(val);
9f95a23c
TL
4263 }
4264 free(val);
f67539c2 4265 this_sg.specific_vals[key_id] = nullptr;
9f95a23c
TL
4266 }
4267 }
4268 }).then( [this, sg] () {
f67539c2
TL
4269 auto& sg_data = _scheduling_group_specific_data;
4270 auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
4271 this_sg.queue_is_initialized = false;
9f95a23c
TL
4272 _task_queues[sg._id].reset();
4273 });
4274
4275}
4276
4277void
f67539c2
TL
4278internal::no_such_scheduling_group(scheduling_group sg) {
4279 throw std::invalid_argument(format("The scheduling group does not exist ({})", internal::scheduling_group_index(sg)));
11fdf7f2
TL
4280}
4281
4282const sstring&
f67539c2 4283scheduling_group::name() const noexcept {
11fdf7f2
TL
4284 return engine()._task_queues[_id]->_name;
4285}
4286
4287void
f67539c2 4288scheduling_group::set_shares(float shares) noexcept {
11fdf7f2
TL
4289 engine()._task_queues[_id]->set_shares(shares);
4290}
4291
4292future<scheduling_group>
f67539c2
TL
4293create_scheduling_group(sstring name, float shares) noexcept {
4294 auto aid = allocate_scheduling_group_id();
4295 if (aid < 0) {
4296 return make_exception_future<scheduling_group>(std::runtime_error("Scheduling group limit exceeded"));
4297 }
4298 auto id = static_cast<unsigned>(aid);
11fdf7f2
TL
4299 assert(id < max_scheduling_groups());
4300 auto sg = scheduling_group(id);
4301 return smp::invoke_on_all([sg, name, shares] {
9f95a23c 4302 return engine().init_scheduling_group(sg, name, shares);
11fdf7f2
TL
4303 }).then([sg] {
4304 return make_ready_future<scheduling_group>(sg);
4305 });
4306}
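// Minimal usage sketch for the API above, from inside a seastar::thread (the
// group name, share count and do_background_work() are illustrative):
//
//   scheduling_group bg = create_scheduling_group("background", 100).get0();
//   with_scheduling_group(bg, [] {
//       return do_background_work();
//   }).get();
//   destroy_scheduling_group(bg).get();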
4307
9f95a23c 4308future<scheduling_group_key>
f67539c2 4309scheduling_group_key_create(scheduling_group_key_config cfg) noexcept {
9f95a23c
TL
4310 scheduling_group_key key = allocate_scheduling_group_specific_key();
4311 return smp::invoke_on_all([key, cfg] {
4312 return engine().init_new_scheduling_group_key(key, cfg);
4313 }).then([key] {
4314 return make_ready_future<scheduling_group_key>(key);
4315 });
4316}
4317
4318future<>
4319rename_priority_class(io_priority_class pc, sstring new_name) {
4320 return reactor::rename_priority_class(pc, new_name);
4321}
4322
11fdf7f2 4323future<>
f67539c2 4324destroy_scheduling_group(scheduling_group sg) noexcept {
11fdf7f2 4325 if (sg == default_scheduling_group()) {
f67539c2 4326 return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to destroy the default scheduling group"));
11fdf7f2
TL
4327 }
4328 if (sg == current_scheduling_group()) {
f67539c2 4329 return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to destroy the current scheduling group"));
11fdf7f2
TL
4330 }
4331 return smp::invoke_on_all([sg] {
9f95a23c 4332 return engine().destroy_scheduling_group(sg);
11fdf7f2
TL
4333 }).then([sg] {
4334 deallocate_scheduling_group_id(sg._id);
4335 });
4336}
4337
9f95a23c 4338future<>
f67539c2 4339rename_scheduling_group(scheduling_group sg, sstring new_name) noexcept {
9f95a23c 4340 if (sg == default_scheduling_group()) {
f67539c2 4341 return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to rename the default scheduling group"));
9f95a23c
TL
4342 }
4343 return smp::invoke_on_all([sg, new_name] {
4344 engine()._task_queues[sg._id]->rename(new_name);
4345 });
4346}
11fdf7f2
TL
4347
4348namespace internal {
4349
4350inline
4351std::chrono::steady_clock::duration
4352timeval_to_duration(::timeval tv) {
4353 return std::chrono::seconds(tv.tv_sec) + std::chrono::microseconds(tv.tv_usec);
4354}
4355
4356class reactor_stall_sampler : public reactor::pollfn {
4357 std::chrono::steady_clock::time_point _run_start;
4358 ::rusage _run_start_rusage;
4359 uint64_t _kernel_stalls = 0;
4360 std::chrono::steady_clock::duration _nonsleep_cpu_time = {};
4361 std::chrono::steady_clock::duration _nonsleep_wall_time = {};
4362private:
4363 static ::rusage get_rusage() {
4364 struct ::rusage ru;
4365 ::getrusage(RUSAGE_THREAD, &ru);
4366 return ru;
4367 }
4368 static std::chrono::steady_clock::duration cpu_time(const ::rusage& ru) {
4369 return timeval_to_duration(ru.ru_stime) + timeval_to_duration(ru.ru_utime);
4370 }
4371 void mark_run_start() {
4372 _run_start = std::chrono::steady_clock::now();
4373 _run_start_rusage = get_rusage();
4374 }
4375 void mark_run_end() {
4376 auto start_nvcsw = _run_start_rusage.ru_nvcsw;
4377 auto start_cpu_time = cpu_time(_run_start_rusage);
4378 auto start_time = _run_start;
4379 _run_start = std::chrono::steady_clock::now();
4380 _run_start_rusage = get_rusage();
4381 _kernel_stalls += _run_start_rusage.ru_nvcsw - start_nvcsw;
4382 _nonsleep_cpu_time += cpu_time(_run_start_rusage) - start_cpu_time;
4383 _nonsleep_wall_time += _run_start - start_time;
4384 }
4385public:
4386 reactor_stall_sampler() { mark_run_start(); }
4387 virtual bool poll() override { return false; }
4388 virtual bool pure_poll() override { return false; }
4389 virtual bool try_enter_interrupt_mode() override {
4390 // try_enter_interrupt_mode marks the end of a reactor run that should be context-switch free
4391 mark_run_end();
4392 return true;
4393 }
4394 virtual void exit_interrupt_mode() override {
4395 // start a reactor run that should be context switch free
4396 mark_run_start();
4397 }
4398 stall_report report() const {
4399 stall_report r;
4400 // mark_run_end() with an immediate mark_run_start() is logically a no-op,
4401 // but each one of them has an effect, so they can't be marked const
4402 const_cast<reactor_stall_sampler*>(this)->mark_run_end();
4403 r.kernel_stalls = _kernel_stalls;
4404 r.run_wall_time = _nonsleep_wall_time;
4405 r.stall_time = _nonsleep_wall_time - _nonsleep_cpu_time;
4406 const_cast<reactor_stall_sampler*>(this)->mark_run_start();
4407 return r;
4408 }
4409};
4410
4411future<stall_report>
4412report_reactor_stalls(noncopyable_function<future<> ()> uut) {
4413 auto reporter = std::make_unique<reactor_stall_sampler>();
4414 auto p_reporter = reporter.get();
4415 auto poller = reactor::poller(std::move(reporter));
4416 return uut().then([poller = std::move(poller), p_reporter] () mutable {
4417 return p_reporter->report();
4418 });
4419}
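// Illustrative use of the stall sampler above (seastar::sleep() would need
// <seastar/core/sleep.hh>, and printing needs <iostream>; names are as visible
// in this namespace):
//
//   (void)report_reactor_stalls([] {
//       return seastar::sleep(std::chrono::seconds(1));
//   }).then([] (stall_report sr) {
//       std::cout << sr << "\n";   // operator<< is defined just below
//   });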
4420
4421std::ostream& operator<<(std::ostream& os, const stall_report& sr) {
4422 auto to_ms = [] (std::chrono::steady_clock::duration d) -> float {
4423 return std::chrono::duration<float>(d) / 1ms;
4424 };
4425 return os << format("{} stalls, {} ms stall time, {} ms run time", sr.kernel_stalls, to_ms(sr.stall_time), to_ms(sr.run_wall_time));
4426}
4427
4428}
4429
f67539c2
TL
4430#ifdef SEASTAR_TASK_BACKTRACE
4431
4432void task::make_backtrace() noexcept {
4433 memory::disable_backtrace_temporarily dbt;
4434 try {
4435 _bt = make_lw_shared<simple_backtrace>(current_backtrace_tasklocal());
4436 } catch (...) {
4437 _bt = nullptr;
4438 }
4439}
4440
4441#endif
4442
11fdf7f2 4443}