]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2014 Cloudius Systems | |
20 | */ | |
21 | ||
22 | #define __user /* empty */ // for xfs includes, below | |
23 | ||
24 | #include <cinttypes> | |
25 | #include <sys/syscall.h> | |
26 | #include <sys/vfs.h> | |
27 | #include <sys/statfs.h> | |
28 | #include <sys/time.h> | |
29 | #include <sys/resource.h> | |
f67539c2 | 30 | #include <sys/inotify.h> |
11fdf7f2 TL |
31 | #include <seastar/core/task.hh> |
32 | #include <seastar/core/reactor.hh> | |
33 | #include <seastar/core/memory.hh> | |
34 | #include <seastar/core/posix.hh> | |
35 | #include <seastar/net/packet.hh> | |
36 | #include <seastar/net/stack.hh> | |
37 | #include <seastar/net/posix-stack.hh> | |
38 | #include <seastar/net/native-stack.hh> | |
39 | #include <seastar/core/resource.hh> | |
40 | #include <seastar/core/print.hh> | |
41 | #include "core/scollectd-impl.hh" | |
42 | #include <seastar/util/conversions.hh> | |
f67539c2 TL |
43 | #include <seastar/core/loop.hh> |
44 | #include <seastar/core/with_scheduling_group.hh> | |
11fdf7f2 | 45 | #include <seastar/core/thread.hh> |
f67539c2 | 46 | #include <seastar/core/make_task.hh> |
11fdf7f2 TL |
47 | #include <seastar/core/systemwide_memory_barrier.hh> |
48 | #include <seastar/core/report_exception.hh> | |
49 | #include <seastar/core/stall_sampler.hh> | |
50 | #include <seastar/core/thread_cputime_clock.hh> | |
9f95a23c TL |
51 | #include <seastar/core/abort_on_ebadf.hh> |
52 | #include <seastar/core/io_queue.hh> | |
53 | #include <seastar/core/internal/io_desc.hh> | |
f67539c2 TL |
54 | #include <seastar/core/internal/buffer_allocator.hh> |
55 | #include <seastar/core/scheduling_specific.hh> | |
11fdf7f2 TL |
56 | #include <seastar/util/log.hh> |
57 | #include "core/file-impl.hh" | |
9f95a23c TL |
58 | #include "core/reactor_backend.hh" |
59 | #include "core/syscall_result.hh" | |
60 | #include "core/thread_pool.hh" | |
11fdf7f2 | 61 | #include "syscall_work_queue.hh" |
9f95a23c TL |
62 | #include "cgroup.hh" |
63 | #include "uname.hh" | |
11fdf7f2 TL |
64 | #include <cassert> |
65 | #include <unistd.h> | |
66 | #include <fcntl.h> | |
67 | #include <sys/eventfd.h> | |
68 | #include <sys/poll.h> | |
69 | #include <boost/lexical_cast.hpp> | |
11fdf7f2 TL |
70 | #include <boost/thread/barrier.hpp> |
71 | #include <boost/algorithm/string/classification.hpp> | |
72 | #include <boost/algorithm/string/split.hpp> | |
73 | #include <boost/iterator/counting_iterator.hpp> | |
74 | #include <boost/range/numeric.hpp> | |
75 | #include <boost/range/algorithm/sort.hpp> | |
76 | #include <boost/range/algorithm/remove_if.hpp> | |
9f95a23c | 77 | #include <boost/range/algorithm/find_if.hpp> |
11fdf7f2 TL |
78 | #include <boost/algorithm/clamp.hpp> |
79 | #include <boost/range/adaptor/transformed.hpp> | |
80 | #include <boost/range/adaptor/map.hpp> | |
81 | #include <boost/version.hpp> | |
82 | #include <atomic> | |
11fdf7f2 TL |
83 | #include <dirent.h> |
84 | #include <linux/types.h> // for xfs, below | |
85 | #include <sys/ioctl.h> | |
86 | #include <xfs/linux.h> | |
87 | #define min min /* prevent xfs.h from defining min() as a macro */ | |
88 | #include <xfs/xfs.h> | |
89 | #undef min | |
90 | #ifdef SEASTAR_HAVE_DPDK | |
91 | #include <seastar/core/dpdk_rte.hh> | |
92 | #include <rte_lcore.h> | |
93 | #include <rte_launch.h> | |
94 | #endif | |
95 | #include <seastar/core/prefetch.hh> | |
96 | #include <exception> | |
97 | #include <regex> | |
9f95a23c | 98 | #include <fstream> |
11fdf7f2 TL |
99 | #ifdef __GNUC__ |
100 | #include <iostream> | |
101 | #include <system_error> | |
102 | #include <cxxabi.h> | |
103 | #endif | |
104 | ||
105 | #ifdef SEASTAR_SHUFFLE_TASK_QUEUE | |
106 | #include <random> | |
107 | #endif | |
108 | ||
109 | #include <sys/mman.h> | |
110 | #include <sys/utsname.h> | |
111 | #include <linux/falloc.h> | |
112 | #include <linux/magic.h> | |
113 | #include <seastar/util/backtrace.hh> | |
114 | #include <seastar/util/spinlock.hh> | |
115 | #include <seastar/util/print_safe.hh> | |
116 | #include <sys/sdt.h> | |
117 | ||
118 | #ifdef HAVE_OSV | |
119 | #include <osv/newpoll.hh> | |
120 | #endif | |
121 | ||
122 | #if defined(__x86_64__) || defined(__i386__) | |
123 | #include <xmmintrin.h> | |
124 | #endif | |
125 | ||
126 | #include <seastar/util/defer.hh> | |
127 | #include <seastar/core/alien.hh> | |
128 | #include <seastar/core/metrics.hh> | |
129 | #include <seastar/core/execution_stage.hh> | |
130 | #include <seastar/core/exception_hacks.hh> | |
131 | #include "stall_detector.hh" | |
f67539c2 | 132 | #include <seastar/util/memory_diagnostics.hh> |
11fdf7f2 TL |
133 | |
134 | #include <yaml-cpp/yaml.h> | |
135 | ||
136 | #ifdef SEASTAR_TASK_HISTOGRAM | |
137 | #include <typeinfo> | |
138 | #endif | |
139 | ||
140 | namespace seastar { | |
141 | ||
// I/O limits configured for one mountpoint. Every rate defaults to the
// largest uint64_t value; num_io_queues is not configured directly but
// calculated later.
struct mountpoint_params {
    std::string mountpoint{"none"};
    uint64_t read_bytes_rate{std::numeric_limits<uint64_t>::max()};
    uint64_t write_bytes_rate{std::numeric_limits<uint64_t>::max()};
    uint64_t read_req_rate{std::numeric_limits<uint64_t>::max()};
    uint64_t write_req_rate{std::numeric_limits<uint64_t>::max()};
    uint64_t num_io_queues{0}; // calculated
};
150 | ||
151 | } | |
9f95a23c | 152 | |
11fdf7f2 TL |
153 | namespace YAML { |
154 | template<> | |
155 | struct convert<seastar::mountpoint_params> { | |
156 | static bool decode(const Node& node, seastar::mountpoint_params& mp) { | |
157 | using namespace seastar; | |
158 | mp.mountpoint = node["mountpoint"].as<std::string>().c_str(); | |
159 | mp.read_bytes_rate = parse_memory_size(node["read_bandwidth"].as<std::string>()); | |
160 | mp.read_req_rate = parse_memory_size(node["read_iops"].as<std::string>()); | |
161 | mp.write_bytes_rate = parse_memory_size(node["write_bandwidth"].as<std::string>()); | |
162 | mp.write_req_rate = parse_memory_size(node["write_iops"].as<std::string>()); | |
163 | return true; | |
164 | } | |
165 | }; | |
166 | } | |
167 | ||
168 | namespace seastar { | |
169 | ||
9f95a23c TL |
170 | seastar::logger seastar_logger("seastar"); |
171 | seastar::logger sched_logger("scheduler"); | |
172 | ||
f67539c2 TL |
173 | shard_id reactor::cpu_id() const { |
174 | assert(_id == this_shard_id()); | |
175 | return _id; | |
176 | } | |
177 | ||
11fdf7f2 TL |
178 | io_priority_class |
179 | reactor::register_one_priority_class(sstring name, uint32_t shares) { | |
180 | return io_queue::register_one_priority_class(std::move(name), shares); | |
181 | } | |
182 | ||
183 | future<> | |
184 | reactor::update_shares_for_class(io_priority_class pc, uint32_t shares) { | |
185 | return parallel_for_each(_io_queues, [pc, shares] (auto& queue) { | |
186 | return queue.second->update_shares_for_class(pc, shares); | |
187 | }); | |
188 | } | |
189 | ||
9f95a23c | 190 | future<> |
f67539c2 | 191 | reactor::rename_priority_class(io_priority_class pc, sstring new_name) noexcept { |
9f95a23c | 192 | |
f67539c2 | 193 | return futurize_invoke([pc, new_name = std::move(new_name)] () mutable { |
9f95a23c TL |
194 | // Taking the lock here will prevent from newly registered classes |
195 | // to register under the old name (and will prevent undefined | |
196 | // behavior since this array is shared cross shards. However, it | |
197 | // doesn't prevent the case where a newly registered class (that | |
198 | // got registered right after the lock release) will be unnecessarily | |
199 | // renamed. This is not a real problem and it is a lot better than | |
200 | // holding the lock until all cross shard activity is over. | |
201 | ||
202 | try { | |
f67539c2 TL |
203 | if (!io_queue::rename_one_priority_class(pc, new_name)) { |
204 | return make_ready_future<>(); | |
9f95a23c | 205 | } |
9f95a23c TL |
206 | } catch (...) { |
207 | sched_logger.error("exception while trying to rename priority group with id {} to \"{}\" ({})", | |
208 | pc.id(), new_name, std::current_exception()); | |
209 | std::rethrow_exception(std::current_exception()); | |
210 | } | |
f67539c2 | 211 | return smp::invoke_on_all([pc, new_name = std::move(new_name)] { |
9f95a23c TL |
212 | for (auto&& queue : engine()._io_queues) { |
213 | queue.second->rename_priority_class(pc, new_name); | |
214 | } | |
215 | }); | |
216 | }); | |
217 | } | |
218 | ||
219 | future<std::tuple<pollable_fd, socket_address>> | |
220 | reactor::do_accept(pollable_fd_state& listenfd) { | |
11fdf7f2 TL |
221 | return readable_or_writeable(listenfd).then([this, &listenfd] () mutable { |
222 | socket_address sa; | |
9f95a23c TL |
223 | listenfd.maybe_no_more_recv(); |
224 | auto maybe_fd = listenfd.fd.try_accept(sa, SOCK_NONBLOCK | SOCK_CLOEXEC); | |
225 | if (!maybe_fd) { | |
226 | // We speculated that we will have an another connection, but got a false | |
227 | // positive. Try again without speculation. | |
228 | return do_accept(listenfd); | |
229 | } | |
230 | // Speculate that there is another connection on this listening socket, to avoid | |
231 | // a task-quota delay. Usually this will fail, but accept is a rare-enough operation | |
232 | // that it is worth the false positive in order to withstand a connection storm | |
233 | // without having to accept at a rate of 1 per task quota. | |
234 | listenfd.speculate_epoll(EPOLLIN); | |
235 | pollable_fd pfd(std::move(*maybe_fd), pollable_fd::speculation(EPOLLOUT)); | |
236 | return make_ready_future<std::tuple<pollable_fd, socket_address>>(std::make_tuple(std::move(pfd), std::move(sa))); | |
237 | }); | |
238 | } | |
239 | ||
240 | future<> reactor::do_connect(pollable_fd_state& pfd, socket_address& sa) { | |
241 | pfd.fd.connect(sa.u.sa, sa.length()); | |
242 | return pfd.writeable().then([&pfd]() mutable { | |
243 | auto err = pfd.fd.getsockopt<int>(SOL_SOCKET, SO_ERROR); | |
244 | if (err != 0) { | |
245 | throw std::system_error(err, std::system_category()); | |
246 | } | |
247 | return make_ready_future<>(); | |
11fdf7f2 TL |
248 | }); |
249 | } | |
250 | ||
251 | future<size_t> | |
9f95a23c | 252 | reactor::do_read_some(pollable_fd_state& fd, void* buffer, size_t len) { |
11fdf7f2 TL |
253 | return readable(fd).then([this, &fd, buffer, len] () mutable { |
254 | auto r = fd.fd.read(buffer, len); | |
255 | if (!r) { | |
9f95a23c | 256 | return do_read_some(fd, buffer, len); |
11fdf7f2 TL |
257 | } |
258 | if (size_t(*r) == len) { | |
259 | fd.speculate_epoll(EPOLLIN); | |
260 | } | |
261 | return make_ready_future<size_t>(*r); | |
262 | }); | |
263 | } | |
264 | ||
f67539c2 TL |
265 | future<temporary_buffer<char>> |
266 | reactor::do_read_some(pollable_fd_state& fd, internal::buffer_allocator* ba) { | |
267 | return fd.readable().then([this, &fd, ba] { | |
268 | auto buffer = ba->allocate_buffer(); | |
269 | auto r = fd.fd.read(buffer.get_write(), buffer.size()); | |
270 | if (!r) { | |
271 | // Speculation failure, try again with real polling this time | |
272 | // Note we release the buffer and will reallocate it when poll | |
273 | // completes. | |
274 | return do_read_some(fd, ba); | |
275 | } | |
276 | if (size_t(*r) == buffer.size()) { | |
277 | fd.speculate_epoll(EPOLLIN); | |
278 | } | |
279 | buffer.trim(*r); | |
280 | return make_ready_future<temporary_buffer<char>>(std::move(buffer)); | |
281 | }); | |
282 | } | |
283 | ||
11fdf7f2 | 284 | future<size_t> |
9f95a23c | 285 | reactor::do_read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) { |
11fdf7f2 TL |
286 | return readable(fd).then([this, &fd, iov = iov] () mutable { |
287 | ::msghdr mh = {}; | |
288 | mh.msg_iov = &iov[0]; | |
289 | mh.msg_iovlen = iov.size(); | |
290 | auto r = fd.fd.recvmsg(&mh, 0); | |
291 | if (!r) { | |
9f95a23c | 292 | return do_read_some(fd, iov); |
11fdf7f2 TL |
293 | } |
294 | if (size_t(*r) == iovec_len(iov)) { | |
295 | fd.speculate_epoll(EPOLLIN); | |
296 | } | |
297 | return make_ready_future<size_t>(*r); | |
298 | }); | |
299 | } | |
300 | ||
301 | future<size_t> | |
9f95a23c | 302 | reactor::do_write_some(pollable_fd_state& fd, const void* buffer, size_t len) { |
11fdf7f2 TL |
303 | return writeable(fd).then([this, &fd, buffer, len] () mutable { |
304 | auto r = fd.fd.send(buffer, len, MSG_NOSIGNAL); | |
305 | if (!r) { | |
9f95a23c | 306 | return do_write_some(fd, buffer, len); |
11fdf7f2 TL |
307 | } |
308 | if (size_t(*r) == len) { | |
309 | fd.speculate_epoll(EPOLLOUT); | |
310 | } | |
311 | return make_ready_future<size_t>(*r); | |
312 | }); | |
313 | } | |
314 | ||
9f95a23c TL |
315 | future<size_t> |
316 | reactor::do_write_some(pollable_fd_state& fd, net::packet& p) { | |
317 | return writeable(fd).then([this, &fd, &p] () mutable { | |
318 | static_assert(offsetof(iovec, iov_base) == offsetof(net::fragment, base) && | |
319 | sizeof(iovec::iov_base) == sizeof(net::fragment::base) && | |
320 | offsetof(iovec, iov_len) == offsetof(net::fragment, size) && | |
321 | sizeof(iovec::iov_len) == sizeof(net::fragment::size) && | |
322 | alignof(iovec) == alignof(net::fragment) && | |
323 | sizeof(iovec) == sizeof(net::fragment) | |
324 | , "net::fragment and iovec should be equivalent"); | |
325 | ||
326 | iovec* iov = reinterpret_cast<iovec*>(p.fragment_array()); | |
327 | msghdr mh = {}; | |
328 | mh.msg_iov = iov; | |
329 | mh.msg_iovlen = std::min<size_t>(p.nr_frags(), IOV_MAX); | |
330 | auto r = fd.fd.sendmsg(&mh, MSG_NOSIGNAL); | |
331 | if (!r) { | |
332 | return do_write_some(fd, p); | |
333 | } | |
334 | if (size_t(*r) == p.len()) { | |
335 | fd.speculate_epoll(EPOLLOUT); | |
336 | } | |
337 | return make_ready_future<size_t>(*r); | |
338 | }); | |
339 | } | |
340 | ||
11fdf7f2 TL |
341 | future<> |
342 | reactor::write_all_part(pollable_fd_state& fd, const void* buffer, size_t len, size_t completed) { | |
343 | if (completed == len) { | |
344 | return make_ready_future<>(); | |
345 | } else { | |
9f95a23c | 346 | return _backend->write_some(fd, static_cast<const char*>(buffer) + completed, len - completed).then( |
11fdf7f2 TL |
347 | [&fd, buffer, len, completed, this] (size_t part) mutable { |
348 | return write_all_part(fd, buffer, len, completed + part); | |
349 | }); | |
350 | } | |
351 | } | |
352 | ||
353 | future<> | |
354 | reactor::write_all(pollable_fd_state& fd, const void* buffer, size_t len) { | |
355 | assert(len); | |
356 | return write_all_part(fd, buffer, len, 0); | |
357 | } | |
358 | ||
9f95a23c TL |
359 | future<size_t> pollable_fd_state::read_some(char* buffer, size_t size) { |
360 | return engine()._backend->read_some(*this, buffer, size); | |
11fdf7f2 TL |
361 | } |
362 | ||
9f95a23c TL |
363 | future<size_t> pollable_fd_state::read_some(uint8_t* buffer, size_t size) { |
364 | return engine()._backend->read_some(*this, buffer, size); | |
11fdf7f2 TL |
365 | } |
366 | ||
9f95a23c TL |
367 | future<size_t> pollable_fd_state::read_some(const std::vector<iovec>& iov) { |
368 | return engine()._backend->read_some(*this, iov); | |
11fdf7f2 TL |
369 | } |
370 | ||
f67539c2 TL |
371 | future<temporary_buffer<char>> pollable_fd_state::read_some(internal::buffer_allocator* ba) { |
372 | return engine()._backend->read_some(*this, ba); | |
373 | } | |
374 | ||
9f95a23c TL |
375 | future<size_t> pollable_fd_state::write_some(net::packet& p) { |
376 | return engine()._backend->write_some(*this, p); | |
11fdf7f2 TL |
377 | } |
378 | ||
9f95a23c TL |
379 | future<> pollable_fd_state::write_all(const char* buffer, size_t size) { |
380 | return engine().write_all(*this, buffer, size); | |
11fdf7f2 TL |
381 | } |
382 | ||
9f95a23c TL |
383 | future<> pollable_fd_state::write_all(const uint8_t* buffer, size_t size) { |
384 | return engine().write_all(*this, buffer, size); | |
11fdf7f2 TL |
385 | } |
386 | ||
9f95a23c | 387 | future<> pollable_fd_state::write_all(net::packet& p) { |
11fdf7f2 TL |
388 | return write_some(p).then([this, &p] (size_t size) { |
389 | if (p.len() == size) { | |
390 | return make_ready_future<>(); | |
391 | } | |
392 | p.trim_front(size); | |
393 | return write_all(p); | |
394 | }); | |
395 | } | |
396 | ||
9f95a23c TL |
397 | future<> pollable_fd_state::readable() { |
398 | return engine().readable(*this); | |
11fdf7f2 TL |
399 | } |
400 | ||
9f95a23c TL |
401 | future<> pollable_fd_state::writeable() { |
402 | return engine().writeable(*this); | |
11fdf7f2 TL |
403 | } |
404 | ||
9f95a23c TL |
405 | future<> pollable_fd_state::readable_or_writeable() { |
406 | return engine().readable_or_writeable(*this); | |
11fdf7f2 TL |
407 | } |
408 | ||
409 | void | |
9f95a23c TL |
410 | pollable_fd_state::abort_reader() { |
411 | engine().abort_reader(*this); | |
11fdf7f2 TL |
412 | } |
413 | ||
414 | void | |
9f95a23c TL |
415 | pollable_fd_state::abort_writer() { |
416 | engine().abort_writer(*this); | |
11fdf7f2 TL |
417 | } |
418 | ||
9f95a23c TL |
419 | future<std::tuple<pollable_fd, socket_address>> pollable_fd_state::accept() { |
420 | return engine()._backend->accept(*this); | |
11fdf7f2 TL |
421 | } |
422 | ||
9f95a23c TL |
423 | future<> pollable_fd_state::connect(socket_address& sa) { |
424 | return engine()._backend->connect(*this, sa); | |
425 | } | |
426 | ||
427 | future<size_t> pollable_fd_state::recvmsg(struct msghdr *msg) { | |
11fdf7f2 | 428 | maybe_no_more_recv(); |
9f95a23c TL |
429 | return engine().readable(*this).then([this, msg] { |
430 | auto r = fd.recvmsg(msg, 0); | |
11fdf7f2 TL |
431 | if (!r) { |
432 | return recvmsg(msg); | |
433 | } | |
434 | // We always speculate here to optimize for throughput in a workload | |
435 | // with multiple outstanding requests. This way the caller can consume | |
436 | // all messages without resorting to epoll. However this adds extra | |
437 | // recvmsg() call when we hit the empty queue condition, so it may | |
438 | // hurt request-response workload in which the queue is empty when we | |
439 | // initially enter recvmsg(). If that turns out to be a problem, we can | |
440 | // improve speculation by using recvmmsg(). | |
9f95a23c | 441 | speculate_epoll(EPOLLIN); |
11fdf7f2 TL |
442 | return make_ready_future<size_t>(*r); |
443 | }); | |
444 | }; | |
445 | ||
9f95a23c | 446 | future<size_t> pollable_fd_state::sendmsg(struct msghdr* msg) { |
11fdf7f2 | 447 | maybe_no_more_send(); |
9f95a23c TL |
448 | return engine().writeable(*this).then([this, msg] () mutable { |
449 | auto r = fd.sendmsg(msg, 0); | |
11fdf7f2 TL |
450 | if (!r) { |
451 | return sendmsg(msg); | |
452 | } | |
453 | // For UDP this will always speculate. We can't know if there's room | |
454 | // or not, but most of the time there should be so the cost of mis- | |
455 | // speculation is amortized. | |
456 | if (size_t(*r) == iovec_len(msg->msg_iov, msg->msg_iovlen)) { | |
9f95a23c | 457 | speculate_epoll(EPOLLOUT); |
11fdf7f2 TL |
458 | } |
459 | return make_ready_future<size_t>(*r); | |
460 | }); | |
461 | } | |
462 | ||
9f95a23c | 463 | future<size_t> pollable_fd_state::sendto(socket_address addr, const void* buf, size_t len) { |
11fdf7f2 | 464 | maybe_no_more_send(); |
9f95a23c TL |
465 | return engine().writeable(*this).then([this, buf, len, addr] () mutable { |
466 | auto r = fd.sendto(addr, buf, len, 0); | |
11fdf7f2 TL |
467 | if (!r) { |
468 | return sendto(std::move(addr), buf, len); | |
469 | } | |
470 | // See the comment about speculation in sendmsg(). | |
471 | if (size_t(*r) == len) { | |
9f95a23c | 472 | speculate_epoll(EPOLLOUT); |
11fdf7f2 TL |
473 | } |
474 | return make_ready_future<size_t>(*r); | |
475 | }); | |
476 | } | |
477 | ||
478 | namespace internal { | |
479 | ||
480 | #ifdef SEASTAR_TASK_HISTOGRAM | |
481 | ||
482 | class task_histogram { | |
483 | static constexpr unsigned max_countdown = 1'000'000; | |
484 | std::unordered_map<std::type_index, uint64_t> _histogram; | |
485 | unsigned _countdown_to_print = max_countdown; | |
486 | public: | |
487 | void add(const task& t) { | |
488 | ++_histogram[std::type_index(typeid(t))]; | |
489 | if (!--_countdown_to_print) { | |
490 | print(); | |
491 | _countdown_to_print = max_countdown; | |
492 | _histogram.clear(); | |
493 | } | |
494 | } | |
495 | void print() const { | |
496 | seastar::fmt::print("task histogram, {:d} task types {:d} tasks\n", _histogram.size(), max_countdown - _countdown_to_print); | |
497 | for (auto&& type_count : _histogram) { | |
498 | auto&& type = type_count.first; | |
499 | auto&& count = type_count.second; | |
500 | seastar::fmt::print(" {:10d} {}\n", count, type.name()); | |
501 | } | |
502 | } | |
503 | }; | |
504 | ||
505 | thread_local task_histogram this_thread_task_histogram; | |
506 | ||
507 | #endif | |
508 | ||
509 | void task_histogram_add_task(const task& t) { | |
510 | #ifdef SEASTAR_TASK_HISTOGRAM | |
511 | this_thread_task_histogram.add(t); | |
512 | #endif | |
513 | } | |
514 | ||
515 | } | |
516 | ||
517 | using namespace std::chrono_literals; | |
f67539c2 | 518 | namespace fs = std::filesystem; |
11fdf7f2 TL |
519 | |
520 | using namespace net; | |
521 | ||
522 | using namespace internal; | |
523 | using namespace internal::linux_abi; | |
524 | ||
11fdf7f2 TL |
525 | std::atomic<lowres_clock_impl::steady_rep> lowres_clock_impl::counters::_steady_now; |
526 | std::atomic<lowres_clock_impl::system_rep> lowres_clock_impl::counters::_system_now; | |
527 | std::atomic<manual_clock::rep> manual_clock::_now; | |
528 | constexpr std::chrono::milliseconds lowres_clock_impl::_granularity; | |
529 | ||
530 | constexpr unsigned reactor::max_queues; | |
531 | constexpr unsigned reactor::max_aio_per_queue; | |
532 | ||
9f95a23c TL |
533 | // Broken (returns spurious EIO). Cause/fix unknown. |
534 | bool aio_nowait_supported = false; | |
535 | ||
11fdf7f2 TL |
// Compile-time switch for scheduler trace output; currently always off.
static bool sched_debug() {
    constexpr bool tracing_enabled = false;
    return tracing_enabled;
}
539 | ||
540 | template <typename... Args> | |
541 | void | |
542 | sched_print(const char* fmt, Args&&... args) { | |
543 | if (sched_debug()) { | |
544 | sched_logger.trace(fmt, std::forward<Args>(args)...); | |
545 | } | |
546 | } | |
547 | ||
9f95a23c TL |
// Flag behind set_abort_on_ebadf() / is_abort_on_ebadf_enabled().
// Initially false.
static std::atomic<bool> abort_on_ebadf{false};

// Stores the new value of the abort-on-EBADF flag.
void set_abort_on_ebadf(bool do_abort) {
    abort_on_ebadf.store(do_abort, std::memory_order_seq_cst);
}

// Returns the current value of the abort-on-EBADF flag.
bool is_abort_on_ebadf_enabled() {
    const bool enabled = abort_on_ebadf.load(std::memory_order_seq_cst);
    return enabled;
}
557 | ||
11fdf7f2 TL |
558 | timespec to_timespec(steady_clock_type::time_point t) { |
559 | using ns = std::chrono::nanoseconds; | |
560 | auto n = std::chrono::duration_cast<ns>(t.time_since_epoch()).count(); | |
561 | return { n / 1'000'000'000, n % 1'000'000'000 }; | |
562 | } | |
563 | ||
564 | lowres_clock_impl::lowres_clock_impl() { | |
565 | update(); | |
566 | _timer.set_callback(&lowres_clock_impl::update); | |
567 | _timer.arm_periodic(_granularity); | |
568 | } | |
569 | ||
f67539c2 | 570 | void lowres_clock_impl::update() noexcept { |
11fdf7f2 TL |
571 | auto const steady_count = |
572 | std::chrono::duration_cast<steady_duration>(base_steady_clock::now().time_since_epoch()).count(); | |
573 | ||
574 | auto const system_count = | |
575 | std::chrono::duration_cast<system_duration>(base_system_clock::now().time_since_epoch()).count(); | |
576 | ||
577 | counters::_steady_now.store(steady_count, std::memory_order_relaxed); | |
578 | counters::_system_now.store(system_count, std::memory_order_relaxed); | |
579 | } | |
580 | ||
9f95a23c TL |
581 | template <typename Clock> |
582 | inline | |
583 | timer<Clock>::~timer() { | |
584 | if (_queued) { | |
585 | engine().del_timer(this); | |
11fdf7f2 | 586 | } |
11fdf7f2 TL |
587 | } |
588 | ||
9f95a23c TL |
589 | template <typename Clock> |
590 | inline | |
f67539c2 | 591 | void timer<Clock>::arm(time_point until, std::optional<duration> period) noexcept { |
9f95a23c TL |
592 | arm_state(until, period); |
593 | engine().add_timer(this); | |
11fdf7f2 TL |
594 | } |
595 | ||
9f95a23c TL |
596 | template <typename Clock> |
597 | inline | |
f67539c2 | 598 | void timer<Clock>::readd_periodic() noexcept { |
9f95a23c TL |
599 | arm_state(Clock::now() + _period.value(), {_period.value()}); |
600 | engine().queue_timer(this); | |
11fdf7f2 TL |
601 | } |
602 | ||
9f95a23c TL |
603 | template <typename Clock> |
604 | inline | |
f67539c2 | 605 | bool timer<Clock>::cancel() noexcept { |
9f95a23c TL |
606 | if (!_armed) { |
607 | return false; | |
11fdf7f2 | 608 | } |
9f95a23c TL |
609 | _armed = false; |
610 | if (_queued) { | |
611 | engine().del_timer(this); | |
612 | _queued = false; | |
11fdf7f2 | 613 | } |
9f95a23c | 614 | return true; |
11fdf7f2 TL |
615 | } |
616 | ||
9f95a23c TL |
617 | template class timer<steady_clock_type>; |
618 | template class timer<lowres_clock>; | |
619 | template class timer<manual_clock>; | |
11fdf7f2 TL |
620 | |
621 | reactor::signals::signals() : _pending_signals(0) { | |
622 | } | |
623 | ||
624 | reactor::signals::~signals() { | |
625 | sigset_t mask; | |
626 | sigfillset(&mask); | |
627 | ::pthread_sigmask(SIG_BLOCK, &mask, NULL); | |
628 | } | |
629 | ||
9f95a23c | 630 | reactor::signals::signal_handler::signal_handler(int signo, noncopyable_function<void ()>&& handler) |
11fdf7f2 | 631 | : _handler(std::move(handler)) { |
11fdf7f2 TL |
632 | } |
633 | ||
9f95a23c TL |
634 | void |
635 | reactor::signals::handle_signal(int signo, noncopyable_function<void ()>&& handler) { | |
636 | _signal_handlers.emplace(std::piecewise_construct, | |
637 | std::make_tuple(signo), std::make_tuple(signo, std::move(handler))); | |
638 | ||
11fdf7f2 | 639 | struct sigaction sa; |
9f95a23c TL |
640 | sa.sa_sigaction = [](int sig, siginfo_t *info, void *p) { |
641 | engine()._backend->signal_received(sig, info, p); | |
642 | }; | |
11fdf7f2 TL |
643 | sa.sa_mask = make_empty_sigset_mask(); |
644 | sa.sa_flags = SA_SIGINFO | SA_RESTART; | |
645 | auto r = ::sigaction(signo, &sa, nullptr); | |
646 | throw_system_error_on(r == -1); | |
647 | auto mask = make_sigset_mask(signo); | |
648 | r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL); | |
649 | throw_pthread_error(r); | |
650 | } | |
651 | ||
652 | void | |
9f95a23c | 653 | reactor::signals::handle_signal_once(int signo, noncopyable_function<void ()>&& handler) { |
11fdf7f2 TL |
654 | return handle_signal(signo, [fired = false, handler = std::move(handler)] () mutable { |
655 | if (!fired) { | |
656 | fired = true; | |
657 | handler(); | |
658 | } | |
659 | }); | |
660 | } | |
661 | ||
662 | bool reactor::signals::poll_signal() { | |
663 | auto signals = _pending_signals.load(std::memory_order_relaxed); | |
664 | if (signals) { | |
665 | _pending_signals.fetch_and(~signals, std::memory_order_relaxed); | |
666 | for (size_t i = 0; i < sizeof(signals)*8; i++) { | |
667 | if (signals & (1ull << i)) { | |
668 | _signal_handlers.at(i)._handler(); | |
669 | } | |
670 | } | |
671 | } | |
672 | return signals; | |
673 | } | |
674 | ||
675 | bool reactor::signals::pure_poll_signal() const { | |
676 | return _pending_signals.load(std::memory_order_relaxed); | |
677 | } | |
678 | ||
679 | void reactor::signals::action(int signo, siginfo_t* siginfo, void* ignore) { | |
9f95a23c | 680 | engine().start_handling_signal(); |
11fdf7f2 TL |
681 | engine()._signals._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed); |
682 | } | |
683 | ||
684 | void reactor::signals::failed_to_handle(int signo) { | |
685 | char tname[64]; | |
686 | pthread_getname_np(pthread_self(), tname, sizeof(tname)); | |
687 | auto tid = syscall(SYS_gettid); | |
688 | seastar_logger.error("Failed to handle signal {} on thread {} ({}): engine not ready", signo, tid, tname); | |
689 | } | |
690 | ||
9f95a23c | 691 | void reactor::handle_signal(int signo, noncopyable_function<void ()>&& handler) { |
11fdf7f2 TL |
692 | _signals.handle_signal(signo, std::move(handler)); |
693 | } | |
694 | ||
11fdf7f2 TL |
695 | // Accumulates an in-memory backtrace and flush to stderr eventually. |
696 | // Async-signal safe. | |
697 | class backtrace_buffer { | |
698 | static constexpr unsigned _max_size = 8 << 10; | |
699 | unsigned _pos = 0; | |
700 | char _buf[_max_size]; | |
701 | public: | |
702 | void flush() noexcept { | |
703 | print_safe(_buf, _pos); | |
704 | _pos = 0; | |
705 | } | |
706 | ||
707 | void append(const char* str, size_t len) noexcept { | |
708 | if (_pos + len >= _max_size) { | |
709 | flush(); | |
710 | } | |
711 | memcpy(_buf + _pos, str, len); | |
712 | _pos += len; | |
713 | } | |
714 | ||
715 | void append(const char* str) noexcept { append(str, strlen(str)); } | |
716 | ||
717 | template <typename Integral> | |
718 | void append_decimal(Integral n) noexcept { | |
719 | char buf[sizeof(n) * 3]; | |
720 | auto len = convert_decimal_safe(buf, sizeof(buf), n); | |
721 | append(buf, len); | |
722 | } | |
723 | ||
724 | template <typename Integral> | |
725 | void append_hex(Integral ptr) noexcept { | |
726 | char buf[sizeof(ptr) * 2]; | |
727 | convert_zero_padded_hex_safe(buf, sizeof(buf), ptr); | |
728 | append(buf, sizeof(buf)); | |
729 | } | |
730 | ||
731 | void append_backtrace() noexcept { | |
732 | backtrace([this] (frame f) { | |
733 | append(" "); | |
734 | if (!f.so->name.empty()) { | |
735 | append(f.so->name.c_str(), f.so->name.size()); | |
736 | append("+"); | |
737 | } | |
738 | ||
739 | append("0x"); | |
740 | append_hex(f.addr); | |
741 | append("\n"); | |
742 | }); | |
743 | } | |
744 | }; | |
745 | ||
746 | static void print_with_backtrace(backtrace_buffer& buf) noexcept { | |
747 | if (local_engine) { | |
748 | buf.append(" on shard "); | |
f67539c2 | 749 | buf.append_decimal(this_shard_id()); |
11fdf7f2 TL |
750 | } |
751 | ||
752 | buf.append(".\nBacktrace:\n"); | |
753 | buf.append_backtrace(); | |
754 | buf.flush(); | |
755 | } | |
756 | ||
757 | static void print_with_backtrace(const char* cause) noexcept { | |
758 | backtrace_buffer buf; | |
759 | buf.append(cause); | |
760 | print_with_backtrace(buf); | |
761 | } | |
762 | ||
763 | // Installs signal handler stack for current thread. | |
764 | // The stack remains installed as long as the returned object is kept alive. | |
765 | // When it goes out of scope the previous handler is restored. | |
766 | static decltype(auto) install_signal_handler_stack() { | |
767 | size_t size = SIGSTKSZ; | |
768 | auto mem = std::make_unique<char[]>(size); | |
769 | stack_t stack; | |
770 | stack_t prev_stack; | |
771 | stack.ss_sp = mem.get(); | |
772 | stack.ss_flags = 0; | |
773 | stack.ss_size = size; | |
774 | auto r = sigaltstack(&stack, &prev_stack); | |
775 | throw_system_error_on(r == -1); | |
776 | return defer([mem = std::move(mem), prev_stack] () mutable { | |
777 | try { | |
778 | auto r = sigaltstack(&prev_stack, NULL); | |
779 | throw_system_error_on(r == -1); | |
780 | } catch (...) { | |
781 | mem.release(); // We failed to restore previous stack, must leak it. | |
782 | seastar_logger.error("Failed to restore signal stack: {}", std::current_exception()); | |
783 | } | |
784 | }); | |
785 | } | |
786 | ||
787 | reactor::task_queue::task_queue(unsigned id, sstring name, float shares) | |
788 | : _shares(std::max(shares, 1.0f)) | |
789 | , _reciprocal_shares_times_2_power_32((uint64_t(1) << 32) / _shares) | |
790 | , _id(id) | |
f67539c2 | 791 | , _ts(std::chrono::steady_clock::now()) |
11fdf7f2 | 792 | , _name(name) { |
9f95a23c TL |
793 | register_stats(); |
794 | } | |
795 | ||
796 | void | |
797 | reactor::task_queue::register_stats() { | |
798 | seastar::metrics::metric_groups new_metrics; | |
11fdf7f2 TL |
799 | namespace sm = seastar::metrics; |
800 | static auto group = sm::label("group"); | |
801 | auto group_label = group(_name); | |
9f95a23c | 802 | new_metrics.add_group("scheduler", { |
11fdf7f2 TL |
803 | sm::make_counter("runtime_ms", [this] { |
804 | return std::chrono::duration_cast<std::chrono::milliseconds>(_runtime).count(); | |
805 | }, sm::description("Accumulated runtime of this task queue; an increment rate of 1000ms per second indicates full utilization"), | |
806 | {group_label}), | |
f67539c2 TL |
807 | sm::make_counter("waittime_ms", [this] { |
808 | return std::chrono::duration_cast<std::chrono::milliseconds>(_waittime).count(); | |
809 | }, sm::description("Accumulated waittime of this task queue; an increment rate of 1000ms per second indicates queue is waiting for something (e.g. IO)"), | |
810 | {group_label}), | |
811 | sm::make_counter("starvetime_ms", [this] { | |
812 | return std::chrono::duration_cast<std::chrono::milliseconds>(_starvetime).count(); | |
813 | }, sm::description("Accumulated starvation time of this task queue; an increment rate of 1000ms per second indicates the scheduler feels really bad"), | |
814 | {group_label}), | |
11fdf7f2 TL |
815 | sm::make_counter("tasks_processed", _tasks_processed, |
816 | sm::description("Count of tasks executing on this queue; indicates together with runtime_ms indicates length of tasks"), | |
817 | {group_label}), | |
818 | sm::make_gauge("queue_length", [this] { return _q.size(); }, | |
819 | sm::description("Size of backlog on this queue, in tasks; indicates whether the queue is busy and/or contended"), | |
820 | {group_label}), | |
821 | sm::make_gauge("shares", [this] { return _shares; }, | |
822 | sm::description("Shares allocated to this queue"), | |
823 | {group_label}), | |
824 | sm::make_derive("time_spent_on_task_quota_violations_ms", [this] { | |
825 | return _time_spent_on_task_quota_violations / 1ms; | |
826 | }, sm::description("Total amount in milliseconds we were in violation of the task quota"), | |
827 | {group_label}), | |
828 | }); | |
9f95a23c TL |
829 | _metrics = std::exchange(new_metrics, {}); |
830 | } | |
831 | ||
832 | void | |
833 | reactor::task_queue::rename(sstring new_name) { | |
834 | if (_name != new_name) { | |
835 | _name = new_name; | |
836 | register_stats(); | |
837 | } | |
11fdf7f2 TL |
838 | } |
839 | ||
9f95a23c TL |
840 | #ifdef __clang__ |
841 | __attribute__((no_sanitize("undefined"))) // multiplication below may overflow; we check for that | |
842 | #elif defined(__GNUC__) | |
843 | [[gnu::no_sanitize_undefined]] | |
844 | #endif | |
11fdf7f2 TL |
845 | inline |
846 | int64_t | |
847 | reactor::task_queue::to_vruntime(sched_clock::duration runtime) const { | |
848 | auto scaled = (runtime.count() * _reciprocal_shares_times_2_power_32) >> 32; | |
849 | // Prevent overflow from returning ridiculous values | |
850 | return std::max<int64_t>(scaled, 0); | |
851 | } | |
852 | ||
853 | void | |
f67539c2 | 854 | reactor::task_queue::set_shares(float shares) noexcept { |
11fdf7f2 TL |
855 | _shares = std::max(shares, 1.0f); |
856 | _reciprocal_shares_times_2_power_32 = (uint64_t(1) << 32) / _shares; | |
857 | } | |
858 | ||
859 | void | |
860 | reactor::account_runtime(task_queue& tq, sched_clock::duration runtime) { | |
9f95a23c | 861 | if (runtime > (2 * _task_quota)) { |
11fdf7f2 TL |
862 | tq._time_spent_on_task_quota_violations += runtime - _task_quota; |
863 | } | |
864 | tq._vruntime += tq.to_vruntime(runtime); | |
865 | tq._runtime += runtime; | |
866 | } | |
867 | ||
868 | void | |
869 | reactor::account_idle(sched_clock::duration runtime) { | |
870 | // anything to do here? | |
871 | } | |
872 | ||
873 | struct reactor::task_queue::indirect_compare { | |
874 | bool operator()(const task_queue* tq1, const task_queue* tq2) const { | |
875 | return tq1->_vruntime < tq2->_vruntime; | |
876 | } | |
877 | }; | |
878 | ||
9f95a23c TL |
879 | reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg) |
880 | : _cfg(cfg) | |
881 | , _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC)) | |
11fdf7f2 | 882 | , _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC)) |
11fdf7f2 TL |
883 | , _id(id) |
884 | #ifdef HAVE_OSV | |
885 | , _timer_thread( | |
886 | [&] { timer_thread_func(); }, sched::thread::attr().stack(4096).name("timer_thread").pin(sched::cpu::current())) | |
887 | , _engine_thread(sched::thread::current()) | |
888 | #endif | |
889 | , _cpu_started(0) | |
f67539c2 | 890 | , _cpu_stall_detector(std::make_unique<cpu_stall_detector>()) |
11fdf7f2 TL |
891 | , _reuseport(posix_reuseport_detect()) |
892 | , _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) { | |
9f95a23c TL |
893 | /* |
894 | * The _backend assignment is here, not on the initialization list as | |
895 | * the chosen backend constructor may want to handle signals and thus | |
896 | * needs the _signals._signal_handlers map to be initialized. | |
897 | */ | |
898 | _backend = rbs.create(this); | |
f67539c2 | 899 | *internal::get_scheduling_group_specific_thread_local_data_ptr() = &_scheduling_group_specific_data; |
11fdf7f2 TL |
900 | _task_queues.push_back(std::make_unique<task_queue>(0, "main", 1000)); |
901 | _task_queues.push_back(std::make_unique<task_queue>(1, "atexit", 1000)); | |
902 | _at_destroy_tasks = _task_queues.back().get(); | |
903 | g_need_preempt = &_preemption_monitor; | |
904 | seastar::thread_impl::init(); | |
905 | _backend->start_tick(); | |
9f95a23c | 906 | |
11fdf7f2 TL |
907 | #ifdef HAVE_OSV |
908 | _timer_thread.start(); | |
909 | #else | |
910 | sigset_t mask; | |
911 | sigemptyset(&mask); | |
11fdf7f2 | 912 | sigaddset(&mask, cpu_stall_detector::signal_number()); |
9f95a23c | 913 | auto r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL); |
11fdf7f2 TL |
914 | assert(r == 0); |
915 | #endif | |
916 | memory::set_reclaim_hook([this] (std::function<void ()> reclaim_fn) { | |
917 | add_high_priority_task(make_task(default_scheduling_group(), [fn = std::move(reclaim_fn)] { | |
918 | fn(); | |
919 | })); | |
920 | }); | |
921 | } | |
922 | ||
923 | reactor::~reactor() { | |
924 | sigset_t mask; | |
925 | sigemptyset(&mask); | |
926 | sigaddset(&mask, cpu_stall_detector::signal_number()); | |
927 | auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL); | |
928 | assert(r == 0); | |
929 | ||
930 | _backend->stop_tick(); | |
931 | auto eraser = [](auto& list) { | |
932 | while (!list.empty()) { | |
933 | auto& timer = *list.begin(); | |
934 | timer.cancel(); | |
935 | } | |
936 | }; | |
937 | eraser(_expired_timers); | |
938 | eraser(_expired_lowres_timers); | |
939 | eraser(_expired_manual_timers); | |
f67539c2 | 940 | auto& sg_data = _scheduling_group_specific_data; |
9f95a23c TL |
941 | for (auto&& tq : _task_queues) { |
942 | if (tq) { | |
f67539c2 | 943 | auto& this_sg = sg_data.per_scheduling_group_data[tq->_id]; |
9f95a23c TL |
944 | // The following line will preserve the convention that constructor and destructor functions |
945 | // for the per sg values are called in the context of the containing scheduling group. | |
946 | *internal::current_scheduling_group_ptr() = scheduling_group(tq->_id); | |
f67539c2 TL |
947 | for (size_t key : boost::irange<size_t>(0, sg_data.scheduling_group_key_configs.size())) { |
948 | void* val = this_sg.specific_vals[key]; | |
9f95a23c | 949 | if (val) { |
f67539c2 TL |
950 | if (sg_data.scheduling_group_key_configs[key].destructor) { |
951 | sg_data.scheduling_group_key_configs[key].destructor(val); | |
9f95a23c TL |
952 | } |
953 | free(val); | |
f67539c2 | 954 | this_sg.specific_vals[key] = nullptr; |
9f95a23c TL |
955 | } |
956 | } | |
957 | } | |
958 | } | |
11fdf7f2 TL |
959 | } |
960 | ||
961 | future<> reactor::readable(pollable_fd_state& fd) { | |
962 | return _backend->readable(fd); | |
963 | } | |
964 | ||
965 | future<> reactor::writeable(pollable_fd_state& fd) { | |
966 | return _backend->writeable(fd); | |
967 | } | |
968 | ||
969 | future<> reactor::readable_or_writeable(pollable_fd_state& fd) { | |
970 | return _backend->readable_or_writeable(fd); | |
971 | } | |
972 | ||
11fdf7f2 TL |
973 | void reactor::abort_reader(pollable_fd_state& fd) { |
974 | // TCP will respond to shutdown(SHUT_RD) by returning ECONNABORT on the next read, | |
975 | // but UDP responds by returning AGAIN. The no_more_recv flag tells us to convert | |
976 | // EAGAIN to ECONNABORT in that case. | |
977 | fd.no_more_recv = true; | |
978 | return fd.fd.shutdown(SHUT_RD); | |
979 | } | |
980 | ||
981 | void reactor::abort_writer(pollable_fd_state& fd) { | |
982 | // TCP will respond to shutdown(SHUT_WR) by returning ECONNABORT on the next write, | |
983 | // but UDP responds by returning AGAIN. The no_more_recv flag tells us to convert | |
984 | // EAGAIN to ECONNABORT in that case. | |
985 | fd.no_more_send = true; | |
986 | return fd.fd.shutdown(SHUT_WR); | |
987 | } | |
988 | ||
989 | void reactor::set_strict_dma(bool value) { | |
990 | _strict_o_direct = value; | |
991 | } | |
992 | ||
993 | void reactor::set_bypass_fsync(bool value) { | |
994 | _bypass_fsync = value; | |
995 | } | |
996 | ||
997 | void | |
998 | reactor::reset_preemption_monitor() { | |
999 | return _backend->reset_preemption_monitor(); | |
1000 | } | |
1001 | ||
11fdf7f2 TL |
1002 | void |
1003 | reactor::request_preemption() { | |
1004 | return _backend->request_preemption(); | |
1005 | } | |
1006 | ||
9f95a23c TL |
1007 | void reactor::start_handling_signal() { |
1008 | return _backend->start_handling_signal(); | |
11fdf7f2 TL |
1009 | } |
1010 | ||
f67539c2 TL |
1011 | cpu_stall_detector::cpu_stall_detector(cpu_stall_detector_config cfg) |
1012 | : _shard_id(this_shard_id()) { | |
9f95a23c TL |
1013 | // glib's backtrace() calls dlopen("libgcc_s.so.1") once to resolve unwind related symbols. |
1014 | // If first stall detector invocation happens during another dlopen() call the calling thread | |
1015 | // will deadlock. The dummy call here makes sure that backtrace's initialization happens in | |
1016 | // a safe place. | |
1017 | backtrace([] (frame) {}); | |
11fdf7f2 TL |
1018 | update_config(cfg); |
1019 | struct sigevent sev = {}; | |
1020 | sev.sigev_notify = SIGEV_THREAD_ID; | |
1021 | sev.sigev_signo = signal_number(); | |
1022 | sev._sigev_un._tid = syscall(SYS_gettid); | |
1023 | int err = timer_create(CLOCK_THREAD_CPUTIME_ID, &sev, &_timer); | |
1024 | if (err) { | |
1025 | throw std::system_error(std::error_code(err, std::system_category())); | |
1026 | } | |
9f95a23c TL |
1027 | |
1028 | namespace sm = seastar::metrics; | |
1029 | ||
1030 | _metrics.add_group("stall_detector", { | |
1031 | sm::make_derive("reported", _total_reported, sm::description("Total number of reported stalls, look in the traces for the exact reason"))}); | |
1032 | ||
1033 | ||
11fdf7f2 TL |
1034 | // note: if something is added here that can, it should take care to destroy _timer. |
1035 | } | |
1036 | ||
1037 | cpu_stall_detector::~cpu_stall_detector() { | |
1038 | timer_delete(_timer); | |
1039 | } | |
1040 | ||
1041 | cpu_stall_detector_config | |
1042 | cpu_stall_detector::get_config() const { | |
1043 | return _config; | |
1044 | } | |
1045 | ||
1046 | void cpu_stall_detector::update_config(cpu_stall_detector_config cfg) { | |
1047 | _config = cfg; | |
1048 | _threshold = std::chrono::duration_cast<std::chrono::steady_clock::duration>(cfg.threshold); | |
1049 | _slack = std::chrono::duration_cast<std::chrono::steady_clock::duration>(cfg.threshold * cfg.slack); | |
1050 | _stall_detector_reports_per_minute = cfg.stall_detector_reports_per_minute; | |
1051 | _max_reports_per_minute = cfg.stall_detector_reports_per_minute; | |
1052 | _rearm_timer_at = std::chrono::steady_clock::now(); | |
1053 | } | |
1054 | ||
1055 | void cpu_stall_detector::maybe_report() { | |
1056 | if (_reported++ < _max_reports_per_minute) { | |
1057 | generate_trace(); | |
1058 | } | |
1059 | } | |
1060 | // We use a tick at every timer firing so we can report suppressed backtraces. | |
1061 | // Best case it's a correctly predicted branch. If a backtrace had happened in | |
1062 | // the near past it's an increment and two branches. | |
1063 | // | |
1064 | // We can do it a cheaper if we don't report suppressed backtraces. | |
1065 | void cpu_stall_detector::on_signal() { | |
9f95a23c TL |
1066 | auto tasks_processed = engine().tasks_processed(); |
1067 | auto last_seen = _last_tasks_processed_seen.load(std::memory_order_relaxed); | |
1068 | if (!last_seen) { | |
1069 | return; // stall detector in not active | |
1070 | } else if (last_seen == tasks_processed) { // no task was processed - report | |
11fdf7f2 TL |
1071 | maybe_report(); |
1072 | _report_at <<= 1; | |
9f95a23c TL |
1073 | } else { |
1074 | _last_tasks_processed_seen.store(tasks_processed, std::memory_order_relaxed); | |
11fdf7f2 | 1075 | } |
9f95a23c | 1076 | arm_timer(); |
11fdf7f2 TL |
1077 | } |
1078 | ||
1079 | void cpu_stall_detector::report_suppressions(std::chrono::steady_clock::time_point now) { | |
1080 | if (now > _minute_mark + 60s) { | |
9f95a23c TL |
1081 | if (_reported > _max_reports_per_minute) { |
1082 | auto suppressed = _reported - _max_reports_per_minute; | |
11fdf7f2 TL |
1083 | backtrace_buffer buf; |
1084 | // Reuse backtrace buffer infrastructure so we don't have to allocate here | |
9f95a23c TL |
1085 | buf.append("Rate-limit: suppressed "); |
1086 | buf.append_decimal(suppressed); | |
1087 | suppressed == 1 ? buf.append(" backtrace") : buf.append(" backtraces"); | |
11fdf7f2 TL |
1088 | buf.append(" on shard "); |
1089 | buf.append_decimal(_shard_id); | |
1090 | buf.append("\n"); | |
1091 | buf.flush(); | |
1092 | } | |
1093 | _reported = 0; | |
1094 | _minute_mark = now; | |
1095 | } | |
1096 | } | |
1097 | ||
1098 | void cpu_stall_detector::arm_timer() { | |
1099 | auto its = posix::to_relative_itimerspec(_threshold * _report_at + _slack, 0s); | |
1100 | timer_settime(_timer, 0, &its, nullptr); | |
1101 | } | |
1102 | ||
1103 | void cpu_stall_detector::start_task_run(std::chrono::steady_clock::time_point now) { | |
1104 | if (now > _rearm_timer_at) { | |
1105 | report_suppressions(now); | |
1106 | _report_at = 1; | |
1107 | _run_started_at = now; | |
1108 | _rearm_timer_at = now + _threshold * _report_at; | |
1109 | arm_timer(); | |
1110 | } | |
9f95a23c | 1111 | _last_tasks_processed_seen.store(engine().tasks_processed(), std::memory_order_relaxed); |
11fdf7f2 TL |
1112 | std::atomic_signal_fence(std::memory_order_release); // Don't delay this write, so the signal handler can see it |
1113 | } | |
1114 | ||
1115 | void cpu_stall_detector::end_task_run(std::chrono::steady_clock::time_point now) { | |
1116 | std::atomic_signal_fence(std::memory_order_acquire); // Don't hoist this write, so the signal handler can see it | |
9f95a23c | 1117 | _last_tasks_processed_seen.store(0, std::memory_order_relaxed); |
11fdf7f2 TL |
1118 | } |
1119 | ||
1120 | void cpu_stall_detector::start_sleep() { | |
1121 | auto its = posix::to_relative_itimerspec(0s, 0s); | |
1122 | timer_settime(_timer, 0, &its, nullptr); | |
1123 | _rearm_timer_at = std::chrono::steady_clock::now(); | |
1124 | } | |
1125 | ||
1126 | void cpu_stall_detector::end_sleep() { | |
1127 | } | |
1128 | ||
1129 | void | |
1130 | reactor::task_quota_timer_thread_fn() { | |
1131 | auto thread_name = seastar::format("timer-{}", _id); | |
1132 | pthread_setname_np(pthread_self(), thread_name.c_str()); | |
1133 | ||
1134 | sigset_t mask; | |
9f95a23c | 1135 | sigfillset(&mask); |
11fdf7f2 TL |
1136 | for (auto sig : { SIGSEGV }) { |
1137 | sigdelset(&mask, sig); | |
1138 | } | |
1139 | auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL); | |
1140 | if (r) { | |
9f95a23c | 1141 | seastar_logger.error("Thread {}: failed to block signals. Aborting.", thread_name.c_str()); |
11fdf7f2 TL |
1142 | abort(); |
1143 | } | |
1144 | ||
1145 | // We need to wait until task quota is set before we can calculate how many ticks are to | |
1146 | // a minute. Technically task_quota is used from many threads, but since it is read-only here | |
1147 | // and only used during initialization we will avoid complicating the code. | |
1148 | { | |
1149 | uint64_t events; | |
1150 | _task_quota_timer.read(&events, 8); | |
1151 | request_preemption(); | |
1152 | } | |
1153 | ||
1154 | while (!_dying.load(std::memory_order_relaxed)) { | |
1155 | uint64_t events; | |
1156 | _task_quota_timer.read(&events, 8); | |
1157 | request_preemption(); | |
1158 | ||
1159 | // We're in a different thread, but guaranteed to be on the same core, so even | |
1160 | // a signal fence is overdoing it | |
1161 | std::atomic_signal_fence(std::memory_order_seq_cst); | |
1162 | } | |
1163 | } | |
9f95a23c | 1164 | void |
11fdf7f2 TL |
1165 | reactor::update_blocked_reactor_notify_ms(std::chrono::milliseconds ms) { |
1166 | auto cfg = _cpu_stall_detector->get_config(); | |
1167 | if (ms != cfg.threshold) { | |
1168 | cfg.threshold = ms; | |
1169 | _cpu_stall_detector->update_config(cfg); | |
1170 | seastar_logger.info("updated: blocked-reactor-notify-ms={}", ms.count()); | |
1171 | } | |
1172 | } | |
1173 | ||
1174 | std::chrono::milliseconds | |
1175 | reactor::get_blocked_reactor_notify_ms() const { | |
1176 | auto d = _cpu_stall_detector->get_config().threshold; | |
1177 | return std::chrono::duration_cast<std::chrono::milliseconds>(d); | |
1178 | } | |
1179 | ||
1180 | void | |
1181 | reactor::set_stall_detector_report_function(std::function<void ()> report) { | |
1182 | auto cfg = _cpu_stall_detector->get_config(); | |
1183 | cfg.report = std::move(report); | |
1184 | _cpu_stall_detector->update_config(std::move(cfg)); | |
1185 | } | |
1186 | ||
1187 | std::function<void ()> | |
1188 | reactor::get_stall_detector_report_function() const { | |
1189 | return _cpu_stall_detector->get_config().report; | |
1190 | } | |
1191 | ||
1192 | void | |
1193 | reactor::block_notifier(int) { | |
9f95a23c | 1194 | engine()._cpu_stall_detector->on_signal(); |
11fdf7f2 TL |
1195 | } |
1196 | ||
1197 | void | |
1198 | cpu_stall_detector::generate_trace() { | |
1199 | auto delta = std::chrono::steady_clock::now() - _run_started_at; | |
1200 | ||
9f95a23c | 1201 | _total_reported++; |
11fdf7f2 TL |
1202 | if (_config.report) { |
1203 | _config.report(); | |
1204 | return; | |
1205 | } | |
1206 | ||
1207 | backtrace_buffer buf; | |
1208 | buf.append("Reactor stalled for "); | |
1209 | buf.append_decimal(uint64_t(delta / 1ms)); | |
1210 | buf.append(" ms"); | |
1211 | print_with_backtrace(buf); | |
1212 | } | |
1213 | ||
1214 | template <typename T, typename E, typename EnableFunc> | |
f67539c2 | 1215 | void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn())) { |
11fdf7f2 TL |
1216 | expired_timers = timers.expire(timers.now()); |
1217 | for (auto& t : expired_timers) { | |
1218 | t._expired = true; | |
1219 | } | |
f67539c2 | 1220 | const auto prev_sg = current_scheduling_group(); |
11fdf7f2 TL |
1221 | while (!expired_timers.empty()) { |
1222 | auto t = &*expired_timers.begin(); | |
1223 | expired_timers.pop_front(); | |
1224 | t->_queued = false; | |
1225 | if (t->_armed) { | |
1226 | t->_armed = false; | |
1227 | if (t->_period) { | |
1228 | t->readd_periodic(); | |
1229 | } | |
1230 | try { | |
f67539c2 | 1231 | *internal::current_scheduling_group_ptr() = t->_sg; |
11fdf7f2 TL |
1232 | t->_callback(); |
1233 | } catch (...) { | |
1234 | seastar_logger.error("Timer callback failed: {}", std::current_exception()); | |
1235 | } | |
1236 | } | |
1237 | } | |
f67539c2 TL |
1238 | // complete_timers() can be called from the context of run_tasks() |
1239 | // as well so we need to restore the previous scheduling group (set by run_tasks()). | |
1240 | *internal::current_scheduling_group_ptr() = prev_sg; | |
11fdf7f2 TL |
1241 | enable_fn(); |
1242 | } | |
1243 | ||
1244 | #ifdef HAVE_OSV | |
1245 | void reactor::timer_thread_func() { | |
1246 | sched::timer tmr(*sched::thread::current()); | |
1247 | WITH_LOCK(_timer_mutex) { | |
1248 | while (!_stopped) { | |
1249 | if (_timer_due != 0) { | |
1250 | set_timer(tmr, _timer_due); | |
1251 | _timer_cond.wait(_timer_mutex, &tmr); | |
1252 | if (tmr.expired()) { | |
1253 | _timer_due = 0; | |
1254 | _engine_thread->unsafe_stop(); | |
1255 | _pending_tasks.push_front(make_task(default_scheduling_group(), [this] { | |
1256 | complete_timers(_timers, _expired_timers, [this] { | |
1257 | if (!_timers.empty()) { | |
1258 | enable_timer(_timers.get_next_timeout()); | |
1259 | } | |
1260 | }); | |
1261 | })); | |
1262 | _engine_thread->wake(); | |
1263 | } else { | |
1264 | tmr.cancel(); | |
1265 | } | |
1266 | } else { | |
1267 | _timer_cond.wait(_timer_mutex); | |
1268 | } | |
1269 | } | |
1270 | } | |
1271 | } | |
1272 | ||
1273 | void reactor::set_timer(sched::timer &tmr, s64 t) { | |
1274 | using namespace osv::clock; | |
1275 | tmr.set(wall::time_point(std::chrono::nanoseconds(t))); | |
1276 | } | |
1277 | #endif | |
1278 | ||
1279 | class network_stack_registry { | |
1280 | public: | |
1281 | using options = boost::program_options::variables_map; | |
1282 | private: | |
1283 | static std::unordered_map<sstring, | |
9f95a23c | 1284 | noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)>>& _map() { |
11fdf7f2 | 1285 | static std::unordered_map<sstring, |
9f95a23c | 1286 | noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)>> map; |
11fdf7f2 TL |
1287 | return map; |
1288 | } | |
1289 | static sstring& _default() { | |
1290 | static sstring def; | |
1291 | return def; | |
1292 | } | |
1293 | public: | |
1294 | static boost::program_options::options_description& options_description() { | |
1295 | static boost::program_options::options_description opts; | |
1296 | return opts; | |
1297 | } | |
1298 | static void register_stack(sstring name, boost::program_options::options_description opts, | |
9f95a23c | 1299 | noncopyable_function<future<std::unique_ptr<network_stack>>(options opts)> create, |
11fdf7f2 TL |
1300 | bool make_default); |
1301 | static sstring default_stack(); | |
1302 | static std::vector<sstring> list(); | |
1303 | static future<std::unique_ptr<network_stack>> create(options opts); | |
1304 | static future<std::unique_ptr<network_stack>> create(sstring name, options opts); | |
1305 | }; | |
1306 | ||
1307 | void reactor::configure(boost::program_options::variables_map vm) { | |
9f95a23c | 1308 | _network_stack_ready = vm.count("network-stack") |
11fdf7f2 TL |
1309 | ? network_stack_registry::create(sstring(vm["network-stack"].as<std::string>()), vm) |
1310 | : network_stack_registry::create(vm); | |
11fdf7f2 TL |
1311 | |
1312 | _handle_sigint = !vm.count("no-handle-interrupt"); | |
1313 | auto task_quota = vm["task-quota-ms"].as<double>() * 1ms; | |
1314 | _task_quota = std::chrono::duration_cast<sched_clock::duration>(task_quota); | |
1315 | ||
1316 | auto blocked_time = vm["blocked-reactor-notify-ms"].as<unsigned>() * 1ms; | |
1317 | cpu_stall_detector_config csdc; | |
1318 | csdc.threshold = blocked_time; | |
1319 | csdc.stall_detector_reports_per_minute = vm["blocked-reactor-reports-per-minute"].as<unsigned>(); | |
1320 | _cpu_stall_detector->update_config(csdc); | |
1321 | ||
1322 | _max_task_backlog = vm["max-task-backlog"].as<unsigned>(); | |
1323 | _max_poll_time = vm["idle-poll-time-us"].as<unsigned>() * 1us; | |
1324 | if (vm.count("poll-mode")) { | |
1325 | _max_poll_time = std::chrono::nanoseconds::max(); | |
1326 | } | |
1327 | if (vm.count("overprovisioned") | |
1328 | && vm["idle-poll-time-us"].defaulted() | |
1329 | && !vm.count("poll-mode")) { | |
1330 | _max_poll_time = 0us; | |
1331 | } | |
1332 | set_strict_dma(!vm.count("relaxed-dma")); | |
1333 | if (!vm["poll-aio"].as<bool>() | |
1334 | || (vm["poll-aio"].defaulted() && vm.count("overprovisioned"))) { | |
1335 | _aio_eventfd = pollable_fd(file_desc::eventfd(0, 0)); | |
1336 | } | |
1337 | set_bypass_fsync(vm["unsafe-bypass-fsync"].as<bool>()); | |
1338 | _force_io_getevents_syscall = vm["force-aio-syscalls"].as<bool>(); | |
9f95a23c TL |
1339 | aio_nowait_supported = vm["linux-aio-nowait"].as<bool>(); |
1340 | _have_aio_fsync = vm["aio-fsync"].as<bool>(); | |
11fdf7f2 TL |
1341 | } |
1342 | ||
9f95a23c TL |
1343 | pollable_fd |
1344 | reactor::posix_listen(socket_address sa, listen_options opts) { | |
1345 | auto specific_protocol = (int)(opts.proto); | |
1346 | if (sa.is_af_unix()) { | |
1347 | // no type-safe way to create listen_opts with proto=0 | |
1348 | specific_protocol = 0; | |
1349 | } | |
1350 | static auto somaxconn = [] { | |
f67539c2 | 1351 | std::optional<int> result; |
9f95a23c TL |
1352 | std::ifstream ifs("/proc/sys/net/core/somaxconn"); |
1353 | if (ifs) { | |
1354 | result = 0; | |
1355 | ifs >> *result; | |
1356 | } | |
1357 | return result; | |
1358 | }(); | |
1359 | if (somaxconn && *somaxconn < opts.listen_backlog) { | |
1360 | fmt::print( | |
1361 | "Warning: /proc/sys/net/core/somaxconn is set to {:d} " | |
1362 | "which is lower than the backlog parameter {:d} used for listen(), " | |
1363 | "please change it with `sysctl -w net.core.somaxconn={:d}`\n", | |
1364 | *somaxconn, opts.listen_backlog, opts.listen_backlog); | |
11fdf7f2 | 1365 | } |
9f95a23c TL |
1366 | |
1367 | file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, specific_protocol); | |
1368 | if (opts.reuse_address) { | |
1369 | fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1); | |
11fdf7f2 | 1370 | } |
9f95a23c TL |
1371 | if (_reuseport && !sa.is_af_unix()) |
1372 | fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); | |
11fdf7f2 | 1373 | |
9f95a23c TL |
1374 | try { |
1375 | fd.bind(sa.u.sa, sa.length()); | |
1376 | fd.listen(opts.listen_backlog); | |
1377 | } catch (const std::system_error& s) { | |
1378 | throw std::system_error(s.code(), fmt::format("posix_listen failed for address {}", sa)); | |
11fdf7f2 | 1379 | } |
11fdf7f2 | 1380 | |
11fdf7f2 TL |
1381 | return pollable_fd(std::move(fd)); |
1382 | } | |
1383 | ||
1384 | bool | |
1385 | reactor::posix_reuseport_detect() { | |
1386 | return false; // FIXME: reuseport currently leads to heavy load imbalance. Until we fix that, just | |
1387 | // disable it unconditionally. | |
1388 | try { | |
1389 | file_desc fd = file_desc::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); | |
1390 | fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); | |
1391 | return true; | |
1392 | } catch(std::system_error& e) { | |
1393 | return false; | |
1394 | } | |
1395 | } | |
1396 | ||
9f95a23c TL |
1397 | void pollable_fd_state::maybe_no_more_recv() { |
1398 | if (no_more_recv) { | |
11fdf7f2 TL |
1399 | throw std::system_error(std::error_code(ECONNABORTED, std::system_category())); |
1400 | } | |
1401 | } | |
1402 | ||
9f95a23c TL |
1403 | void pollable_fd_state::maybe_no_more_send() { |
1404 | if (no_more_send) { | |
11fdf7f2 TL |
1405 | throw std::system_error(std::error_code(ECONNABORTED, std::system_category())); |
1406 | } | |
1407 | } | |
1408 | ||
9f95a23c TL |
1409 | void pollable_fd_state::forget() { |
1410 | engine()._backend->forget(*this); | |
1411 | } | |
1412 | ||
1413 | void intrusive_ptr_release(pollable_fd_state* fd) { | |
1414 | if (!--fd->_refs) { | |
1415 | fd->forget(); | |
1416 | } | |
1417 | } | |
1418 | ||
1419 | pollable_fd::pollable_fd(file_desc fd, pollable_fd::speculation speculate) | |
1420 | : _s(engine()._backend->make_pollable_fd_state(std::move(fd), speculate)) | |
1421 | {} | |
1422 | ||
1423 | void pollable_fd::shutdown(int how) { | |
1424 | engine()._backend->shutdown(*_s, how); | |
1425 | } | |
1426 | ||
f67539c2 | 1427 | pollable_fd |
9f95a23c TL |
1428 | reactor::make_pollable_fd(socket_address sa, int proto) { |
1429 | file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, proto); | |
f67539c2 | 1430 | return pollable_fd(std::move(fd)); |
11fdf7f2 TL |
1431 | } |
1432 | ||
1433 | future<> | |
f67539c2 | 1434 | reactor::posix_connect(pollable_fd pfd, socket_address sa, socket_address local) { |
9f95a23c TL |
1435 | #ifdef IP_BIND_ADDRESS_NO_PORT |
1436 | if (!sa.is_af_unix()) { | |
1437 | try { | |
1438 | // do not reserve an ephemeral port when using bind() with port number 0. | |
1439 | // connect() will handle it later. The reason for that is that bind() may fail | |
1440 | // to allocate a port while connect will success, this is because bind() does not | |
1441 | // know dst address and has to find globally unique local port. | |
f67539c2 | 1442 | pfd.get_file_desc().setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1); |
9f95a23c TL |
1443 | } catch (std::system_error& err) { |
1444 | if (err.code() != std::error_code(ENOPROTOOPT, std::system_category())) { | |
1445 | throw; | |
1446 | } | |
11fdf7f2 | 1447 | } |
9f95a23c TL |
1448 | } |
1449 | #endif | |
1450 | if (!local.is_wildcard()) { | |
1451 | // call bind() only if local address is not wildcard | |
f67539c2 | 1452 | pfd.get_file_desc().bind(local.u.sa, local.length()); |
9f95a23c | 1453 | } |
f67539c2 | 1454 | return pfd.connect(sa).finally([pfd] {}); |
11fdf7f2 TL |
1455 | } |
1456 | ||
1457 | server_socket | |
1458 | reactor::listen(socket_address sa, listen_options opt) { | |
1459 | return server_socket(_network_stack->listen(sa, opt)); | |
1460 | } | |
1461 | ||
1462 | future<connected_socket> | |
1463 | reactor::connect(socket_address sa) { | |
1464 | return _network_stack->connect(sa); | |
1465 | } | |
1466 | ||
1467 | future<connected_socket> | |
1468 | reactor::connect(socket_address sa, socket_address local, transport proto) { | |
1469 | return _network_stack->connect(sa, local, proto); | |
1470 | } | |
1471 | ||
9f95a23c TL |
1472 | sstring io_request::opname() const { |
1473 | switch (_op) { | |
1474 | case io_request::operation::fdatasync: | |
1475 | return "fdatasync"; | |
1476 | case io_request::operation::write: | |
1477 | return "write"; | |
1478 | case io_request::operation::writev: | |
1479 | return "vectored write"; | |
1480 | case io_request::operation::read: | |
1481 | return "read"; | |
1482 | case io_request::operation::readv: | |
1483 | return "vectored read"; | |
1484 | case io_request::operation::recv: | |
1485 | return "recv"; | |
1486 | case io_request::operation::recvmsg: | |
1487 | return "recvmsg"; | |
1488 | case io_request::operation::send: | |
1489 | return "send"; | |
1490 | case io_request::operation::sendmsg: | |
1491 | return "sendmsg"; | |
1492 | case io_request::operation::accept: | |
1493 | return "accept"; | |
1494 | case io_request::operation::connect: | |
1495 | return "connect"; | |
1496 | case io_request::operation::poll_add: | |
1497 | return "poll add"; | |
1498 | case io_request::operation::poll_remove: | |
1499 | return "poll remove"; | |
1500 | case io_request::operation::cancel: | |
1501 | return "cancel"; | |
1502 | } | |
1503 | std::abort(); | |
11fdf7f2 TL |
1504 | } |
1505 | ||
f67539c2 TL |
1506 | void io_completion::complete_with(ssize_t res) { |
1507 | if (res >= 0) { | |
1508 | complete(res); | |
1509 | return; | |
1510 | } | |
1511 | ||
1512 | ++engine()._io_stats.aio_errors; | |
1513 | try { | |
1514 | throw_kernel_error(res); | |
1515 | } catch (...) { | |
1516 | set_exception(std::current_exception()); | |
1517 | } | |
1518 | } | |
1519 | ||
11fdf7f2 | 1520 | void |
f67539c2 | 1521 | reactor::submit_io(io_completion* desc, io_request req) noexcept { |
9f95a23c | 1522 | req.attach_kernel_completion(desc); |
f67539c2 TL |
1523 | try { |
1524 | _pending_io.push_back(std::move(req)); | |
1525 | } catch (...) { | |
1526 | desc->set_exception(std::current_exception()); | |
1527 | } | |
11fdf7f2 TL |
1528 | } |
1529 | ||
1530 | bool | |
1531 | reactor::flush_pending_aio() { | |
1532 | for (auto& ioq : my_io_queues) { | |
1533 | ioq->poll_io_queue(); | |
1534 | } | |
9f95a23c | 1535 | return false; |
11fdf7f2 TL |
1536 | } |
1537 | ||
f67539c2 TL |
1538 | bool |
1539 | reactor::reap_kernel_completions() { | |
1540 | return _backend->reap_kernel_completions(); | |
1541 | } | |
1542 | ||
11fdf7f2 TL |
1543 | const io_priority_class& default_priority_class() { |
1544 | static thread_local auto shard_default_class = [] { | |
1545 | return engine().register_one_priority_class("default", 1); | |
1546 | }(); | |
1547 | return shard_default_class; | |
1548 | } | |
1549 | ||
9f95a23c | 1550 | future<size_t> |
f67539c2 | 1551 | reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept { |
11fdf7f2 TL |
1552 | ++_io_stats.aio_reads; |
1553 | _io_stats.aio_read_bytes += len; | |
9f95a23c | 1554 | return ioq->queue_request(pc, len, std::move(req)); |
11fdf7f2 TL |
1555 | } |
1556 | ||
9f95a23c | 1557 | future<size_t> |
f67539c2 | 1558 | reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept { |
11fdf7f2 TL |
1559 | ++_io_stats.aio_writes; |
1560 | _io_stats.aio_write_bytes += len; | |
9f95a23c | 1561 | return ioq->queue_request(pc, len, std::move(req)); |
11fdf7f2 TL |
1562 | } |
1563 | ||
namespace internal {

// Trims `iov` in place so it can be submitted as one vectored request:
//  - at most IOV_MAX entries are kept;
//  - bytes are removed from the tail (shrinking or dropping trailing
//    entries) until the total length is a multiple of `disk_alignment`.
// The mask arithmetic assumes `disk_alignment` is a power of two.
// Returns the resulting total length in bytes.
size_t sanitize_iovecs(std::vector<iovec>& iov, size_t disk_alignment) noexcept {
    if (iov.size() > IOV_MAX) {
        iov.resize(IOV_MAX);
    }

    size_t total = 0;
    for (const iovec& v : iov) {
        total += v.iov_len;
    }

    const size_t mask = disk_alignment - 1;
    for (size_t excess = total & mask; excess != 0; excess = total & mask) {
        iovec& last = iov.back();
        if (last.iov_len > excess) {
            // The tail entry is large enough: shave the excess off it.
            last.iov_len -= excess;
            total -= excess;
        } else {
            // The tail entry is no larger than the excess: drop it entirely
            // and re-evaluate the alignment of what remains.
            total -= last.iov_len;
            iov.pop_back();
        }
    }
    return total;
}

}
1584 | ||
1585 | future<file> | |
f67539c2 TL |
1586 | reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_options options) noexcept { |
1587 | return do_with(static_cast<int>(flags), std::move(options), [this, nameref] (auto& open_flags, file_open_options& options) { | |
1588 | sstring name(nameref); | |
1589 | return _thread_pool->submit<syscall_result<int>>([name, &open_flags, &options, strict_o_direct = _strict_o_direct, bypass_fsync = _bypass_fsync] () mutable { | |
1590 | // We want O_DIRECT, except in two cases: | |
1591 | // - tmpfs (which doesn't support it, but works fine anyway) | |
1592 | // - strict_o_direct == false (where we forgive it being not supported) | |
1593 | // Because open() with O_DIRECT will fail, we open it without O_DIRECT, try | |
1594 | // to update it to O_DIRECT with fcntl(), and if that fails, see if we | |
1595 | // can forgive it. | |
1596 | auto is_tmpfs = [] (int fd) { | |
1597 | struct ::statfs buf; | |
1598 | auto r = ::fstatfs(fd, &buf); | |
1599 | if (r == -1) { | |
1600 | return false; | |
1601 | } | |
1602 | return buf.f_type == 0x01021994; // TMPFS_MAGIC | |
1603 | }; | |
1604 | open_flags |= O_CLOEXEC; | |
1605 | if (bypass_fsync) { | |
1606 | open_flags &= ~O_DSYNC; | |
11fdf7f2 | 1607 | } |
f67539c2 TL |
1608 | auto mode = static_cast<mode_t>(options.create_permissions); |
1609 | int fd = ::open(name.c_str(), open_flags, mode); | |
1610 | if (fd == -1) { | |
1611 | return wrap_syscall<int>(fd); | |
11fdf7f2 | 1612 | } |
f67539c2 TL |
1613 | int r = ::fcntl(fd, F_SETFL, open_flags | O_DIRECT); |
1614 | auto maybe_ret = wrap_syscall<int>(r); // capture errno (should be EINVAL) | |
1615 | if (r == -1 && strict_o_direct && !is_tmpfs(fd)) { | |
1616 | ::close(fd); | |
1617 | return maybe_ret; | |
1618 | } | |
1619 | if (fd != -1) { | |
1620 | fsxattr attr = {}; | |
1621 | if (options.extent_allocation_size_hint) { | |
1622 | attr.fsx_xflags |= XFS_XFLAG_EXTSIZE; | |
1623 | attr.fsx_extsize = options.extent_allocation_size_hint; | |
1624 | } | |
1625 | // Ignore error; may be !xfs, and just a hint anyway | |
1626 | ::ioctl(fd, XFS_IOC_FSSETXATTR, &attr); | |
1627 | } | |
1628 | return wrap_syscall<int>(fd); | |
1629 | }).then([&options, name = std::move(name), &open_flags] (syscall_result<int> sr) { | |
1630 | sr.throw_fs_exception_if_error("open failed", name); | |
1631 | return make_file_impl(sr.result, options, open_flags); | |
1632 | }).then([] (shared_ptr<file_impl> impl) { | |
1633 | return make_ready_future<file>(std::move(impl)); | |
1634 | }); | |
11fdf7f2 TL |
1635 | }); |
1636 | } | |
1637 | ||
1638 | future<> | |
f67539c2 TL |
1639 | reactor::remove_file(std::string_view pathname) noexcept { |
1640 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1641 | return futurize_invoke([pathname] { | |
1642 | return engine()._thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname)] { | |
1643 | return wrap_syscall<int>(::remove(pathname.c_str())); | |
1644 | }).then([pathname = sstring(pathname)] (syscall_result<int> sr) { | |
1645 | sr.throw_fs_exception_if_error("remove failed", pathname); | |
1646 | return make_ready_future<>(); | |
1647 | }); | |
11fdf7f2 TL |
1648 | }); |
1649 | } | |
1650 | ||
1651 | future<> | |
f67539c2 TL |
1652 | reactor::rename_file(std::string_view old_pathname, std::string_view new_pathname) noexcept { |
1653 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1654 | return futurize_invoke([old_pathname, new_pathname] { | |
1655 | return engine()._thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] { | |
1656 | return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str())); | |
1657 | }).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) { | |
1658 | sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname); | |
1659 | return make_ready_future<>(); | |
1660 | }); | |
11fdf7f2 TL |
1661 | }); |
1662 | } | |
1663 | ||
1664 | future<> | |
f67539c2 TL |
1665 | reactor::link_file(std::string_view oldpath, std::string_view newpath) noexcept { |
1666 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1667 | return futurize_invoke([oldpath, newpath] { | |
1668 | return engine()._thread_pool->submit<syscall_result<int>>([oldpath = sstring(oldpath), newpath = sstring(newpath)] { | |
1669 | return wrap_syscall<int>(::link(oldpath.c_str(), newpath.c_str())); | |
1670 | }).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) { | |
1671 | sr.throw_fs_exception_if_error("link failed", oldpath, newpath); | |
1672 | return make_ready_future<>(); | |
1673 | }); | |
11fdf7f2 TL |
1674 | }); |
1675 | } | |
1676 | ||
9f95a23c | 1677 | future<> |
f67539c2 | 1678 | reactor::chmod(std::string_view name, file_permissions permissions) noexcept { |
9f95a23c | 1679 | auto mode = static_cast<mode_t>(permissions); |
f67539c2 TL |
1680 | // Allocating memory for a sstring can throw, hence the futurize_invoke |
1681 | return futurize_invoke([name, mode, this] { | |
1682 | return _thread_pool->submit<syscall_result<int>>([name = sstring(name), mode] { | |
1683 | return wrap_syscall<int>(::chmod(name.c_str(), mode)); | |
1684 | }).then([name = sstring(name), mode] (syscall_result<int> sr) { | |
1685 | if (sr.result == -1) { | |
1686 | auto reason = format("chmod(0{:o}) failed", mode); | |
1687 | sr.throw_fs_exception(reason, fs::path(name)); | |
1688 | } | |
1689 | return make_ready_future<>(); | |
1690 | }); | |
9f95a23c TL |
1691 | }); |
1692 | } | |
1693 | ||
11fdf7f2 TL |
1694 | directory_entry_type stat_to_entry_type(__mode_t type) { |
1695 | if (S_ISDIR(type)) { | |
1696 | return directory_entry_type::directory; | |
1697 | } | |
1698 | if (S_ISBLK(type)) { | |
1699 | return directory_entry_type::block_device; | |
1700 | } | |
1701 | if (S_ISCHR(type)) { | |
9f95a23c | 1702 | return directory_entry_type::char_device; |
11fdf7f2 TL |
1703 | } |
1704 | if (S_ISFIFO(type)) { | |
1705 | return directory_entry_type::fifo; | |
1706 | } | |
1707 | if (S_ISLNK(type)) { | |
1708 | return directory_entry_type::link; | |
1709 | } | |
9f95a23c TL |
1710 | if (S_ISSOCK(type)) { |
1711 | return directory_entry_type::socket; | |
1712 | } | |
1713 | if (S_ISREG(type)) { | |
1714 | return directory_entry_type::regular; | |
1715 | } | |
1716 | return directory_entry_type::unknown; | |
11fdf7f2 TL |
1717 | } |
1718 | ||
f67539c2 TL |
1719 | future<std::optional<directory_entry_type>> |
1720 | reactor::file_type(std::string_view name, follow_symlink follow) noexcept { | |
1721 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1722 | return futurize_invoke([name, follow, this] { | |
1723 | return _thread_pool->submit<syscall_result_extra<struct stat>>([name = sstring(name), follow] { | |
1724 | struct stat st; | |
1725 | auto stat_syscall = follow ? stat : lstat; | |
1726 | auto ret = stat_syscall(name.c_str(), &st); | |
1727 | return wrap_syscall(ret, st); | |
1728 | }).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) { | |
1729 | if (long(sr.result) == -1) { | |
1730 | if (sr.error != ENOENT && sr.error != ENOTDIR) { | |
1731 | sr.throw_fs_exception_if_error("stat failed", name); | |
1732 | } | |
1733 | return make_ready_future<std::optional<directory_entry_type> > | |
1734 | (std::optional<directory_entry_type>() ); | |
11fdf7f2 | 1735 | } |
f67539c2 TL |
1736 | return make_ready_future<std::optional<directory_entry_type> > |
1737 | (std::optional<directory_entry_type>(stat_to_entry_type(sr.extra.st_mode)) ); | |
1738 | }); | |
11fdf7f2 TL |
1739 | }); |
1740 | } | |
1741 | ||
f67539c2 TL |
1742 | future<std::optional<directory_entry_type>> |
1743 | file_type(std::string_view name, follow_symlink follow) noexcept { | |
1744 | return engine().file_type(name, follow); | |
1745 | } | |
1746 | ||
9f95a23c TL |
// Converts a POSIX timespec (seconds + nanoseconds) into a
// system_clock::time_point holding the same offset from the clock's epoch,
// expressed in the clock's native duration.
static std::chrono::system_clock::time_point
timespec_to_time_point(const timespec& ts) {
    using clock = std::chrono::system_clock;
    const auto since_epoch = std::chrono::seconds(ts.tv_sec) + std::chrono::nanoseconds(ts.tv_nsec);
    return clock::time_point(std::chrono::duration_cast<clock::duration>(since_epoch));
}
1753 | ||
f67539c2 TL |
1754 | future<struct stat> |
1755 | reactor::fstat(int fd) noexcept { | |
1756 | return _thread_pool->submit<syscall_result_extra<struct stat>>([fd] { | |
11fdf7f2 | 1757 | struct stat st; |
f67539c2 | 1758 | auto ret = ::fstat(fd, &st); |
11fdf7f2 | 1759 | return wrap_syscall(ret, st); |
f67539c2 TL |
1760 | }).then([] (syscall_result_extra<struct stat> ret) { |
1761 | ret.throw_if_error(); | |
1762 | return make_ready_future<struct stat>(ret.extra); | |
1763 | }); | |
1764 | } | |
1765 | ||
1766 | future<int> | |
1767 | reactor::inotify_add_watch(int fd, std::string_view path, uint32_t flags) { | |
1768 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1769 | return futurize_invoke([path, fd, flags, this] { | |
1770 | return _thread_pool->submit<syscall_result<int>>([fd, path = sstring(path), flags] { | |
1771 | auto ret = ::inotify_add_watch(fd, path.c_str(), flags); | |
1772 | return wrap_syscall(ret); | |
1773 | }).then([] (syscall_result<int> ret) { | |
1774 | ret.throw_if_error(); | |
1775 | return make_ready_future<int>(ret.result); | |
1776 | }); | |
1777 | }); | |
1778 | } | |
1779 | ||
1780 | future<stat_data> | |
1781 | reactor::file_stat(std::string_view pathname, follow_symlink follow) noexcept { | |
1782 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1783 | return futurize_invoke([pathname, follow, this] { | |
1784 | return _thread_pool->submit<syscall_result_extra<struct stat>>([pathname = sstring(pathname), follow] { | |
1785 | struct stat st; | |
1786 | auto stat_syscall = follow ? stat : lstat; | |
1787 | auto ret = stat_syscall(pathname.c_str(), &st); | |
1788 | return wrap_syscall(ret, st); | |
1789 | }).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) { | |
1790 | sr.throw_fs_exception_if_error("stat failed", pathname); | |
1791 | struct stat& st = sr.extra; | |
1792 | stat_data sd; | |
1793 | sd.device_id = st.st_dev; | |
1794 | sd.inode_number = st.st_ino; | |
1795 | sd.mode = st.st_mode; | |
1796 | sd.type = stat_to_entry_type(st.st_mode); | |
1797 | sd.number_of_links = st.st_nlink; | |
1798 | sd.uid = st.st_uid; | |
1799 | sd.gid = st.st_gid; | |
1800 | sd.rdev = st.st_rdev; | |
1801 | sd.size = st.st_size; | |
1802 | sd.block_size = st.st_blksize; | |
1803 | sd.allocated_size = st.st_blocks * 512UL; | |
1804 | sd.time_accessed = timespec_to_time_point(st.st_atim); | |
1805 | sd.time_modified = timespec_to_time_point(st.st_mtim); | |
1806 | sd.time_changed = timespec_to_time_point(st.st_ctim); | |
1807 | return make_ready_future<stat_data>(std::move(sd)); | |
1808 | }); | |
9f95a23c TL |
1809 | }); |
1810 | } | |
1811 | ||
1812 | future<uint64_t> | |
f67539c2 | 1813 | reactor::file_size(std::string_view pathname) noexcept { |
9f95a23c TL |
1814 | return file_stat(pathname, follow_symlink::yes).then([] (stat_data sd) { |
1815 | return make_ready_future<uint64_t>(sd.size); | |
11fdf7f2 TL |
1816 | }); |
1817 | } | |
1818 | ||
1819 | future<bool> | |
f67539c2 TL |
1820 | reactor::file_accessible(std::string_view pathname, access_flags flags) noexcept { |
1821 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1822 | return futurize_invoke([pathname, flags, this] { | |
1823 | return _thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname), flags] { | |
1824 | auto aflags = std::underlying_type_t<access_flags>(flags); | |
1825 | auto ret = ::access(pathname.c_str(), aflags); | |
1826 | return wrap_syscall(ret); | |
1827 | }).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) { | |
1828 | if (sr.result < 0) { | |
1829 | if ((sr.error == ENOENT && flags == access_flags::exists) || | |
1830 | (sr.error == EACCES && flags != access_flags::exists)) { | |
1831 | return make_ready_future<bool>(false); | |
1832 | } | |
1833 | sr.throw_fs_exception("access failed", fs::path(pathname)); | |
9f95a23c | 1834 | } |
9f95a23c | 1835 | |
f67539c2 TL |
1836 | return make_ready_future<bool>(true); |
1837 | }); | |
11fdf7f2 TL |
1838 | }); |
1839 | } | |
1840 | ||
1841 | future<fs_type> | |
f67539c2 TL |
1842 | reactor::file_system_at(std::string_view pathname) noexcept { |
1843 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1844 | return futurize_invoke([pathname, this] { | |
1845 | return _thread_pool->submit<syscall_result_extra<struct statfs>>([pathname = sstring(pathname)] { | |
1846 | struct statfs st; | |
1847 | auto ret = statfs(pathname.c_str(), &st); | |
1848 | return wrap_syscall(ret, st); | |
1849 | }).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) { | |
1850 | static std::unordered_map<long int, fs_type> type_mapper = { | |
1851 | { 0x58465342, fs_type::xfs }, | |
1852 | { EXT2_SUPER_MAGIC, fs_type::ext2 }, | |
1853 | { EXT3_SUPER_MAGIC, fs_type::ext3 }, | |
1854 | { EXT4_SUPER_MAGIC, fs_type::ext4 }, | |
1855 | { BTRFS_SUPER_MAGIC, fs_type::btrfs }, | |
1856 | { 0x4244, fs_type::hfs }, | |
1857 | { TMPFS_MAGIC, fs_type::tmpfs }, | |
1858 | }; | |
1859 | sr.throw_fs_exception_if_error("statfs failed", pathname); | |
1860 | ||
1861 | fs_type ret = fs_type::other; | |
1862 | if (type_mapper.count(sr.extra.f_type) != 0) { | |
1863 | ret = type_mapper.at(sr.extra.f_type); | |
1864 | } | |
1865 | return make_ready_future<fs_type>(ret); | |
1866 | }); | |
1867 | }); | |
1868 | } | |
1869 | ||
1870 | future<struct statfs> | |
1871 | reactor::fstatfs(int fd) noexcept { | |
1872 | return _thread_pool->submit<syscall_result_extra<struct statfs>>([fd] { | |
11fdf7f2 | 1873 | struct statfs st; |
f67539c2 | 1874 | auto ret = ::fstatfs(fd, &st); |
11fdf7f2 | 1875 | return wrap_syscall(ret, st); |
f67539c2 TL |
1876 | }).then([] (syscall_result_extra<struct statfs> sr) { |
1877 | sr.throw_if_error(); | |
1878 | struct statfs st = sr.extra; | |
1879 | return make_ready_future<struct statfs>(std::move(st)); | |
11fdf7f2 TL |
1880 | }); |
1881 | } | |
1882 | ||
1883 | future<struct statvfs> | |
f67539c2 TL |
1884 | reactor::statvfs(std::string_view pathname) noexcept { |
1885 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1886 | return futurize_invoke([pathname, this] { | |
1887 | return _thread_pool->submit<syscall_result_extra<struct statvfs>>([pathname = sstring(pathname)] { | |
1888 | struct statvfs st; | |
1889 | auto ret = ::statvfs(pathname.c_str(), &st); | |
1890 | return wrap_syscall(ret, st); | |
1891 | }).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) { | |
1892 | sr.throw_fs_exception_if_error("statvfs failed", pathname); | |
1893 | struct statvfs st = sr.extra; | |
1894 | return make_ready_future<struct statvfs>(std::move(st)); | |
1895 | }); | |
11fdf7f2 TL |
1896 | }); |
1897 | } | |
1898 | ||
1899 | future<file> | |
f67539c2 TL |
1900 | reactor::open_directory(std::string_view name) noexcept { |
1901 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1902 | return futurize_invoke([name, this] { | |
1903 | auto oflags = O_DIRECTORY | O_CLOEXEC | O_RDONLY; | |
1904 | return _thread_pool->submit<syscall_result<int>>([name = sstring(name), oflags] { | |
1905 | return wrap_syscall<int>(::open(name.c_str(), oflags)); | |
1906 | }).then([name = sstring(name), oflags] (syscall_result<int> sr) { | |
1907 | sr.throw_fs_exception_if_error("open failed", name); | |
1908 | return make_file_impl(sr.result, file_open_options(), oflags); | |
1909 | }).then([] (shared_ptr<file_impl> file_impl) { | |
1910 | return make_ready_future<file>(std::move(file_impl)); | |
1911 | }); | |
11fdf7f2 TL |
1912 | }); |
1913 | } | |
1914 | ||
1915 | future<> | |
f67539c2 TL |
1916 | reactor::make_directory(std::string_view name, file_permissions permissions) noexcept { |
1917 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1918 | return futurize_invoke([name, permissions, this] { | |
1919 | return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] { | |
1920 | auto mode = static_cast<mode_t>(permissions); | |
1921 | return wrap_syscall<int>(::mkdir(name.c_str(), mode)); | |
1922 | }).then([name = sstring(name)] (syscall_result<int> sr) { | |
1923 | sr.throw_fs_exception_if_error("mkdir failed", name); | |
1924 | }); | |
11fdf7f2 TL |
1925 | }); |
1926 | } | |
1927 | ||
1928 | future<> | |
f67539c2 TL |
1929 | reactor::touch_directory(std::string_view name, file_permissions permissions) noexcept { |
1930 | // Allocating memory for a sstring can throw, hence the futurize_invoke | |
1931 | return futurize_invoke([name, permissions] { | |
1932 | return engine()._thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] { | |
1933 | auto mode = static_cast<mode_t>(permissions); | |
1934 | return wrap_syscall<int>(::mkdir(name.c_str(), mode)); | |
1935 | }).then([name = sstring(name)] (syscall_result<int> sr) { | |
1936 | if (sr.result == -1 && sr.error != EEXIST) { | |
1937 | sr.throw_fs_exception("mkdir failed", fs::path(name)); | |
1938 | } | |
1939 | return make_ready_future<>(); | |
1940 | }); | |
11fdf7f2 | 1941 | }); |
11fdf7f2 TL |
1942 | } |
1943 | ||
1944 | future<> | |
f67539c2 | 1945 | reactor::fdatasync(int fd) noexcept { |
9f95a23c TL |
1946 | ++_fsyncs; |
1947 | if (_bypass_fsync) { | |
11fdf7f2 | 1948 | return make_ready_future<>(); |
9f95a23c TL |
1949 | } |
1950 | if (_have_aio_fsync) { | |
f67539c2 TL |
1951 | // Does not go through the I/O queue, but has to be deleted |
1952 | struct fsync_io_desc final : public io_completion { | |
1953 | promise<> _pr; | |
1954 | public: | |
1955 | virtual void complete(size_t res) noexcept override { | |
1956 | _pr.set_value(); | |
1957 | delete this; | |
1958 | } | |
11fdf7f2 | 1959 | |
f67539c2 TL |
1960 | virtual void set_exception(std::exception_ptr eptr) noexcept override { |
1961 | _pr.set_exception(std::move(eptr)); | |
1962 | delete this; | |
1963 | } | |
11fdf7f2 | 1964 | |
f67539c2 TL |
1965 | future<> get_future() { |
1966 | return _pr.get_future(); | |
1967 | } | |
1968 | }; | |
11fdf7f2 | 1969 | |
f67539c2 TL |
1970 | return futurize_invoke([this, fd] { |
1971 | auto desc = new fsync_io_desc; | |
1972 | auto fut = desc->get_future(); | |
9f95a23c | 1973 | auto req = io_request::make_fdatasync(fd); |
f67539c2 | 1974 | submit_io(desc, std::move(req)); |
9f95a23c | 1975 | return fut; |
f67539c2 | 1976 | }); |
9f95a23c TL |
1977 | } |
1978 | return _thread_pool->submit<syscall_result<int>>([fd] { | |
1979 | return wrap_syscall<int>(::fdatasync(fd)); | |
1980 | }).then([] (syscall_result<int> sr) { | |
11fdf7f2 | 1981 | sr.throw_if_error(); |
9f95a23c | 1982 | return make_ready_future<>(); |
11fdf7f2 TL |
1983 | }); |
1984 | } | |
1985 | ||
f67539c2 TL |
1986 | // Note: terminate if arm_highres_timer throws |
1987 | // `when` should always be valid | |
1988 | void reactor::enable_timer(steady_clock_type::time_point when) noexcept | |
11fdf7f2 TL |
1989 | { |
1990 | #ifndef HAVE_OSV | |
1991 | itimerspec its; | |
1992 | its.it_interval = {}; | |
1993 | its.it_value = to_timespec(when); | |
1994 | _backend->arm_highres_timer(its); | |
1995 | #else | |
1996 | using ns = std::chrono::nanoseconds; | |
1997 | WITH_LOCK(_timer_mutex) { | |
1998 | _timer_due = std::chrono::duration_cast<ns>(when.time_since_epoch()).count(); | |
1999 | _timer_cond.wake_one(); | |
2000 | } | |
2001 | #endif | |
2002 | } | |
2003 | ||
f67539c2 | 2004 | void reactor::add_timer(timer<steady_clock_type>* tmr) noexcept { |
11fdf7f2 TL |
2005 | if (queue_timer(tmr)) { |
2006 | enable_timer(_timers.get_next_timeout()); | |
2007 | } | |
2008 | } | |
2009 | ||
f67539c2 | 2010 | bool reactor::queue_timer(timer<steady_clock_type>* tmr) noexcept { |
11fdf7f2 TL |
2011 | return _timers.insert(*tmr); |
2012 | } | |
2013 | ||
f67539c2 | 2014 | void reactor::del_timer(timer<steady_clock_type>* tmr) noexcept { |
11fdf7f2 TL |
2015 | if (tmr->_expired) { |
2016 | _expired_timers.erase(_expired_timers.iterator_to(*tmr)); | |
2017 | tmr->_expired = false; | |
2018 | } else { | |
2019 | _timers.remove(*tmr); | |
2020 | } | |
2021 | } | |
2022 | ||
f67539c2 | 2023 | void reactor::add_timer(timer<lowres_clock>* tmr) noexcept { |
11fdf7f2 TL |
2024 | if (queue_timer(tmr)) { |
2025 | _lowres_next_timeout = _lowres_timers.get_next_timeout(); | |
2026 | } | |
2027 | } | |
2028 | ||
f67539c2 | 2029 | bool reactor::queue_timer(timer<lowres_clock>* tmr) noexcept { |
11fdf7f2 TL |
2030 | return _lowres_timers.insert(*tmr); |
2031 | } | |
2032 | ||
f67539c2 | 2033 | void reactor::del_timer(timer<lowres_clock>* tmr) noexcept { |
11fdf7f2 TL |
2034 | if (tmr->_expired) { |
2035 | _expired_lowres_timers.erase(_expired_lowres_timers.iterator_to(*tmr)); | |
2036 | tmr->_expired = false; | |
2037 | } else { | |
2038 | _lowres_timers.remove(*tmr); | |
2039 | } | |
2040 | } | |
2041 | ||
f67539c2 | 2042 | void reactor::add_timer(timer<manual_clock>* tmr) noexcept { |
11fdf7f2 TL |
2043 | queue_timer(tmr); |
2044 | } | |
2045 | ||
f67539c2 | 2046 | bool reactor::queue_timer(timer<manual_clock>* tmr) noexcept { |
11fdf7f2 TL |
2047 | return _manual_timers.insert(*tmr); |
2048 | } | |
2049 | ||
f67539c2 | 2050 | void reactor::del_timer(timer<manual_clock>* tmr) noexcept { |
11fdf7f2 TL |
2051 | if (tmr->_expired) { |
2052 | _expired_manual_timers.erase(_expired_manual_timers.iterator_to(*tmr)); | |
2053 | tmr->_expired = false; | |
2054 | } else { | |
2055 | _manual_timers.remove(*tmr); | |
2056 | } | |
2057 | } | |
2058 | ||
9f95a23c | 2059 | void reactor::at_exit(noncopyable_function<future<> ()> func) { |
11fdf7f2 TL |
2060 | assert(!_stopping); |
2061 | _exit_funcs.push_back(std::move(func)); | |
2062 | } | |
2063 | ||
2064 | future<> reactor::run_exit_tasks() { | |
2065 | _stop_requested.broadcast(); | |
2066 | _stopping = true; | |
2067 | stop_aio_eventfd_loop(); | |
2068 | return do_for_each(_exit_funcs.rbegin(), _exit_funcs.rend(), [] (auto& func) { | |
2069 | return func(); | |
2070 | }); | |
2071 | } | |
2072 | ||
2073 | void reactor::stop() { | |
f67539c2 | 2074 | assert(_id == 0); |
11fdf7f2 TL |
2075 | smp::cleanup_cpu(); |
2076 | if (!_stopping) { | |
9f95a23c TL |
2077 | // Run exit tasks locally and then stop all other engines |
2078 | // in the background and wait on semaphore for all to complete. | |
2079 | // Finally, set _stopped on cpu 0. | |
2080 | (void)run_exit_tasks().then([this] { | |
2081 | return do_with(semaphore(0), [this] (semaphore& sem) { | |
2082 | // Stop other cpus asynchronously, signal when done. | |
2083 | (void)smp::invoke_on_others(0, [] { | |
2084 | smp::cleanup_cpu(); | |
2085 | return engine().run_exit_tasks().then([] { | |
2086 | engine()._stopped = true; | |
11fdf7f2 | 2087 | }); |
9f95a23c TL |
2088 | }).then([&sem]() { |
2089 | sem.signal(); | |
2090 | }); | |
2091 | return sem.wait().then([this] { | |
11fdf7f2 TL |
2092 | _stopped = true; |
2093 | }); | |
2094 | }); | |
2095 | }); | |
2096 | } | |
2097 | } | |
2098 | ||
2099 | void reactor::exit(int ret) { | |
9f95a23c TL |
2100 | // Run stop() asynchronously on cpu 0. |
2101 | (void)smp::submit_to(0, [this, ret] { _return = ret; stop(); }); | |
11fdf7f2 TL |
2102 | } |
2103 | ||
2104 | uint64_t | |
2105 | reactor::pending_task_count() const { | |
2106 | uint64_t ret = 0; | |
2107 | for (auto&& tq : _task_queues) { | |
2108 | ret += tq->_q.size(); | |
2109 | } | |
2110 | return ret; | |
2111 | } | |
2112 | ||
2113 | uint64_t | |
2114 | reactor::tasks_processed() const { | |
9f95a23c | 2115 | return _global_tasks_processed; |
11fdf7f2 TL |
2116 | } |
2117 | ||
2118 | void reactor::register_metrics() { | |
2119 | ||
2120 | namespace sm = seastar::metrics; | |
2121 | ||
2122 | _metric_groups.add_group("reactor", { | |
2123 | sm::make_gauge("tasks_pending", std::bind(&reactor::pending_task_count, this), sm::description("Number of pending tasks in the queue")), | |
2124 | // total_operations value:DERIVE:0:U | |
2125 | sm::make_derive("tasks_processed", std::bind(&reactor::tasks_processed, this), sm::description("Total tasks processed")), | |
9f95a23c | 2126 | sm::make_derive("polls", _polls, sm::description("Number of times pollers were executed")), |
11fdf7f2 TL |
2127 | sm::make_derive("timers_pending", std::bind(&decltype(_timers)::size, &_timers), sm::description("Number of tasks in the timer-pending queue")), |
2128 | sm::make_gauge("utilization", [this] { return (1-_load) * 100; }, sm::description("CPU utilization")), | |
2129 | sm::make_derive("cpu_busy_ms", [this] () -> int64_t { return total_busy_time() / 1ms; }, | |
2130 | sm::description("Total cpu busy time in milliseconds")), | |
2131 | sm::make_derive("cpu_steal_time_ms", [this] () -> int64_t { return total_steal_time() / 1ms; }, | |
2132 | sm::description("Total steal time, the time in which some other process was running while Seastar was not trying to run (not sleeping)." | |
2133 | "Because this is in userspace, some time that could be legitimally thought as steal time is not accounted as such. For example, if we are sleeping and can wake up but the kernel hasn't woken us up yet.")), | |
2134 | // total_operations value:DERIVE:0:U | |
2135 | sm::make_derive("aio_reads", _io_stats.aio_reads, sm::description("Total aio-reads operations")), | |
2136 | ||
2137 | sm::make_total_bytes("aio_bytes_read", _io_stats.aio_read_bytes, sm::description("Total aio-reads bytes")), | |
2138 | // total_operations value:DERIVE:0:U | |
2139 | sm::make_derive("aio_writes", _io_stats.aio_writes, sm::description("Total aio-writes operations")), | |
2140 | sm::make_total_bytes("aio_bytes_write", _io_stats.aio_write_bytes, sm::description("Total aio-writes bytes")), | |
2141 | sm::make_derive("aio_errors", _io_stats.aio_errors, sm::description("Total aio errors")), | |
2142 | // total_operations value:DERIVE:0:U | |
2143 | sm::make_derive("fsyncs", _fsyncs, sm::description("Total number of fsync operations")), | |
2144 | // total_operations value:DERIVE:0:U | |
2145 | sm::make_derive("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()), | |
2146 | sm::description("Total number of io-threaded-fallbacks operations")), | |
2147 | ||
2148 | }); | |
2149 | ||
2150 | _metric_groups.add_group("memory", { | |
2151 | sm::make_derive("malloc_operations", [] { return memory::stats().mallocs(); }, | |
2152 | sm::description("Total number of malloc operations")), | |
2153 | sm::make_derive("free_operations", [] { return memory::stats().frees(); }, sm::description("Total number of free operations")), | |
2154 | sm::make_derive("cross_cpu_free_operations", [] { return memory::stats().cross_cpu_frees(); }, sm::description("Total number of cross cpu free")), | |
2155 | sm::make_gauge("malloc_live_objects", [] { return memory::stats().live_objects(); }, sm::description("Number of live objects")), | |
2156 | sm::make_current_bytes("free_memory", [] { return memory::stats().free_memory(); }, sm::description("Free memeory size in bytes")), | |
2157 | sm::make_current_bytes("total_memory", [] { return memory::stats().total_memory(); }, sm::description("Total memeory size in bytes")), | |
2158 | sm::make_current_bytes("allocated_memory", [] { return memory::stats().allocated_memory(); }, sm::description("Allocated memeory size in bytes")), | |
2159 | sm::make_derive("reclaims_operations", [] { return memory::stats().reclaims(); }, sm::description("Total reclaims operations")) | |
2160 | }); | |
2161 | ||
2162 | _metric_groups.add_group("reactor", { | |
2163 | sm::make_derive("logging_failures", [] { return logging_failures; }, sm::description("Total number of logging failures")), | |
2164 | // total_operations value:DERIVE:0:U | |
2165 | sm::make_derive("cpp_exceptions", _cxx_exceptions, sm::description("Total number of C++ exceptions")), | |
9f95a23c | 2166 | sm::make_derive("abandoned_failed_futures", _abandoned_failed_futures, sm::description("Total number of abandoned failed futures, futures destroyed while still containing an exception")), |
11fdf7f2 TL |
2167 | }); |
2168 | ||
11fdf7f2 TL |
2169 | using namespace seastar::metrics; |
2170 | _metric_groups.add_group("reactor", { | |
2171 | make_counter("fstream_reads", _io_stats.fstream_reads, | |
2172 | description( | |
2173 | "Counts reads from disk file streams. A high rate indicates high disk activity." | |
2174 | " Contrast with other fstream_read* counters to locate bottlenecks.")), | |
2175 | make_derive("fstream_read_bytes", _io_stats.fstream_read_bytes, | |
2176 | description( | |
2177 | "Counts bytes read from disk file streams. A high rate indicates high disk activity." | |
2178 | " Divide by fstream_reads to determine average read size.")), | |
2179 | make_counter("fstream_reads_blocked", _io_stats.fstream_reads_blocked, | |
2180 | description( | |
2181 | "Counts the number of times a disk read could not be satisfied from read-ahead buffers, and had to block." | |
2182 | " Indicates short streams, or incorrect read ahead configuration.")), | |
2183 | make_derive("fstream_read_bytes_blocked", _io_stats.fstream_read_bytes_blocked, | |
2184 | description( | |
2185 | "Counts the number of bytes read from disk that could not be satisfied from read-ahead buffers, and had to block." | |
2186 | " Indicates short streams, or incorrect read ahead configuration.")), | |
2187 | make_counter("fstream_reads_aheads_discarded", _io_stats.fstream_read_aheads_discarded, | |
2188 | description( | |
2189 | "Counts the number of times a buffer that was read ahead of time and was discarded because it was not needed, wasting disk bandwidth." | |
2190 | " Indicates over-eager read ahead configuration.")), | |
2191 | make_derive("fstream_reads_ahead_bytes_discarded", _io_stats.fstream_read_ahead_discarded_bytes, | |
2192 | description( | |
2193 | "Counts the number of buffered bytes that were read ahead of time and were discarded because they were not needed, wasting disk bandwidth." | |
2194 | " Indicates over-eager read ahead configuration.")), | |
2195 | }); | |
2196 | } | |
2197 | ||
2198 | void reactor::run_tasks(task_queue& tq) { | |
2199 | // Make sure new tasks will inherit our scheduling group | |
2200 | *internal::current_scheduling_group_ptr() = scheduling_group(tq._id); | |
2201 | auto& tasks = tq._q; | |
2202 | while (!tasks.empty()) { | |
9f95a23c | 2203 | auto tsk = tasks.front(); |
11fdf7f2 TL |
2204 | tasks.pop_front(); |
2205 | STAP_PROBE(seastar, reactor_run_tasks_single_start); | |
2206 | task_histogram_add_task(*tsk); | |
f67539c2 | 2207 | _current_task = tsk; |
11fdf7f2 | 2208 | tsk->run_and_dispose(); |
f67539c2 | 2209 | _current_task = nullptr; |
11fdf7f2 TL |
2210 | STAP_PROBE(seastar, reactor_run_tasks_single_end); |
2211 | ++tq._tasks_processed; | |
9f95a23c | 2212 | ++_global_tasks_processed; |
11fdf7f2 TL |
2213 | // check at end of loop, to allow at least one task to run |
2214 | if (need_preempt()) { | |
2215 | if (tasks.size() <= _max_task_backlog) { | |
2216 | break; | |
2217 | } else { | |
2218 | // While need_preempt() is set, task execution is inefficient due to | |
2219 | // need_preempt() checks breaking out of loops and .then() calls. See | |
2220 | // #302. | |
2221 | reset_preemption_monitor(); | |
2222 | } | |
2223 | } | |
2224 | } | |
2225 | } | |
2226 | ||
2227 | #ifdef SEASTAR_SHUFFLE_TASK_QUEUE | |
9f95a23c | 2228 | void reactor::shuffle(task*& t, task_queue& q) { |
11fdf7f2 TL |
2229 | static thread_local std::mt19937 gen = std::mt19937(std::default_random_engine()()); |
2230 | std::uniform_int_distribution<size_t> tasks_dist{0, q._q.size() - 1}; | |
2231 | auto& to_swap = q._q[tasks_dist(gen)]; | |
2232 | std::swap(to_swap, t); | |
2233 | } | |
2234 | #endif | |
2235 | ||
2236 | void reactor::force_poll() { | |
2237 | request_preemption(); | |
2238 | } | |
2239 | ||
2240 | bool | |
2241 | reactor::flush_tcp_batches() { | |
2242 | bool work = _flush_batching.size(); | |
2243 | while (!_flush_batching.empty()) { | |
2244 | auto os = std::move(_flush_batching.front()); | |
2245 | _flush_batching.pop_front(); | |
2246 | os->poll_flush(); | |
2247 | } | |
2248 | return work; | |
2249 | } | |
2250 | ||
2251 | bool | |
f67539c2 | 2252 | reactor::do_expire_lowres_timers() noexcept { |
11fdf7f2 TL |
2253 | if (_lowres_next_timeout == lowres_clock::time_point()) { |
2254 | return false; | |
2255 | } | |
2256 | auto now = lowres_clock::now(); | |
2257 | if (now >= _lowres_next_timeout) { | |
f67539c2 | 2258 | complete_timers(_lowres_timers, _expired_lowres_timers, [this] () noexcept { |
11fdf7f2 TL |
2259 | if (!_lowres_timers.empty()) { |
2260 | _lowres_next_timeout = _lowres_timers.get_next_timeout(); | |
2261 | } else { | |
2262 | _lowres_next_timeout = lowres_clock::time_point(); | |
2263 | } | |
2264 | }); | |
2265 | return true; | |
2266 | } | |
2267 | return false; | |
2268 | } | |
2269 | ||
2270 | void | |
f67539c2 TL |
2271 | reactor::expire_manual_timers() noexcept { |
2272 | complete_timers(_manual_timers, _expired_manual_timers, [] () noexcept {}); | |
11fdf7f2 TL |
2273 | } |
2274 | ||
2275 | void | |
f67539c2 | 2276 | manual_clock::expire_timers() noexcept { |
11fdf7f2 TL |
2277 | local_engine->expire_manual_timers(); |
2278 | } | |
2279 | ||
2280 | void | |
f67539c2 | 2281 | manual_clock::advance(manual_clock::duration d) noexcept { |
11fdf7f2 TL |
2282 | _now.fetch_add(d.count()); |
2283 | if (local_engine) { | |
2284 | schedule_urgent(make_task(default_scheduling_group(), &manual_clock::expire_timers)); | |
9f95a23c TL |
2285 | // Expire timers on all cores in the background. |
2286 | (void)smp::invoke_on_all(&manual_clock::expire_timers); | |
11fdf7f2 TL |
2287 | } |
2288 | } | |
2289 | ||
2290 | bool | |
f67539c2 | 2291 | reactor::do_check_lowres_timers() const noexcept { |
11fdf7f2 TL |
2292 | if (_lowres_next_timeout == lowres_clock::time_point()) { |
2293 | return false; | |
2294 | } | |
2295 | return lowres_clock::now() > _lowres_next_timeout; | |
2296 | } | |
2297 | ||
2298 | #ifndef HAVE_OSV | |
2299 | ||
f67539c2 | 2300 | class reactor::kernel_submit_work_pollfn final : public simple_pollfn<true> { |
11fdf7f2 TL |
2301 | reactor& _r; |
2302 | public: | |
9f95a23c | 2303 | kernel_submit_work_pollfn(reactor& r) : _r(r) {} |
11fdf7f2 | 2304 | virtual bool poll() override final { |
9f95a23c | 2305 | return _r._backend->kernel_submit_work(); |
11fdf7f2 | 2306 | } |
11fdf7f2 TL |
2307 | }; |
2308 | ||
2309 | #endif | |
2310 | ||
2311 | class reactor::signal_pollfn final : public reactor::pollfn { | |
2312 | reactor& _r; | |
2313 | public: | |
2314 | signal_pollfn(reactor& r) : _r(r) {} | |
2315 | virtual bool poll() final override { | |
2316 | return _r._signals.poll_signal(); | |
2317 | } | |
2318 | virtual bool pure_poll() override final { | |
2319 | return _r._signals.pure_poll_signal(); | |
2320 | } | |
2321 | virtual bool try_enter_interrupt_mode() override { | |
2322 | // Signals will interrupt our epoll_pwait() call, but | |
2323 | // disable them now to avoid a signal between this point | |
2324 | // and epoll_pwait() | |
2325 | sigset_t block_all; | |
2326 | sigfillset(&block_all); | |
2327 | ::pthread_sigmask(SIG_SETMASK, &block_all, &_r._active_sigmask); | |
2328 | if (poll()) { | |
2329 | // raced already, and lost | |
2330 | exit_interrupt_mode(); | |
2331 | return false; | |
2332 | } | |
2333 | return true; | |
2334 | } | |
2335 | virtual void exit_interrupt_mode() override final { | |
2336 | ::pthread_sigmask(SIG_SETMASK, &_r._active_sigmask, nullptr); | |
2337 | } | |
2338 | }; | |
2339 | ||
f67539c2 | 2340 | class reactor::batch_flush_pollfn final : public simple_pollfn<true> { |
11fdf7f2 TL |
2341 | reactor& _r; |
2342 | public: | |
2343 | batch_flush_pollfn(reactor& r) : _r(r) {} | |
2344 | virtual bool poll() final override { | |
2345 | return _r.flush_tcp_batches(); | |
2346 | } | |
11fdf7f2 TL |
2347 | }; |
2348 | ||
9f95a23c TL |
2349 | class reactor::reap_kernel_completions_pollfn final : public reactor::pollfn { |
2350 | reactor& _r; | |
2351 | public: | |
2352 | reap_kernel_completions_pollfn(reactor& r) : _r(r) {} | |
2353 | virtual bool poll() final override { | |
f67539c2 | 2354 | return _r.reap_kernel_completions(); |
9f95a23c TL |
2355 | } |
2356 | virtual bool pure_poll() override final { | |
2357 | return poll(); // actually performs work, but triggers no user continuations, so okay | |
2358 | } | |
2359 | virtual bool try_enter_interrupt_mode() override { | |
2360 | return _r._backend->kernel_events_can_sleep(); | |
2361 | } | |
2362 | virtual void exit_interrupt_mode() override final { | |
2363 | } | |
2364 | }; | |
2365 | ||
f67539c2 | 2366 | class reactor::io_queue_submission_pollfn final : public simple_pollfn<true> { |
11fdf7f2 TL |
2367 | reactor& _r; |
2368 | public: | |
9f95a23c | 2369 | io_queue_submission_pollfn(reactor& r) : _r(r) {} |
11fdf7f2 TL |
2370 | virtual bool poll() final override { |
2371 | return _r.flush_pending_aio(); | |
2372 | } | |
11fdf7f2 TL |
2373 | }; |
2374 | ||
f67539c2 TL |
2375 | // Other cpus can queue items for us to free; and they won't notify |
2376 | // us about them. But it's okay to ignore those items, freeing them | |
2377 | // doesn't have any side effects. | |
2378 | // | |
2379 | // We'll take care of those items when we wake up for another reason. | |
2380 | class reactor::drain_cross_cpu_freelist_pollfn final : public simple_pollfn<true> { | |
11fdf7f2 TL |
2381 | public: |
2382 | virtual bool poll() final override { | |
2383 | return memory::drain_cross_cpu_freelist(); | |
2384 | } | |
11fdf7f2 TL |
2385 | }; |
2386 | ||
2387 | class reactor::lowres_timer_pollfn final : public reactor::pollfn { | |
2388 | reactor& _r; | |
2389 | // A highres timer is implemented as a waking signal; so | |
2390 | // we arm one when we have a lowres timer during sleep, so | |
2391 | // it can wake us up. | |
2392 | timer<> _nearest_wakeup { [this] { _armed = false; } }; | |
2393 | bool _armed = false; | |
2394 | public: | |
2395 | lowres_timer_pollfn(reactor& r) : _r(r) {} | |
2396 | virtual bool poll() final override { | |
2397 | return _r.do_expire_lowres_timers(); | |
2398 | } | |
2399 | virtual bool pure_poll() final override { | |
2400 | return _r.do_check_lowres_timers(); | |
2401 | } | |
2402 | virtual bool try_enter_interrupt_mode() override { | |
2403 | // arm our highres timer so a signal will wake us up | |
2404 | auto next = _r._lowres_next_timeout; | |
2405 | if (next == lowres_clock::time_point()) { | |
2406 | // no pending timers | |
2407 | return true; | |
2408 | } | |
2409 | auto now = lowres_clock::now(); | |
2410 | if (next <= now) { | |
2411 | // whoops, go back | |
2412 | return false; | |
2413 | } | |
2414 | _nearest_wakeup.arm(next - now); | |
2415 | _armed = true; | |
2416 | return true; | |
2417 | } | |
2418 | virtual void exit_interrupt_mode() override final { | |
2419 | if (_armed) { | |
2420 | _nearest_wakeup.cancel(); | |
2421 | _armed = false; | |
2422 | } | |
2423 | } | |
2424 | }; | |
2425 | ||
2426 | class reactor::smp_pollfn final : public reactor::pollfn { | |
2427 | reactor& _r; | |
2428 | public: | |
2429 | smp_pollfn(reactor& r) : _r(r) {} | |
2430 | virtual bool poll() final override { | |
2431 | return (smp::poll_queues() | | |
2432 | alien::smp::poll_queues()); | |
2433 | } | |
2434 | virtual bool pure_poll() final override { | |
2435 | return (smp::pure_poll_queues() || | |
2436 | alien::smp::pure_poll_queues()); | |
2437 | } | |
2438 | virtual bool try_enter_interrupt_mode() override { | |
2439 | // systemwide_memory_barrier() is very slow if run concurrently, | |
2440 | // so don't go to sleep if it is running now. | |
2441 | _r._sleeping.store(true, std::memory_order_relaxed); | |
2442 | bool barrier_done = try_systemwide_memory_barrier(); | |
2443 | if (!barrier_done) { | |
2444 | _r._sleeping.store(false, std::memory_order_relaxed); | |
2445 | return false; | |
2446 | } | |
2447 | if (poll()) { | |
2448 | // raced | |
2449 | _r._sleeping.store(false, std::memory_order_relaxed); | |
2450 | return false; | |
2451 | } | |
2452 | return true; | |
2453 | } | |
2454 | virtual void exit_interrupt_mode() override final { | |
2455 | _r._sleeping.store(false, std::memory_order_relaxed); | |
2456 | } | |
2457 | }; | |
2458 | ||
2459 | class reactor::execution_stage_pollfn final : public reactor::pollfn { | |
2460 | internal::execution_stage_manager& _esm; | |
2461 | public: | |
2462 | execution_stage_pollfn() : _esm(internal::execution_stage_manager::get()) { } | |
2463 | ||
2464 | virtual bool poll() override { | |
2465 | return _esm.flush(); | |
2466 | } | |
2467 | virtual bool pure_poll() override { | |
2468 | return _esm.poll(); | |
2469 | } | |
2470 | virtual bool try_enter_interrupt_mode() override { | |
2471 | // This is a passive poller, so if a previous poll | |
2472 | // returned false (idle), there's no more work to do. | |
2473 | return true; | |
2474 | } | |
2475 | virtual void exit_interrupt_mode() override { } | |
2476 | }; | |
2477 | ||
2478 | class reactor::syscall_pollfn final : public reactor::pollfn { | |
2479 | reactor& _r; | |
2480 | public: | |
2481 | syscall_pollfn(reactor& r) : _r(r) {} | |
2482 | virtual bool poll() final override { | |
2483 | return _r._thread_pool->complete(); | |
2484 | } | |
2485 | virtual bool pure_poll() override final { | |
2486 | return poll(); // actually performs work, but triggers no user continuations, so okay | |
2487 | } | |
2488 | virtual bool try_enter_interrupt_mode() override { | |
2489 | _r._thread_pool->enter_interrupt_mode(); | |
2490 | if (poll()) { | |
2491 | // raced | |
2492 | _r._thread_pool->exit_interrupt_mode(); | |
2493 | return false; | |
2494 | } | |
2495 | return true; | |
2496 | } | |
2497 | virtual void exit_interrupt_mode() override final { | |
2498 | _r._thread_pool->exit_interrupt_mode(); | |
2499 | } | |
2500 | }; | |
2501 | ||
11fdf7f2 TL |
2502 | void |
2503 | reactor::wakeup() { | |
2504 | uint64_t one = 1; | |
2505 | ::write(_notify_eventfd.get(), &one, sizeof(one)); | |
2506 | } | |
2507 | ||
2508 | void reactor::start_aio_eventfd_loop() { | |
2509 | if (!_aio_eventfd) { | |
2510 | return; | |
2511 | } | |
2512 | future<> loop_done = repeat([this] { | |
2513 | return _aio_eventfd->readable().then([this] { | |
2514 | char garbage[8]; | |
2515 | ::read(_aio_eventfd->get_fd(), garbage, 8); // totally uninteresting | |
2516 | return _stopping ? stop_iteration::yes : stop_iteration::no; | |
2517 | }); | |
2518 | }); | |
2519 | // must use make_lw_shared, because at_exit expects a copyable function | |
2520 | at_exit([loop_done = make_lw_shared(std::move(loop_done))] { | |
2521 | return std::move(*loop_done); | |
2522 | }); | |
2523 | } | |
2524 | ||
2525 | void reactor::stop_aio_eventfd_loop() { | |
2526 | if (!_aio_eventfd) { | |
2527 | return; | |
2528 | } | |
2529 | uint64_t one = 1; | |
2530 | ::write(_aio_eventfd->get_fd(), &one, 8); | |
2531 | } | |
2532 | ||
2533 | inline | |
2534 | bool | |
2535 | reactor::have_more_tasks() const { | |
2536 | return _active_task_queues.size() + _activating_task_queues.size(); | |
2537 | } | |
2538 | ||
2539 | void reactor::insert_active_task_queue(task_queue* tq) { | |
2540 | tq->_active = true; | |
2541 | auto& atq = _active_task_queues; | |
2542 | auto less = task_queue::indirect_compare(); | |
2543 | if (atq.empty() || less(atq.back(), tq)) { | |
2544 | // Common case: idle->working | |
2545 | // Common case: CPU intensive task queue going to the back | |
2546 | atq.push_back(tq); | |
2547 | } else { | |
2548 | // Common case: newly activated queue preempting everything else | |
2549 | atq.push_front(tq); | |
2550 | // Less common case: newly activated queue behind something already active | |
2551 | size_t i = 0; | |
2552 | while (i + 1 != atq.size() && !less(atq[i], atq[i+1])) { | |
2553 | std::swap(atq[i], atq[i+1]); | |
2554 | ++i; | |
2555 | } | |
2556 | } | |
2557 | } | |
2558 | ||
f67539c2 TL |
2559 | reactor::task_queue* reactor::pop_active_task_queue(sched_clock::time_point now) { |
2560 | task_queue* tq = _active_task_queues.front(); | |
2561 | _active_task_queues.pop_front(); | |
2562 | tq->_starvetime += now - tq->_ts; | |
2563 | return tq; | |
2564 | } | |
2565 | ||
11fdf7f2 TL |
2566 | void |
2567 | reactor::insert_activating_task_queues() { | |
2568 | // Quadratic, but since we expect the common cases in insert_active_task_queue() to dominate, faster | |
2569 | for (auto&& tq : _activating_task_queues) { | |
2570 | insert_active_task_queue(tq); | |
2571 | } | |
2572 | _activating_task_queues.clear(); | |
2573 | } | |
2574 | ||
2575 | void | |
2576 | reactor::run_some_tasks() { | |
2577 | if (!have_more_tasks()) { | |
2578 | return; | |
2579 | } | |
2580 | sched_print("run_some_tasks: start"); | |
2581 | reset_preemption_monitor(); | |
2582 | ||
2583 | sched_clock::time_point t_run_completed = std::chrono::steady_clock::now(); | |
2584 | STAP_PROBE(seastar, reactor_run_tasks_start); | |
2585 | _cpu_stall_detector->start_task_run(t_run_completed); | |
2586 | do { | |
2587 | auto t_run_started = t_run_completed; | |
2588 | insert_activating_task_queues(); | |
f67539c2 | 2589 | task_queue* tq = pop_active_task_queue(t_run_started); |
11fdf7f2 TL |
2590 | sched_print("running tq {} {}", (void*)tq, tq->_name); |
2591 | tq->_current = true; | |
2592 | _last_vruntime = std::max(tq->_vruntime, _last_vruntime); | |
2593 | run_tasks(*tq); | |
2594 | tq->_current = false; | |
2595 | t_run_completed = std::chrono::steady_clock::now(); | |
2596 | auto delta = t_run_completed - t_run_started; | |
2597 | account_runtime(*tq, delta); | |
2598 | sched_print("run complete ({} {}); time consumed {} usec; final vruntime {} empty {}", | |
2599 | (void*)tq, tq->_name, delta / 1us, tq->_vruntime, tq->_q.empty()); | |
f67539c2 | 2600 | tq->_ts = t_run_completed; |
11fdf7f2 TL |
2601 | if (!tq->_q.empty()) { |
2602 | insert_active_task_queue(tq); | |
2603 | } else { | |
2604 | tq->_active = false; | |
2605 | } | |
2606 | } while (have_more_tasks() && !need_preempt()); | |
2607 | _cpu_stall_detector->end_task_run(t_run_completed); | |
2608 | STAP_PROBE(seastar, reactor_run_tasks_end); | |
2609 | *internal::current_scheduling_group_ptr() = default_scheduling_group(); // Prevent inheritance from last group run | |
2610 | sched_print("run_some_tasks: end"); | |
2611 | } | |
2612 | ||
2613 | void | |
2614 | reactor::activate(task_queue& tq) { | |
2615 | if (tq._active) { | |
2616 | return; | |
2617 | } | |
2618 | sched_print("activating {} {}", (void*)&tq, tq._name); | |
2619 | // If activate() was called, the task queue is likely network-bound or I/O bound, not CPU-bound. As | |
2620 | // such its vruntime will be low, and it will have a large advantage over other task queues. Limit | |
2621 | // the advantage so it doesn't dominate scheduling for a long time, in case it _does_ become CPU | |
2622 | // bound later. | |
2623 | // | |
2624 | // FIXME: different scheduling groups have different sensitivity to jitter, take advantage | |
2625 | if (_last_vruntime > tq._vruntime) { | |
2626 | sched_print("tq {} {} losing vruntime {} due to sleep", (void*)&tq, tq._name, _last_vruntime - tq._vruntime); | |
2627 | } | |
2628 | tq._vruntime = std::max(_last_vruntime, tq._vruntime); | |
f67539c2 TL |
2629 | auto now = std::chrono::steady_clock::now(); |
2630 | tq._waittime += now - tq._ts; | |
2631 | tq._ts = now; | |
11fdf7f2 TL |
2632 | _activating_task_queues.push_back(&tq); |
2633 | } | |
2634 | ||
f67539c2 TL |
2635 | void reactor::service_highres_timer() noexcept { |
2636 | complete_timers(_timers, _expired_timers, [this] () noexcept { | |
11fdf7f2 TL |
2637 | if (!_timers.empty()) { |
2638 | enable_timer(_timers.get_next_timeout()); | |
2639 | } | |
2640 | }); | |
2641 | } | |
2642 | ||
2643 | int reactor::run() { | |
9f95a23c TL |
2644 | #ifndef SEASTAR_ASAN_ENABLED |
2645 | // SIGSTKSZ is too small when using asan. We also don't need to | |
2646 | // handle SIGSEGV ourselves when using asan, so just don't install | |
2647 | // a signal handler stack. | |
11fdf7f2 | 2648 | auto signal_stack = install_signal_handler_stack(); |
9f95a23c TL |
2649 | #else |
2650 | (void)install_signal_handler_stack; | |
2651 | #endif | |
11fdf7f2 TL |
2652 | |
2653 | register_metrics(); | |
2654 | ||
9f95a23c TL |
2655 | // The order in which we execute the pollers is very important for performance. |
2656 | // | |
2657 | // This is because events that are generated in one poller may feed work into others. If | |
2658 | // they were reversed, we'd only be able to do that work in the next task quota. | |
2659 | // | |
2660 | // One example is the relationship between the smp poller and the I/O submission poller: | |
2661 | // If the smp poller runs first, requests from remote I/O queues can be dispatched right away | |
2662 | // | |
2663 | // We will run the pollers in the following order: | |
2664 | // | |
2665 | // 1. SMP: any remote event arrives before anything else | |
2666 | // 2. reap kernel events completion: storage related completions may free up space in the I/O | |
2667 | // queue. | |
2668 | // 4. I/O queue: must be after reap, to free up events. If new slots are freed may submit I/O | |
2669 | // 5. kernel submission: for I/O, will submit what was generated from last step. | |
2670 | // 6. reap kernel events completion: some of the submissions from last step may return immediately. | |
2671 | // For example if we are dealing with poll() on a fd that has events. | |
f67539c2 | 2672 | poller smp_poller(std::make_unique<smp_pollfn>(*this)); |
9f95a23c TL |
2673 | |
2674 | poller reap_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this)); | |
2675 | poller io_queue_submission_poller(std::make_unique<io_queue_submission_pollfn>(*this)); | |
2676 | poller kernel_submit_work_poller(std::make_unique<kernel_submit_work_pollfn>(*this)); | |
2677 | poller final_real_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this)); | |
11fdf7f2 TL |
2678 | |
2679 | poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this)); | |
2680 | poller execution_stage_poller(std::make_unique<execution_stage_pollfn>()); | |
2681 | ||
2682 | start_aio_eventfd_loop(); | |
2683 | ||
9f95a23c | 2684 | if (_id == 0 && _cfg.auto_handle_sigint_sigterm) { |
11fdf7f2 TL |
2685 | if (_handle_sigint) { |
2686 | _signals.handle_signal_once(SIGINT, [this] { stop(); }); | |
2687 | } | |
2688 | _signals.handle_signal_once(SIGTERM, [this] { stop(); }); | |
2689 | } | |
2690 | ||
9f95a23c TL |
2691 | // Start initialization in the background. |
2692 | // Communicate when done using _start_promise. | |
2693 | (void)_cpu_started.wait(smp::count).then([this] { | |
2694 | (void)_network_stack->initialize().then([this] { | |
11fdf7f2 TL |
2695 | _start_promise.set_value(); |
2696 | }); | |
2697 | }); | |
9f95a23c TL |
2698 | // Wait for network stack in the background and then signal all cpus. |
2699 | (void)_network_stack_ready->then([this] (std::unique_ptr<network_stack> stack) { | |
11fdf7f2 | 2700 | _network_stack = std::move(stack); |
9f95a23c TL |
2701 | return smp::invoke_on_all([] { |
2702 | engine()._cpu_started.signal(); | |
2703 | }); | |
11fdf7f2 TL |
2704 | }); |
2705 | ||
2706 | poller syscall_poller(std::make_unique<syscall_pollfn>(*this)); | |
2707 | ||
2708 | poller drain_cross_cpu_freelist(std::make_unique<drain_cross_cpu_freelist_pollfn>()); | |
2709 | ||
11fdf7f2 TL |
2710 | poller expire_lowres_timers(std::make_unique<lowres_timer_pollfn>(*this)); |
2711 | poller sig_poller(std::make_unique<signal_pollfn>(*this)); | |
2712 | ||
2713 | using namespace std::chrono_literals; | |
2714 | timer<lowres_clock> load_timer; | |
2715 | auto last_idle = _total_idle; | |
2716 | auto idle_start = sched_clock::now(), idle_end = idle_start; | |
2717 | load_timer.set_callback([this, &last_idle, &idle_start, &idle_end] () mutable { | |
2718 | _total_idle += idle_end - idle_start; | |
2719 | auto load = double((_total_idle - last_idle).count()) / double(std::chrono::duration_cast<sched_clock::duration>(1s).count()); | |
2720 | last_idle = _total_idle; | |
2721 | load = std::min(load, 1.0); | |
2722 | idle_start = idle_end; | |
2723 | _loads.push_front(load); | |
2724 | if (_loads.size() > 5) { | |
2725 | auto drop = _loads.back(); | |
2726 | _loads.pop_back(); | |
2727 | _load -= (drop/5); | |
2728 | } | |
2729 | _load += (load/5); | |
2730 | }); | |
2731 | load_timer.arm_periodic(1s); | |
2732 | ||
2733 | itimerspec its = seastar::posix::to_relative_itimerspec(_task_quota, _task_quota); | |
2734 | _task_quota_timer.timerfd_settime(0, its); | |
2735 | auto& task_quote_itimerspec = its; | |
2736 | ||
2737 | struct sigaction sa_block_notifier = {}; | |
2738 | sa_block_notifier.sa_handler = &reactor::block_notifier; | |
2739 | sa_block_notifier.sa_flags = SA_RESTART; | |
2740 | auto r = sigaction(cpu_stall_detector::signal_number(), &sa_block_notifier, nullptr); | |
2741 | assert(r == 0); | |
2742 | ||
2743 | bool idle = false; | |
2744 | ||
2745 | std::function<bool()> check_for_work = [this] () { | |
9f95a23c | 2746 | return poll_once() || have_more_tasks(); |
11fdf7f2 TL |
2747 | }; |
2748 | std::function<bool()> pure_check_for_work = [this] () { | |
9f95a23c | 2749 | return pure_poll_once() || have_more_tasks(); |
11fdf7f2 TL |
2750 | }; |
2751 | while (true) { | |
2752 | run_some_tasks(); | |
2753 | if (_stopped) { | |
2754 | load_timer.cancel(); | |
2755 | // Final tasks may include sending the last response to cpu 0, so run them | |
2756 | while (have_more_tasks()) { | |
2757 | run_some_tasks(); | |
2758 | } | |
2759 | while (!_at_destroy_tasks->_q.empty()) { | |
2760 | run_tasks(*_at_destroy_tasks); | |
2761 | } | |
9f95a23c | 2762 | _finished_running_tasks = true; |
11fdf7f2 TL |
2763 | smp::arrive_at_event_loop_end(); |
2764 | if (_id == 0) { | |
2765 | smp::join_all(); | |
2766 | } | |
2767 | break; | |
2768 | } | |
2769 | ||
9f95a23c | 2770 | _polls++; |
11fdf7f2 TL |
2771 | |
2772 | if (check_for_work()) { | |
2773 | if (idle) { | |
2774 | _total_idle += idle_end - idle_start; | |
2775 | account_idle(idle_end - idle_start); | |
2776 | idle_start = idle_end; | |
2777 | idle = false; | |
2778 | } | |
2779 | } else { | |
2780 | idle_end = sched_clock::now(); | |
2781 | if (!idle) { | |
2782 | idle_start = idle_end; | |
2783 | idle = true; | |
2784 | } | |
2785 | bool go_to_sleep = true; | |
2786 | try { | |
2787 | // we can't run check_for_work(), because that can run tasks in the context | |
2788 | // of the idle handler which change its state, without the idle handler expecting | |
2789 | // it. So run pure_check_for_work() instead. | |
2790 | auto handler_result = _idle_cpu_handler(pure_check_for_work); | |
2791 | go_to_sleep = handler_result == idle_cpu_handler_result::no_more_work; | |
2792 | } catch (...) { | |
2793 | report_exception("Exception while running idle cpu handler", std::current_exception()); | |
2794 | } | |
2795 | if (go_to_sleep) { | |
2796 | internal::cpu_relax(); | |
2797 | if (idle_end - idle_start > _max_poll_time) { | |
2798 | // Turn off the task quota timer to avoid spurious wakeups | |
2799 | struct itimerspec zero_itimerspec = {}; | |
2800 | _task_quota_timer.timerfd_settime(0, zero_itimerspec); | |
2801 | auto start_sleep = sched_clock::now(); | |
2802 | _cpu_stall_detector->start_sleep(); | |
2803 | sleep(); | |
2804 | _cpu_stall_detector->end_sleep(); | |
2805 | // We may have slept for a while, so freshen idle_end | |
2806 | idle_end = sched_clock::now(); | |
2807 | _total_sleep += idle_end - start_sleep; | |
2808 | _task_quota_timer.timerfd_settime(0, task_quote_itimerspec); | |
2809 | } | |
2810 | } else { | |
2811 | // We previously ran pure_check_for_work(), might not actually have performed | |
2812 | // any work. | |
2813 | check_for_work(); | |
2814 | } | |
2815 | } | |
2816 | } | |
2817 | // To prevent ordering issues from rising, destroy the I/O queue explicitly at this point. | |
2818 | // This is needed because the reactor is destroyed from the thread_local destructors. If | |
2819 | // the I/O queue happens to use any other infrastructure that is also kept this way (for | |
2820 | // instance, collectd), we will not have any way to guarantee who is destroyed first. | |
2821 | my_io_queues.clear(); | |
2822 | return _return; | |
2823 | } | |
2824 | ||
2825 | void | |
2826 | reactor::sleep() { | |
2827 | for (auto i = _pollers.begin(); i != _pollers.end(); ++i) { | |
2828 | auto ok = (*i)->try_enter_interrupt_mode(); | |
2829 | if (!ok) { | |
2830 | while (i != _pollers.begin()) { | |
2831 | (*--i)->exit_interrupt_mode(); | |
2832 | } | |
2833 | return; | |
2834 | } | |
2835 | } | |
9f95a23c TL |
2836 | |
2837 | _backend->wait_and_process_events(&_active_sigmask); | |
2838 | ||
11fdf7f2 TL |
2839 | for (auto i = _pollers.rbegin(); i != _pollers.rend(); ++i) { |
2840 | (*i)->exit_interrupt_mode(); | |
2841 | } | |
2842 | } | |
2843 | ||
11fdf7f2 TL |
2844 | bool |
2845 | reactor::poll_once() { | |
2846 | bool work = false; | |
2847 | for (auto c : _pollers) { | |
2848 | work |= c->poll(); | |
2849 | } | |
2850 | ||
2851 | return work; | |
2852 | } | |
2853 | ||
2854 | bool | |
2855 | reactor::pure_poll_once() { | |
2856 | for (auto c : _pollers) { | |
2857 | if (c->pure_poll()) { | |
2858 | return true; | |
2859 | } | |
2860 | } | |
2861 | return false; | |
2862 | } | |
2863 | ||
f67539c2 TL |
2864 | namespace internal { |
2865 | ||
2866 | class poller::registration_task final : public task { | |
11fdf7f2 TL |
2867 | private: |
2868 | poller* _p; | |
2869 | public: | |
2870 | explicit registration_task(poller* p) : _p(p) {} | |
2871 | virtual void run_and_dispose() noexcept override { | |
2872 | if (_p) { | |
2873 | engine().register_poller(_p->_pollfn.get()); | |
2874 | _p->_registration_task = nullptr; | |
2875 | } | |
2876 | delete this; | |
2877 | } | |
f67539c2 | 2878 | task* waiting_task() noexcept override { return nullptr; } |
11fdf7f2 TL |
2879 | void cancel() { |
2880 | _p = nullptr; | |
2881 | } | |
2882 | void moved(poller* p) { | |
2883 | _p = p; | |
2884 | } | |
2885 | }; | |
2886 | ||
f67539c2 | 2887 | class poller::deregistration_task final : public task { |
11fdf7f2 TL |
2888 | private: |
2889 | std::unique_ptr<pollfn> _p; | |
2890 | public: | |
2891 | explicit deregistration_task(std::unique_ptr<pollfn>&& p) : _p(std::move(p)) {} | |
2892 | virtual void run_and_dispose() noexcept override { | |
2893 | engine().unregister_poller(_p.get()); | |
2894 | delete this; | |
2895 | } | |
f67539c2 | 2896 | task* waiting_task() noexcept override { return nullptr; } |
11fdf7f2 TL |
2897 | }; |
2898 | ||
f67539c2 TL |
2899 | } |
2900 | ||
11fdf7f2 TL |
2901 | void reactor::register_poller(pollfn* p) { |
2902 | _pollers.push_back(p); | |
2903 | } | |
2904 | ||
2905 | void reactor::unregister_poller(pollfn* p) { | |
2906 | _pollers.erase(std::find(_pollers.begin(), _pollers.end(), p)); | |
2907 | } | |
2908 | ||
2909 | void reactor::replace_poller(pollfn* old, pollfn* neww) { | |
2910 | std::replace(_pollers.begin(), _pollers.end(), old, neww); | |
2911 | } | |
2912 | ||
f67539c2 TL |
2913 | namespace internal { |
2914 | ||
2915 | poller::poller(poller&& x) noexcept | |
9f95a23c | 2916 | : _pollfn(std::move(x._pollfn)), _registration_task(std::exchange(x._registration_task, nullptr)) { |
11fdf7f2 TL |
2917 | if (_pollfn && _registration_task) { |
2918 | _registration_task->moved(this); | |
2919 | } | |
2920 | } | |
2921 | ||
f67539c2 TL |
2922 | poller& |
2923 | poller::operator=(poller&& x) noexcept { | |
11fdf7f2 TL |
2924 | if (this != &x) { |
2925 | this->~poller(); | |
2926 | new (this) poller(std::move(x)); | |
2927 | } | |
2928 | return *this; | |
2929 | } | |
2930 | ||
2931 | void | |
f67539c2 | 2932 | poller::do_register() noexcept { |
11fdf7f2 TL |
2933 | // We can't just insert a poller into reactor::_pollers, because we |
2934 | // may be running inside a poller ourselves, and so in the middle of | |
2935 | // iterating reactor::_pollers itself. So we schedule a task to add | |
2936 | // the poller instead. | |
9f95a23c TL |
2937 | auto task = new registration_task(this); |
2938 | engine().add_task(task); | |
2939 | _registration_task = task; | |
11fdf7f2 TL |
2940 | } |
2941 | ||
f67539c2 | 2942 | poller::~poller() { |
11fdf7f2 TL |
2943 | // We can't just remove the poller from reactor::_pollers, because we |
2944 | // may be running inside a poller ourselves, and so in the middle of | |
2945 | // iterating reactor::_pollers itself. So we schedule a task to remove | |
2946 | // the poller instead. | |
2947 | // | |
2948 | // Since we don't want to call the poller after we exit the destructor, | |
2949 | // we replace it atomically with another one, and schedule a task to | |
2950 | // delete the replacement. | |
2951 | if (_pollfn) { | |
2952 | if (_registration_task) { | |
2953 | // not added yet, so don't do it at all. | |
2954 | _registration_task->cancel(); | |
9f95a23c TL |
2955 | delete _registration_task; |
2956 | } else if (!engine()._finished_running_tasks) { | |
2957 | // If _finished_running_tasks, the call to add_task() below will just | |
2958 | // leak it, since no one will call task::run_and_dispose(). Just leave | |
2959 | // the poller there, the reactor will never use it. | |
11fdf7f2 TL |
2960 | auto dummy = make_pollfn([] { return false; }); |
2961 | auto dummy_p = dummy.get(); | |
9f95a23c TL |
2962 | auto task = new deregistration_task(std::move(dummy)); |
2963 | engine().add_task(task); | |
11fdf7f2 TL |
2964 | engine().replace_poller(_pollfn.get(), dummy_p); |
2965 | } | |
2966 | } | |
2967 | } | |
2968 | ||
f67539c2 TL |
2969 | } |
2970 | ||
11fdf7f2 TL |
2971 | syscall_work_queue::syscall_work_queue() |
2972 | : _pending() | |
2973 | , _completed() | |
2974 | , _start_eventfd(0) { | |
2975 | } | |
2976 | ||
2977 | void syscall_work_queue::submit_item(std::unique_ptr<syscall_work_queue::work_item> item) { | |
f67539c2 TL |
2978 | (void)_queue_has_room.wait().then_wrapped([this, item = std::move(item)] (future<> f) mutable { |
2979 | // propagate wait failure via work_item | |
2980 | if (f.failed()) { | |
2981 | item->set_exception(f.get_exception()); | |
2982 | return; | |
2983 | } | |
11fdf7f2 TL |
2984 | _pending.push(item.release()); |
2985 | _start_eventfd.signal(1); | |
2986 | }); | |
2987 | } | |
2988 | ||
2989 | unsigned syscall_work_queue::complete() { | |
2990 | std::array<work_item*, queue_length> tmp_buf; | |
2991 | auto end = tmp_buf.data(); | |
2992 | auto nr = _completed.consume_all([&] (work_item* wi) { | |
2993 | *end++ = wi; | |
2994 | }); | |
2995 | for (auto p = tmp_buf.data(); p != end; ++p) { | |
2996 | auto wi = *p; | |
2997 | wi->complete(); | |
2998 | delete wi; | |
2999 | } | |
3000 | _queue_has_room.signal(nr); | |
3001 | return nr; | |
3002 | } | |
3003 | ||
9f95a23c | 3004 | |
11fdf7f2 TL |
3005 | smp_message_queue::smp_message_queue(reactor* from, reactor* to) |
3006 | : _pending(to) | |
3007 | , _completed(from) | |
3008 | { | |
3009 | } | |
3010 | ||
3011 | smp_message_queue::~smp_message_queue() | |
3012 | { | |
3013 | if (_pending.remote != _completed.remote) { | |
3014 | _tx.a.~aa(); | |
3015 | } | |
3016 | } | |
3017 | ||
3018 | void smp_message_queue::stop() { | |
3019 | _metrics.clear(); | |
3020 | } | |
3021 | ||
3022 | void smp_message_queue::move_pending() { | |
3023 | auto begin = _tx.a.pending_fifo.cbegin(); | |
3024 | auto end = _tx.a.pending_fifo.cend(); | |
3025 | end = _pending.push(begin, end); | |
3026 | if (begin == end) { | |
3027 | return; | |
3028 | } | |
3029 | auto nr = end - begin; | |
3030 | _pending.maybe_wakeup(); | |
3031 | _tx.a.pending_fifo.erase(begin, end); | |
3032 | _current_queue_length += nr; | |
3033 | _last_snt_batch = nr; | |
3034 | _sent += nr; | |
3035 | } | |
3036 | ||
3037 | bool smp_message_queue::pure_poll_tx() const { | |
3038 | // can't use read_available(), not available on older boost | |
3039 | // empty() is not const, so need const_cast. | |
3040 | return !const_cast<lf_queue&>(_completed).empty(); | |
3041 | } | |
3042 | ||
9f95a23c TL |
3043 | void smp_message_queue::submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<smp_message_queue::work_item> item) { |
3044 | // matching signal() in process_completions() | |
3045 | auto ssg_id = internal::smp_service_group_id(item->ssg); | |
3046 | auto& sem = get_smp_service_groups_semaphore(ssg_id, t); | |
3047 | // Future indirectly forwarded to `item`. | |
3048 | (void)get_units(sem, 1, timeout).then_wrapped([this, item = std::move(item)] (future<smp_service_group_semaphore_units> units_fut) mutable { | |
3049 | if (units_fut.failed()) { | |
3050 | item->fail_with(units_fut.get_exception()); | |
3051 | ++_compl; | |
3052 | ++_last_cmpl_batch; | |
3053 | return; | |
3054 | } | |
11fdf7f2 | 3055 | _tx.a.pending_fifo.push_back(item.get()); |
9f95a23c | 3056 | // no exceptions from this point |
11fdf7f2 | 3057 | item.release(); |
9f95a23c | 3058 | units_fut.get0().release(); |
11fdf7f2 TL |
3059 | if (_tx.a.pending_fifo.size() >= batch_size) { |
3060 | move_pending(); | |
3061 | } | |
9f95a23c | 3062 | }); |
11fdf7f2 TL |
3063 | } |
3064 | ||
3065 | void smp_message_queue::respond(work_item* item) { | |
3066 | _completed_fifo.push_back(item); | |
3067 | if (_completed_fifo.size() >= batch_size || engine()._stopped) { | |
3068 | flush_response_batch(); | |
3069 | } | |
3070 | } | |
3071 | ||
3072 | void smp_message_queue::flush_response_batch() { | |
3073 | if (!_completed_fifo.empty()) { | |
3074 | auto begin = _completed_fifo.cbegin(); | |
3075 | auto end = _completed_fifo.cend(); | |
3076 | end = _completed.push(begin, end); | |
3077 | if (begin == end) { | |
3078 | return; | |
3079 | } | |
3080 | _completed.maybe_wakeup(); | |
3081 | _completed_fifo.erase(begin, end); | |
3082 | } | |
3083 | } | |
3084 | ||
3085 | bool smp_message_queue::has_unflushed_responses() const { | |
3086 | return !_completed_fifo.empty(); | |
3087 | } | |
3088 | ||
3089 | bool smp_message_queue::pure_poll_rx() const { | |
3090 | // can't use read_available(), not available on older boost | |
3091 | // empty() is not const, so need const_cast. | |
3092 | return !const_cast<lf_queue&>(_pending).empty(); | |
3093 | } | |
3094 | ||
3095 | void | |
3096 | smp_message_queue::lf_queue::maybe_wakeup() { | |
3097 | // Called after lf_queue_base::push(). | |
3098 | // | |
3099 | // This is read-after-write, which wants memory_order_seq_cst, | |
3100 | // but we insert that barrier using systemwide_memory_barrier() | |
3101 | // because seq_cst is so expensive. | |
3102 | // | |
3103 | // However, we do need a compiler barrier: | |
3104 | std::atomic_signal_fence(std::memory_order_seq_cst); | |
3105 | if (remote->_sleeping.load(std::memory_order_relaxed)) { | |
3106 | // We are free to clear it, because we're sending a signal now | |
3107 | remote->_sleeping.store(false, std::memory_order_relaxed); | |
3108 | remote->wakeup(); | |
3109 | } | |
3110 | } | |
3111 | ||
9f95a23c TL |
3112 | smp_message_queue::lf_queue::~lf_queue() { |
3113 | consume_all([] (work_item* ptr) { | |
3114 | delete ptr; | |
3115 | }); | |
3116 | } | |
3117 | ||
3118 | ||
11fdf7f2 TL |
3119 | template<size_t PrefetchCnt, typename Func> |
3120 | size_t smp_message_queue::process_queue(lf_queue& q, Func process) { | |
3121 | // copy batch to local memory in order to minimize | |
3122 | // time in which cross-cpu data is accessed | |
3123 | work_item* items[queue_length + PrefetchCnt]; | |
3124 | work_item* wi; | |
3125 | if (!q.pop(wi)) | |
3126 | return 0; | |
3127 | // start prefetching first item before popping the rest to overlap memory | |
3128 | // access with potential cache miss the second pop may cause | |
3129 | prefetch<2>(wi); | |
3130 | auto nr = q.pop(items); | |
3131 | std::fill(std::begin(items) + nr, std::begin(items) + nr + PrefetchCnt, nr ? items[nr - 1] : wi); | |
3132 | unsigned i = 0; | |
3133 | do { | |
3134 | prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + PrefetchCnt); | |
3135 | process(wi); | |
3136 | wi = items[i++]; | |
3137 | } while(i <= nr); | |
3138 | ||
3139 | return nr + 1; | |
3140 | } | |
3141 | ||
9f95a23c TL |
3142 | size_t smp_message_queue::process_completions(shard_id t) { |
3143 | auto nr = process_queue<prefetch_cnt*2>(_completed, [t] (work_item* wi) { | |
11fdf7f2 | 3144 | wi->complete(); |
9f95a23c TL |
3145 | auto ssg_id = smp_service_group_id(wi->ssg); |
3146 | get_smp_service_groups_semaphore(ssg_id, t).signal(); | |
11fdf7f2 TL |
3147 | delete wi; |
3148 | }); | |
3149 | _current_queue_length -= nr; | |
3150 | _compl += nr; | |
3151 | _last_cmpl_batch = nr; | |
3152 | ||
3153 | return nr; | |
3154 | } | |
3155 | ||
3156 | void smp_message_queue::flush_request_batch() { | |
3157 | if (!_tx.a.pending_fifo.empty()) { | |
3158 | move_pending(); | |
3159 | } | |
3160 | } | |
3161 | ||
3162 | size_t smp_message_queue::process_incoming() { | |
3163 | auto nr = process_queue<prefetch_cnt>(_pending, [] (work_item* wi) { | |
3164 | wi->process(); | |
3165 | }); | |
3166 | _received += nr; | |
3167 | _last_rcv_batch = nr; | |
3168 | return nr; | |
3169 | } | |
3170 | ||
3171 | void smp_message_queue::start(unsigned cpuid) { | |
3172 | _tx.init(); | |
3173 | namespace sm = seastar::metrics; | |
3174 | char instance[10]; | |
f67539c2 | 3175 | std::snprintf(instance, sizeof(instance), "%u-%u", this_shard_id(), cpuid); |
11fdf7f2 TL |
3176 | _metrics.add_group("smp", { |
3177 | // queue_length value:GAUGE:0:U | |
3178 | // Absolute value of num packets in last tx batch. | |
3179 | sm::make_queue_length("send_batch_queue_length", _last_snt_batch, sm::description("Current send batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled), | |
3180 | sm::make_queue_length("receive_batch_queue_length", _last_rcv_batch, sm::description("Current receive batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled), | |
3181 | sm::make_queue_length("complete_batch_queue_length", _last_cmpl_batch, sm::description("Current complete batch queue length"), {sm::shard_label(instance)})(sm::metric_disabled), | |
3182 | sm::make_queue_length("send_queue_length", _current_queue_length, sm::description("Current send queue length"), {sm::shard_label(instance)})(sm::metric_disabled), | |
3183 | // total_operations value:DERIVE:0:U | |
3184 | sm::make_derive("total_received_messages", _received, sm::description("Total number of received messages"), {sm::shard_label(instance)})(sm::metric_disabled), | |
3185 | // total_operations value:DERIVE:0:U | |
3186 | sm::make_derive("total_sent_messages", _sent, sm::description("Total number of sent messages"), {sm::shard_label(instance)})(sm::metric_disabled), | |
3187 | // total_operations value:DERIVE:0:U | |
3188 | sm::make_derive("total_completed_messages", _compl, sm::description("Total number of messages completed"), {sm::shard_label(instance)})(sm::metric_disabled) | |
3189 | }); | |
3190 | } | |
3191 | ||
11fdf7f2 TL |
3192 | readable_eventfd writeable_eventfd::read_side() { |
3193 | return readable_eventfd(_fd.dup()); | |
3194 | } | |
3195 | ||
3196 | file_desc writeable_eventfd::try_create_eventfd(size_t initial) { | |
3197 | assert(size_t(int(initial)) == initial); | |
3198 | return file_desc::eventfd(initial, EFD_CLOEXEC); | |
3199 | } | |
3200 | ||
3201 | void writeable_eventfd::signal(size_t count) { | |
3202 | uint64_t c = count; | |
3203 | auto r = _fd.write(&c, sizeof(c)); | |
3204 | assert(r == sizeof(c)); | |
3205 | } | |
3206 | ||
3207 | writeable_eventfd readable_eventfd::write_side() { | |
3208 | return writeable_eventfd(_fd.get_file_desc().dup()); | |
3209 | } | |
3210 | ||
3211 | file_desc readable_eventfd::try_create_eventfd(size_t initial) { | |
3212 | assert(size_t(int(initial)) == initial); | |
3213 | return file_desc::eventfd(initial, EFD_CLOEXEC | EFD_NONBLOCK); | |
3214 | } | |
3215 | ||
3216 | future<size_t> readable_eventfd::wait() { | |
3217 | return engine().readable(*_fd._s).then([this] { | |
3218 | uint64_t count; | |
3219 | int r = ::read(_fd.get_fd(), &count, sizeof(count)); | |
3220 | assert(r == sizeof(count)); | |
3221 | return make_ready_future<size_t>(count); | |
3222 | }); | |
3223 | } | |
3224 | ||
9f95a23c TL |
3225 | void schedule(task* t) noexcept { |
3226 | engine().add_task(t); | |
11fdf7f2 TL |
3227 | } |
3228 | ||
9f95a23c TL |
3229 | void schedule_urgent(task* t) noexcept { |
3230 | engine().add_urgent_task(t); | |
11fdf7f2 TL |
3231 | } |
3232 | ||
3233 | } | |
3234 | ||
// Two IPv4 socket addresses compare equal when both the address and the
// port match; the remaining fields are ignored.
bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) {
    if (a.sin_addr.s_addr != b.sin_addr.s_addr) {
        return false;
    }
    return a.sin_port == b.sin_port;
}
3238 | ||
3239 | namespace seastar { | |
3240 | ||
3241 | void network_stack_registry::register_stack(sstring name, | |
3242 | boost::program_options::options_description opts, | |
9f95a23c TL |
3243 | noncopyable_function<future<std::unique_ptr<network_stack>> (options opts)> create, bool make_default) { |
3244 | if (_map().count(name)) { | |
3245 | return; | |
3246 | } | |
11fdf7f2 TL |
3247 | _map()[name] = std::move(create); |
3248 | options_description().add(opts); | |
3249 | if (make_default) { | |
3250 | _default() = name; | |
3251 | } | |
3252 | } | |
3253 | ||
3254 | void register_network_stack(sstring name, boost::program_options::options_description opts, | |
9f95a23c | 3255 | noncopyable_function<future<std::unique_ptr<network_stack>>(boost::program_options::variables_map)> |
11fdf7f2 TL |
3256 | create, |
3257 | bool make_default) { | |
3258 | return network_stack_registry::register_stack( | |
3259 | std::move(name), std::move(opts), std::move(create), make_default); | |
3260 | } | |
3261 | ||
3262 | sstring network_stack_registry::default_stack() { | |
3263 | return _default(); | |
3264 | } | |
3265 | ||
3266 | std::vector<sstring> network_stack_registry::list() { | |
3267 | std::vector<sstring> ret; | |
3268 | for (auto&& ns : _map()) { | |
3269 | ret.push_back(ns.first); | |
3270 | } | |
3271 | return ret; | |
3272 | } | |
3273 | ||
3274 | future<std::unique_ptr<network_stack>> | |
3275 | network_stack_registry::create(options opts) { | |
3276 | return create(_default(), opts); | |
3277 | } | |
3278 | ||
3279 | future<std::unique_ptr<network_stack>> | |
3280 | network_stack_registry::create(sstring name, options opts) { | |
3281 | if (!_map().count(name)) { | |
3282 | throw std::runtime_error(format("network stack {} not registered", name)); | |
3283 | } | |
3284 | return _map()[name](opts); | |
3285 | } | |
3286 | ||
9f95a23c TL |
3287 | static bool kernel_supports_aio_fsync() { |
3288 | return kernel_uname().whitelisted({"4.18"}); | |
3289 | } | |
3290 | ||
11fdf7f2 | 3291 | boost::program_options::options_description |
9f95a23c | 3292 | reactor::get_options_description(reactor_config cfg) { |
11fdf7f2 TL |
3293 | namespace bpo = boost::program_options; |
3294 | bpo::options_description opts("Core options"); | |
3295 | auto net_stack_names = network_stack_registry::list(); | |
3296 | opts.add_options() | |
3297 | ("network-stack", bpo::value<std::string>(), | |
3298 | format("select network stack (valid values: {})", | |
3299 | format_separated(net_stack_names.begin(), net_stack_names.end(), ", ")).c_str()) | |
11fdf7f2 TL |
3300 | ("poll-mode", "poll continuously (100% cpu use)") |
3301 | ("idle-poll-time-us", bpo::value<unsigned>()->default_value(calculate_poll_time() / 1us), | |
3302 | "idle polling time in microseconds (reduce for overprovisioned environments or laptops)") | |
3303 | ("poll-aio", bpo::value<bool>()->default_value(true), | |
3304 | "busy-poll for disk I/O (reduces latency and increases throughput)") | |
9f95a23c | 3305 | ("task-quota-ms", bpo::value<double>()->default_value(cfg.task_quota / 1ms), "Max time (ms) between polls") |
11fdf7f2 | 3306 | ("max-task-backlog", bpo::value<unsigned>()->default_value(1000), "Maximum number of task backlog to allow; above this we ignore I/O") |
9f95a23c | 3307 | ("blocked-reactor-notify-ms", bpo::value<unsigned>()->default_value(200), "threshold in miliseconds over which the reactor is considered blocked if no progress is made") |
11fdf7f2 TL |
3308 | ("blocked-reactor-reports-per-minute", bpo::value<unsigned>()->default_value(5), "Maximum number of backtraces reported by stall detector per minute") |
3309 | ("relaxed-dma", "allow using buffered I/O if DMA is not available (reduces performance)") | |
9f95a23c TL |
3310 | ("linux-aio-nowait", |
3311 | bpo::value<bool>()->default_value(aio_nowait_supported), | |
3312 | "use the Linux NOWAIT AIO feature, which reduces reactor stalls due to aio (autodetected)") | |
11fdf7f2 TL |
3313 | ("unsafe-bypass-fsync", bpo::value<bool>()->default_value(false), "Bypass fsync(), may result in data loss. Use for testing on consumer drives") |
3314 | ("overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0") | |
3315 | ("abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory") | |
3316 | ("force-aio-syscalls", bpo::value<bool>()->default_value(false), | |
3317 | "Force io_getevents(2) to issue a system call, instead of bypassing the kernel when possible." | |
3318 | " This makes strace output more useful, but slows down the application") | |
f67539c2 TL |
3319 | ("dump-memory-diagnostics-on-alloc-failure-kind", bpo::value<std::string>()->default_value("critical"), "Dump diagnostics of the seastar allocator state on allocation failure." |
3320 | " Accepted values: never, critical (default), always. When set to critical, only allocations marked as critical will trigger diagnostics dump." | |
3321 | " The diagnostics will be written to the seastar_memory logger, with error level." | |
3322 | " Note that if the seastar_memory logger is set to debug or trace level, the diagnostics will be logged irrespective of this setting.") | |
11fdf7f2 TL |
3323 | ("reactor-backend", bpo::value<reactor_backend_selector>()->default_value(reactor_backend_selector::default_backend()), |
3324 | format("Internal reactor implementation ({})", reactor_backend_selector::available()).c_str()) | |
9f95a23c TL |
3325 | ("aio-fsync", bpo::value<bool>()->default_value(kernel_supports_aio_fsync()), |
3326 | "Use Linux aio for fsync() calls. This reduces latency; requires Linux 4.18 or later.") | |
11fdf7f2 TL |
3327 | #ifdef SEASTAR_HEAPPROF |
3328 | ("heapprof", "enable seastar heap profiling") | |
3329 | #endif | |
3330 | ; | |
9f95a23c TL |
3331 | if (cfg.auto_handle_sigint_sigterm) { |
3332 | opts.add_options() | |
3333 | ("no-handle-interrupt", "ignore SIGINT (for gdb)") | |
3334 | ; | |
3335 | } | |
11fdf7f2 TL |
3336 | opts.add(network_stack_registry::options_description()); |
3337 | return opts; | |
3338 | } | |
3339 | ||
3340 | boost::program_options::options_description | |
3341 | smp::get_options_description() | |
3342 | { | |
3343 | namespace bpo = boost::program_options; | |
3344 | bpo::options_description opts("SMP options"); | |
3345 | opts.add_options() | |
3346 | ("smp,c", bpo::value<unsigned>(), "number of threads (default: one per CPU)") | |
3347 | ("cpuset", bpo::value<cpuset_bpo_wrapper>(), "CPUs to use (in cpuset(7) format; default: all))") | |
3348 | ("memory,m", bpo::value<std::string>(), "memory to use, in bytes (ex: 4G) (default: all)") | |
3349 | ("reserve-memory", bpo::value<std::string>(), "memory reserved to OS (if --memory not specified)") | |
3350 | ("hugepages", bpo::value<std::string>(), "path to accessible hugetlbfs mount (typically /dev/hugepages/something)") | |
3351 | ("lock-memory", bpo::value<bool>(), "lock all memory (prevents swapping)") | |
3352 | ("thread-affinity", bpo::value<bool>()->default_value(true), "pin threads to their cpus (disable for overprovisioning)") | |
3353 | #ifdef SEASTAR_HAVE_HWLOC | |
3354 | ("num-io-queues", bpo::value<unsigned>(), "Number of IO queues. Each IO unit will be responsible for a fraction of the IO requests. Defaults to the number of threads") | |
3355 | ("max-io-requests", bpo::value<unsigned>(), "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of IO queues") | |
3356 | #else | |
3357 | ("max-io-requests", bpo::value<unsigned>(), "Maximum amount of concurrent requests to be sent to the disk. Defaults to 128 times the number of processors") | |
3358 | #endif | |
3359 | ("io-properties-file", bpo::value<std::string>(), "path to a YAML file describing the characteristics of the I/O Subsystem") | |
3360 | ("io-properties", bpo::value<std::string>(), "a YAML string describing the characteristics of the I/O Subsystem") | |
3361 | ("mbind", bpo::value<bool>()->default_value(true), "enable mbind") | |
3362 | #ifndef SEASTAR_NO_EXCEPTION_HACK | |
3363 | ("enable-glibc-exception-scaling-workaround", bpo::value<bool>()->default_value(true), "enable workaround for glibc/gcc c++ exception scalablity problem") | |
f67539c2 TL |
3364 | #endif |
3365 | #ifdef SEASTAR_HAVE_HWLOC | |
3366 | ("allow-cpus-in-remote-numa-nodes", bpo::value<bool>()->default_value(true), "if some CPUs are found not to have any local NUMA nodes, allow assigning them to remote ones") | |
11fdf7f2 TL |
3367 | #endif |
3368 | ; | |
3369 | return opts; | |
3370 | } | |
3371 | ||
3372 | thread_local scollectd::impl scollectd_impl; | |
3373 | ||
3374 | scollectd::impl & scollectd::get_impl() { | |
3375 | return scollectd_impl; | |
3376 | } | |
3377 | ||
3378 | struct reactor_deleter { | |
3379 | void operator()(reactor* p) { | |
3380 | p->~reactor(); | |
3381 | free(p); | |
3382 | } | |
3383 | }; | |
3384 | ||
3385 | thread_local std::unique_ptr<reactor, reactor_deleter> reactor_holder; | |
3386 | ||
3387 | std::vector<posix_thread> smp::_threads; | |
3388 | std::vector<std::function<void ()>> smp::_thread_loops; | |
f67539c2 | 3389 | std::optional<boost::barrier> smp::_all_event_loops_done; |
11fdf7f2 TL |
3390 | std::vector<reactor*> smp::_reactors; |
3391 | std::unique_ptr<smp_message_queue*[], smp::qs_deleter> smp::_qs; | |
3392 | std::thread::id smp::_tmain; | |
3393 | unsigned smp::count = 1; | |
3394 | bool smp::_using_dpdk; | |
3395 | ||
3396 | void smp::start_all_queues() | |
3397 | { | |
3398 | for (unsigned c = 0; c < count; c++) { | |
f67539c2 TL |
3399 | if (c != this_shard_id()) { |
3400 | _qs[c][this_shard_id()].start(c); | |
11fdf7f2 TL |
3401 | } |
3402 | } | |
f67539c2 | 3403 | alien::smp::_qs[this_shard_id()].start(); |
11fdf7f2 TL |
3404 | } |
3405 | ||
3406 | #ifdef SEASTAR_HAVE_DPDK | |
3407 | ||
// Entry point with an int(void*) signature: `f` must point to a
// std::function<void ()>, which is invoked. Always returns 0.
int dpdk_thread_adaptor(void* f)
{
    auto* loop = static_cast<std::function<void ()>*>(f);
    (*loop)();
    return 0;
}
3413 | ||
3414 | #endif | |
3415 | ||
3416 | void smp::join_all() | |
3417 | { | |
3418 | #ifdef SEASTAR_HAVE_DPDK | |
3419 | if (_using_dpdk) { | |
3420 | rte_eal_mp_wait_lcore(); | |
3421 | return; | |
3422 | } | |
3423 | #endif | |
3424 | for (auto&& t: smp::_threads) { | |
3425 | t.join(); | |
3426 | } | |
3427 | } | |
3428 | ||
3429 | void smp::pin(unsigned cpu_id) { | |
3430 | if (_using_dpdk) { | |
3431 | // dpdk does its own pinning | |
3432 | return; | |
3433 | } | |
3434 | pin_this_thread(cpu_id); | |
3435 | } | |
3436 | ||
3437 | void smp::arrive_at_event_loop_end() { | |
3438 | if (_all_event_loops_done) { | |
3439 | _all_event_loops_done->wait(); | |
3440 | } | |
3441 | } | |
3442 | ||
9f95a23c | 3443 | void smp::allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg) { |
11fdf7f2 TL |
3444 | assert(!reactor_holder); |
3445 | ||
3446 | // we cannot just write "local_engin = new reactor" since reactor's constructor | |
3447 | // uses local_engine | |
3448 | void *buf; | |
3449 | int r = posix_memalign(&buf, cache_line_size, sizeof(reactor)); | |
3450 | assert(r == 0); | |
3451 | local_engine = reinterpret_cast<reactor*>(buf); | |
f67539c2 | 3452 | *internal::this_shard_id_ptr() = id; |
9f95a23c | 3453 | new (buf) reactor(id, std::move(rbs), cfg); |
11fdf7f2 TL |
3454 | reactor_holder.reset(local_engine); |
3455 | } | |
3456 | ||
3457 | void smp::cleanup() { | |
3458 | smp::_threads = std::vector<posix_thread>(); | |
3459 | _thread_loops.clear(); | |
3460 | } | |
3461 | ||
3462 | void smp::cleanup_cpu() { | |
f67539c2 | 3463 | size_t cpuid = this_shard_id(); |
11fdf7f2 TL |
3464 | |
3465 | if (_qs) { | |
3466 | for(unsigned i = 0; i < smp::count; i++) { | |
3467 | _qs[i][cpuid].stop(); | |
3468 | } | |
3469 | } | |
3470 | if (alien::smp::_qs) { | |
3471 | alien::smp::_qs[cpuid].stop(); | |
3472 | } | |
3473 | } | |
3474 | ||
3475 | void smp::create_thread(std::function<void ()> thread_loop) { | |
3476 | if (_using_dpdk) { | |
3477 | _thread_loops.push_back(std::move(thread_loop)); | |
3478 | } else { | |
3479 | _threads.emplace_back(std::move(thread_loop)); | |
3480 | } | |
3481 | } | |
3482 | ||
3483 | // Installs handler for Signal which ensures that Func is invoked only once | |
3484 | // in the whole program and that after it is invoked the default handler is restored. | |
3485 | template<int Signal, void(*Func)()> | |
3486 | void install_oneshot_signal_handler() { | |
3487 | static bool handled = false; | |
3488 | static util::spinlock lock; | |
3489 | ||
3490 | struct sigaction sa; | |
3491 | sa.sa_sigaction = [](int sig, siginfo_t *info, void *p) { | |
3492 | std::lock_guard<util::spinlock> g(lock); | |
3493 | if (!handled) { | |
3494 | handled = true; | |
3495 | Func(); | |
3496 | signal(sig, SIG_DFL); | |
3497 | } | |
3498 | }; | |
3499 | sigfillset(&sa.sa_mask); | |
3500 | sa.sa_flags = SA_SIGINFO | SA_RESTART; | |
3501 | if (Signal == SIGSEGV) { | |
3502 | sa.sa_flags |= SA_ONSTACK; | |
3503 | } | |
3504 | auto r = ::sigaction(Signal, &sa, nullptr); | |
3505 | throw_system_error_on(r == -1); | |
3506 | } | |
3507 | ||
3508 | static void sigsegv_action() noexcept { | |
3509 | print_with_backtrace("Segmentation fault"); | |
3510 | } | |
3511 | ||
3512 | static void sigabrt_action() noexcept { | |
3513 | print_with_backtrace("Aborting"); | |
3514 | } | |
3515 | ||
3516 | void smp::qs_deleter::operator()(smp_message_queue** qs) const { | |
3517 | for (unsigned i = 0; i < smp::count; i++) { | |
3518 | for (unsigned j = 0; j < smp::count; j++) { | |
3519 | qs[i][j].~smp_message_queue(); | |
3520 | } | |
3521 | ::operator delete[](qs[i]); | |
3522 | } | |
3523 | delete[](qs); | |
3524 | } | |
3525 | ||
3526 | class disk_config_params { | |
3527 | private: | |
3528 | unsigned _num_io_queues = 0; | |
f67539c2 | 3529 | std::optional<unsigned> _capacity; |
11fdf7f2 TL |
3530 | std::unordered_map<dev_t, mountpoint_params> _mountpoints; |
3531 | std::chrono::duration<double> _latency_goal; | |
3532 | ||
3533 | public: | |
3534 | uint64_t per_io_queue(uint64_t qty, dev_t devid) const { | |
3535 | const mountpoint_params& p = _mountpoints.at(devid); | |
3536 | return std::max(qty / p.num_io_queues, 1ul); | |
3537 | } | |
3538 | ||
3539 | unsigned num_io_queues(dev_t devid) const { | |
3540 | const mountpoint_params& p = _mountpoints.at(devid); | |
3541 | return p.num_io_queues; | |
3542 | } | |
3543 | ||
3544 | std::chrono::duration<double> latency_goal() const { | |
3545 | return _latency_goal; | |
3546 | } | |
3547 | ||
3548 | void parse_config(boost::program_options::variables_map& configuration) { | |
3549 | seastar_logger.debug("smp::count: {}", smp::count); | |
3550 | _latency_goal = std::chrono::duration_cast<std::chrono::duration<double>>(configuration["task-quota-ms"].as<double>() * 1.5 * 1ms); | |
3551 | seastar_logger.debug("latency_goal: {}", latency_goal().count()); | |
3552 | ||
3553 | if (configuration.count("max-io-requests")) { | |
3554 | _capacity = configuration["max-io-requests"].as<unsigned>(); | |
3555 | } | |
3556 | ||
3557 | if (configuration.count("num-io-queues")) { | |
3558 | _num_io_queues = configuration["num-io-queues"].as<unsigned>(); | |
3559 | if (!_num_io_queues) { | |
3560 | throw std::runtime_error("num-io-queues must be greater than zero"); | |
3561 | } | |
3562 | } | |
3563 | if (configuration.count("io-properties-file") && configuration.count("io-properties")) { | |
3564 | throw std::runtime_error("Both io-properties and io-properties-file specified. Don't know which to trust!"); | |
3565 | } | |
3566 | ||
f67539c2 | 3567 | std::optional<YAML::Node> doc; |
11fdf7f2 TL |
3568 | if (configuration.count("io-properties-file")) { |
3569 | doc = YAML::LoadFile(configuration["io-properties-file"].as<std::string>()); | |
3570 | } else if (configuration.count("io-properties")) { | |
3571 | doc = YAML::Load(configuration["io-properties"].as<std::string>()); | |
3572 | } | |
3573 | ||
3574 | if (doc) { | |
3575 | static constexpr unsigned task_quotas_in_default_latency_goal = 3; | |
3576 | unsigned auto_num_io_queues = smp::count; | |
3577 | ||
3578 | for (auto&& section : *doc) { | |
3579 | auto sec_name = section.first.as<std::string>(); | |
3580 | if (sec_name != "disks") { | |
3581 | throw std::runtime_error(fmt::format("While parsing I/O options: section {} currently unsupported.", sec_name)); | |
3582 | } | |
3583 | auto disks = section.second.as<std::vector<mountpoint_params>>(); | |
3584 | for (auto& d : disks) { | |
3585 | struct ::stat buf; | |
3586 | auto ret = stat(d.mountpoint.c_str(), &buf); | |
3587 | if (ret < 0) { | |
3588 | throw std::runtime_error(fmt::format("Couldn't stat {}", d.mountpoint)); | |
3589 | } | |
3590 | if (_mountpoints.count(buf.st_dev)) { | |
3591 | throw std::runtime_error(fmt::format("Mountpoint {} already configured", d.mountpoint)); | |
3592 | } | |
3593 | if (_mountpoints.size() >= reactor::max_queues) { | |
3594 | throw std::runtime_error(fmt::format("Configured number of queues {} is larger than the maximum {}", | |
3595 | _mountpoints.size(), reactor::max_queues)); | |
3596 | } | |
9f95a23c TL |
3597 | if (d.read_bytes_rate == 0 || d.write_bytes_rate == 0 || |
3598 | d.read_req_rate == 0 || d.write_req_rate == 0) { | |
3599 | throw std::runtime_error(fmt::format("R/W bytes and req rates must not be zero")); | |
3600 | } | |
11fdf7f2 TL |
3601 | |
3602 | // Ideally we wouldn't have I/O Queues and would dispatch from every shard (https://github.com/scylladb/seastar/issues/485) | |
3603 | // While we don't do that, we'll just be conservative and try to recommend values of I/O Queues that are close to what we | |
3604 | // suggested before the I/O Scheduler rework. The I/O Scheduler has traditionally tried to make sure that each queue would have | |
3605 | // at least 4 requests in depth, and all its requests were 4kB in size. Therefore, try to arrange the I/O Queues so that we would | |
3606 | // end up in the same situation here (that's where the 4 comes from). | |
3607 | // | |
3608 | // For the bandwidth limit, we want that to be 4 * 4096, so each I/O Queue has the same bandwidth as before. | |
3609 | if (!_num_io_queues) { | |
3610 | unsigned dev_io_queues = smp::count; | |
3611 | dev_io_queues = std::min(dev_io_queues, unsigned((task_quotas_in_default_latency_goal * d.write_req_rate * latency_goal().count()) / 4)); | |
3612 | dev_io_queues = std::min(dev_io_queues, unsigned((task_quotas_in_default_latency_goal * d.write_bytes_rate * latency_goal().count()) / (4 * 4096))); | |
3613 | dev_io_queues = std::max(dev_io_queues, 1u); | |
3614 | seastar_logger.debug("dev_io_queues: {}", dev_io_queues); | |
3615 | d.num_io_queues = dev_io_queues; | |
3616 | auto_num_io_queues = std::min(auto_num_io_queues, dev_io_queues); | |
3617 | } else { | |
3618 | d.num_io_queues = _num_io_queues; | |
3619 | } | |
3620 | ||
3621 | seastar_logger.debug("dev_id: {} mountpoint: {}", buf.st_dev, d.mountpoint); | |
3622 | _mountpoints.emplace(buf.st_dev, d); | |
3623 | } | |
3624 | } | |
3625 | if (!_num_io_queues) { | |
3626 | _num_io_queues = auto_num_io_queues; | |
3627 | } | |
3628 | } else if (!_num_io_queues) { | |
3629 | _num_io_queues = smp::count; | |
3630 | } | |
3631 | ||
3632 | // Placeholder for unconfigured disks. | |
3633 | mountpoint_params d = {}; | |
3634 | d.num_io_queues = _num_io_queues; | |
3635 | seastar_logger.debug("num_io_queues: {}", d.num_io_queues); | |
3636 | _mountpoints.emplace(0, d); | |
3637 | } | |
3638 | ||
3639 | struct io_queue::config generate_config(dev_t devid) const { | |
3640 | seastar_logger.debug("generate_config dev_id: {}", devid); | |
3641 | const mountpoint_params& p = _mountpoints.at(devid); | |
3642 | struct io_queue::config cfg; | |
3643 | uint64_t max_bandwidth = std::max(p.read_bytes_rate, p.write_bytes_rate); | |
3644 | uint64_t max_iops = std::max(p.read_req_rate, p.write_req_rate); | |
3645 | ||
f67539c2 TL |
3646 | cfg.devid = devid; |
3647 | cfg.disk_bytes_write_to_read_multiplier = io_queue::read_request_base_count; | |
3648 | cfg.disk_req_write_to_read_multiplier = io_queue::read_request_base_count; | |
3649 | ||
11fdf7f2 | 3650 | if (!_capacity) { |
11fdf7f2 | 3651 | if (max_bandwidth != std::numeric_limits<uint64_t>::max()) { |
f67539c2 | 3652 | cfg.disk_bytes_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_bytes_rate) / p.write_bytes_rate; |
11fdf7f2 TL |
3653 | cfg.max_bytes_count = io_queue::read_request_base_count * per_io_queue(max_bandwidth * latency_goal().count(), devid); |
3654 | } | |
3655 | if (max_iops != std::numeric_limits<uint64_t>::max()) { | |
3656 | cfg.max_req_count = io_queue::read_request_base_count * per_io_queue(max_iops * latency_goal().count(), devid); | |
f67539c2 | 3657 | cfg.disk_req_write_to_read_multiplier = (io_queue::read_request_base_count * p.read_req_rate) / p.write_req_rate; |
11fdf7f2 TL |
3658 | } |
3659 | cfg.mountpoint = p.mountpoint; | |
3660 | } else { | |
f67539c2 TL |
3661 | // For backwards compatibility |
3662 | cfg.capacity = *_capacity; | |
3663 | // Legacy configuration when only concurrency is specified. | |
3664 | cfg.max_req_count = io_queue::read_request_base_count * std::min(*_capacity, reactor::max_aio_per_queue); | |
3665 | // specify size in terms of 16kB IOPS. | |
3666 | cfg.max_bytes_count = io_queue::read_request_base_count * (cfg.max_req_count << 14); | |
11fdf7f2 TL |
3667 | } |
3668 | return cfg; | |
3669 | } | |
3670 | ||
3671 | auto device_ids() { | |
3672 | return boost::adaptors::keys(_mountpoints); | |
3673 | } | |
3674 | }; | |
3675 | ||
// Registers the network stack implementations: first the POSIX stack, then
// the native stack.
void smp::register_network_stacks() {
    register_posix_stack();
    register_native_stack();
}
3680 | ||
9f95a23c | 3681 | void smp::configure(boost::program_options::variables_map configuration, reactor_config reactor_cfg) |
11fdf7f2 | 3682 | { |
11fdf7f2 TL |
3683 | #ifndef SEASTAR_NO_EXCEPTION_HACK |
3684 | if (configuration["enable-glibc-exception-scaling-workaround"].as<bool>()) { | |
3685 | init_phdr_cache(); | |
3686 | } | |
3687 | #endif | |
3688 | ||
3689 | // Mask most, to prevent threads (esp. dpdk helper threads) | |
3690 | // from servicing a signal. Individual reactors will unmask signals | |
3691 | // as they become prepared to handle them. | |
3692 | // | |
3693 | // We leave some signals unmasked since we don't handle them ourself. | |
3694 | sigset_t sigs; | |
3695 | sigfillset(&sigs); | |
3696 | for (auto sig : {SIGHUP, SIGQUIT, SIGILL, SIGABRT, SIGFPE, SIGSEGV, | |
3697 | SIGALRM, SIGCONT, SIGSTOP, SIGTSTP, SIGTTIN, SIGTTOU}) { | |
3698 | sigdelset(&sigs, sig); | |
3699 | } | |
9f95a23c TL |
3700 | if (!reactor_cfg.auto_handle_sigint_sigterm) { |
3701 | sigdelset(&sigs, SIGINT); | |
3702 | sigdelset(&sigs, SIGTERM); | |
3703 | } | |
11fdf7f2 TL |
3704 | pthread_sigmask(SIG_BLOCK, &sigs, nullptr); |
3705 | ||
9f95a23c TL |
3706 | #ifndef SEASTAR_ASAN_ENABLED |
3707 | // We don't need to handle SIGSEGV when asan is enabled. | |
11fdf7f2 | 3708 | install_oneshot_signal_handler<SIGSEGV, sigsegv_action>(); |
9f95a23c TL |
3709 | #else |
3710 | (void)sigsegv_action; | |
3711 | #endif | |
11fdf7f2 TL |
3712 | install_oneshot_signal_handler<SIGABRT, sigabrt_action>(); |
3713 | ||
3714 | #ifdef SEASTAR_HAVE_DPDK | |
3715 | _using_dpdk = configuration.count("dpdk-pmd"); | |
3716 | #endif | |
3717 | auto thread_affinity = configuration["thread-affinity"].as<bool>(); | |
3718 | if (configuration.count("overprovisioned") | |
3719 | && configuration["thread-affinity"].defaulted()) { | |
3720 | thread_affinity = false; | |
3721 | } | |
3722 | if (!thread_affinity && _using_dpdk) { | |
3723 | fmt::print("warning: --thread-affinity 0 ignored in dpdk mode\n"); | |
3724 | } | |
3725 | auto mbind = configuration["mbind"].as<bool>(); | |
3726 | if (!thread_affinity) { | |
3727 | mbind = false; | |
3728 | } | |
3729 | ||
3730 | smp::count = 1; | |
3731 | smp::_tmain = std::this_thread::get_id(); | |
3732 | auto nr_cpus = resource::nr_processing_units(); | |
3733 | resource::cpuset cpu_set; | |
9f95a23c TL |
3734 | auto cgroup_cpu_set = cgroup::cpu_set(); |
3735 | ||
11fdf7f2 TL |
3736 | std::copy(boost::counting_iterator<unsigned>(0), boost::counting_iterator<unsigned>(nr_cpus), |
3737 | std::inserter(cpu_set, cpu_set.end())); | |
9f95a23c | 3738 | |
11fdf7f2 TL |
3739 | if (configuration.count("cpuset")) { |
3740 | cpu_set = configuration["cpuset"].as<cpuset_bpo_wrapper>().value; | |
9f95a23c TL |
3741 | if (cgroup_cpu_set && *cgroup_cpu_set != cpu_set) { |
3742 | // CPUs that are not available are those pinned by | |
3743 | // --cpuset but not by cgroups, if mounted. | |
3744 | std::set<unsigned int> not_available_cpus; | |
3745 | std::set_difference(cpu_set.begin(), cpu_set.end(), | |
3746 | cgroup_cpu_set->begin(), cgroup_cpu_set->end(), | |
3747 | std::inserter(not_available_cpus, not_available_cpus.end())); | |
3748 | ||
3749 | if (!not_available_cpus.empty()) { | |
3750 | std::ostringstream not_available_cpus_list; | |
3751 | for (auto cpu_id : not_available_cpus) { | |
3752 | not_available_cpus_list << " " << cpu_id; | |
3753 | } | |
3754 | seastar_logger.error("Bad value for --cpuset:{} not allowed. Shutting down.", not_available_cpus_list.str()); | |
3755 | exit(1); | |
3756 | } | |
3757 | } | |
3758 | } else if (cgroup_cpu_set) { | |
3759 | cpu_set = *cgroup_cpu_set; | |
11fdf7f2 | 3760 | } |
9f95a23c | 3761 | |
11fdf7f2 TL |
3762 | if (configuration.count("smp")) { |
3763 | nr_cpus = configuration["smp"].as<unsigned>(); | |
3764 | } else { | |
3765 | nr_cpus = cpu_set.size(); | |
3766 | } | |
3767 | smp::count = nr_cpus; | |
3768 | _reactors.resize(nr_cpus); | |
3769 | resource::configuration rc; | |
3770 | if (configuration.count("memory")) { | |
3771 | rc.total_memory = parse_memory_size(configuration["memory"].as<std::string>()); | |
3772 | #ifdef SEASTAR_HAVE_DPDK | |
3773 | if (configuration.count("hugepages") && | |
3774 | !configuration["network-stack"].as<std::string>().compare("native") && | |
3775 | _using_dpdk) { | |
3776 | size_t dpdk_memory = dpdk::eal::mem_size(smp::count); | |
3777 | ||
3778 | if (dpdk_memory >= rc.total_memory) { | |
3779 | std::cerr<<"Can't run with the given amount of memory: "; | |
3780 | std::cerr<<configuration["memory"].as<std::string>(); | |
3781 | std::cerr<<". Consider giving more."<<std::endl; | |
3782 | exit(1); | |
3783 | } | |
3784 | ||
3785 | // | |
3786 | // Subtract the memory we are about to give to DPDK from the total | |
3787 | // amount of memory we are allowed to use. | |
3788 | // | |
3789 | rc.total_memory.value() -= dpdk_memory; | |
3790 | } | |
3791 | #endif | |
3792 | } | |
3793 | if (configuration.count("reserve-memory")) { | |
3794 | rc.reserve_memory = parse_memory_size(configuration["reserve-memory"].as<std::string>()); | |
3795 | } | |
f67539c2 | 3796 | std::optional<std::string> hugepages_path; |
11fdf7f2 TL |
3797 | if (configuration.count("hugepages")) { |
3798 | hugepages_path = configuration["hugepages"].as<std::string>(); | |
3799 | } | |
3800 | auto mlock = false; | |
3801 | if (configuration.count("lock-memory")) { | |
3802 | mlock = configuration["lock-memory"].as<bool>(); | |
3803 | } | |
3804 | if (mlock) { | |
f67539c2 TL |
3805 | auto extra_flags = 0; |
3806 | #ifdef MCL_ONFAULT | |
3807 | // Linux will serialize faulting in anonymous memory, and also | |
3808 | // serialize marking them as locked. This can take many minutes on | |
3809 | // terabyte class machines, so fault them in the future to spread | |
3810 | // out the cost. This isn't good since we'll see contention if | |
3811 | // multiple shards fault in memory at once, but if that work can be | |
3812 | // in parallel to regular reactor work on other shards. | |
3813 | extra_flags |= MCL_ONFAULT; // Linux 4.4+ | |
3814 | #endif | |
3815 | auto r = mlockall(MCL_CURRENT | MCL_FUTURE | extra_flags); | |
11fdf7f2 TL |
3816 | if (r) { |
3817 | // Don't hard fail for now, it's hard to get the configuration right | |
3818 | fmt::print("warning: failed to mlockall: {}\n", strerror(errno)); | |
3819 | } | |
3820 | } | |
3821 | ||
3822 | rc.cpus = smp::count; | |
3823 | rc.cpu_set = std::move(cpu_set); | |
3824 | ||
3825 | disk_config_params disk_config; | |
3826 | disk_config.parse_config(configuration); | |
3827 | for (auto& id : disk_config.device_ids()) { | |
3828 | rc.num_io_queues.emplace(id, disk_config.num_io_queues(id)); | |
3829 | } | |
3830 | ||
f67539c2 TL |
3831 | #ifdef SEASTAR_HAVE_HWLOC |
3832 | if (configuration["allow-cpus-in-remote-numa-nodes"].as<bool>()) { | |
3833 | rc.assign_orphan_cpus = true; | |
3834 | } | |
3835 | #endif | |
3836 | ||
11fdf7f2 TL |
3837 | auto resources = resource::allocate(rc); |
3838 | std::vector<resource::cpu> allocations = std::move(resources.cpus); | |
3839 | if (thread_affinity) { | |
3840 | smp::pin(allocations[0].cpu_id); | |
3841 | } | |
3842 | memory::configure(allocations[0].mem, mbind, hugepages_path); | |
3843 | ||
3844 | if (configuration.count("abort-on-seastar-bad-alloc")) { | |
3845 | memory::enable_abort_on_allocation_failure(); | |
3846 | } | |
3847 | ||
f67539c2 TL |
3848 | if (configuration.count("dump-memory-diagnostics-on-alloc-failure-kind")) { |
3849 | memory::set_dump_memory_diagnostics_on_alloc_failure_kind(configuration["dump-memory-diagnostics-on-alloc-failure-kind"].as<std::string>()); | |
3850 | } | |
3851 | ||
11fdf7f2 | 3852 | bool heapprof_enabled = configuration.count("heapprof"); |
9f95a23c TL |
3853 | if (heapprof_enabled) { |
3854 | memory::set_heap_profiling_enabled(heapprof_enabled); | |
3855 | } | |
11fdf7f2 TL |
3856 | |
3857 | #ifdef SEASTAR_HAVE_DPDK | |
3858 | if (smp::_using_dpdk) { | |
3859 | dpdk::eal::cpuset cpus; | |
3860 | for (auto&& a : allocations) { | |
3861 | cpus[a.cpu_id] = true; | |
3862 | } | |
3863 | dpdk::eal::init(cpus, configuration); | |
3864 | } | |
3865 | #endif | |
3866 | ||
3867 | // Better to put it into the smp class, but at smp construction time | |
3868 | // correct smp::count is not known. | |
3869 | static boost::barrier reactors_registered(smp::count); | |
3870 | static boost::barrier smp_queues_constructed(smp::count); | |
3871 | static boost::barrier inited(smp::count); | |
3872 | ||
3873 | auto ioq_topology = std::move(resources.ioq_topology); | |
3874 | ||
3875 | std::unordered_map<dev_t, std::vector<io_queue*>> all_io_queues; | |
11fdf7f2 TL |
3876 | |
3877 | for (auto& id : disk_config.device_ids()) { | |
3878 | auto io_info = ioq_topology.at(id); | |
f67539c2 | 3879 | all_io_queues.emplace(id, io_info.nr_coordinators); |
11fdf7f2 TL |
3880 | } |
3881 | ||
3882 | auto alloc_io_queue = [&ioq_topology, &all_io_queues, &disk_config] (unsigned shard, dev_t id) { | |
3883 | auto io_info = ioq_topology.at(id); | |
3884 | auto cid = io_info.shard_to_coordinator[shard]; | |
3885 | auto vec_idx = io_info.coordinator_to_idx[cid]; | |
3886 | assert(io_info.coordinator_to_idx_valid[cid]); | |
3887 | if (shard == cid) { | |
3888 | struct io_queue::config cfg = disk_config.generate_config(id); | |
3889 | cfg.coordinator = cid; | |
11fdf7f2 TL |
3890 | assert(vec_idx < all_io_queues[id].size()); |
3891 | assert(!all_io_queues[id][vec_idx]); | |
3892 | all_io_queues[id][vec_idx] = new io_queue(std::move(cfg)); | |
3893 | } | |
3894 | }; | |
3895 | ||
9f95a23c | 3896 | auto assign_io_queue = [&ioq_topology, &all_io_queues] (shard_id shard_id, dev_t dev_id) { |
11fdf7f2 TL |
3897 | auto io_info = ioq_topology.at(dev_id); |
3898 | auto cid = io_info.shard_to_coordinator[shard_id]; | |
3899 | auto queue_idx = io_info.coordinator_to_idx[cid]; | |
3900 | if (all_io_queues[dev_id][queue_idx]->coordinator() == shard_id) { | |
3901 | engine().my_io_queues.emplace_back(all_io_queues[dev_id][queue_idx]); | |
3902 | } | |
3903 | engine()._io_queues.emplace(dev_id, all_io_queues[dev_id][queue_idx]); | |
3904 | }; | |
3905 | ||
3906 | _all_event_loops_done.emplace(smp::count); | |
3907 | ||
3908 | auto backend_selector = configuration["reactor-backend"].as<reactor_backend_selector>(); | |
3909 | ||
3910 | unsigned i; | |
3911 | for (i = 1; i < smp::count; i++) { | |
3912 | auto allocation = allocations[i]; | |
9f95a23c TL |
3913 | create_thread([configuration, &disk_config, hugepages_path, i, allocation, assign_io_queue, alloc_io_queue, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] { |
3914 | try { | |
11fdf7f2 TL |
3915 | auto thread_name = seastar::format("reactor-{}", i); |
3916 | pthread_setname_np(pthread_self(), thread_name.c_str()); | |
3917 | if (thread_affinity) { | |
3918 | smp::pin(allocation.cpu_id); | |
3919 | } | |
3920 | memory::configure(allocation.mem, mbind, hugepages_path); | |
9f95a23c TL |
3921 | if (heapprof_enabled) { |
3922 | memory::set_heap_profiling_enabled(heapprof_enabled); | |
3923 | } | |
11fdf7f2 TL |
3924 | sigset_t mask; |
3925 | sigfillset(&mask); | |
3926 | for (auto sig : { SIGSEGV }) { | |
3927 | sigdelset(&mask, sig); | |
3928 | } | |
3929 | auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL); | |
3930 | throw_pthread_error(r); | |
9f95a23c TL |
3931 | init_default_smp_service_group(i); |
3932 | allocate_reactor(i, backend_selector, reactor_cfg); | |
11fdf7f2 TL |
3933 | _reactors[i] = &engine(); |
3934 | for (auto& dev_id : disk_config.device_ids()) { | |
3935 | alloc_io_queue(i, dev_id); | |
3936 | } | |
3937 | reactors_registered.wait(); | |
3938 | smp_queues_constructed.wait(); | |
3939 | start_all_queues(); | |
3940 | for (auto& dev_id : disk_config.device_ids()) { | |
3941 | assign_io_queue(i, dev_id); | |
3942 | } | |
3943 | inited.wait(); | |
3944 | engine().configure(configuration); | |
3945 | engine().run(); | |
9f95a23c TL |
3946 | } catch (const std::exception& e) { |
3947 | seastar_logger.error(e.what()); | |
3948 | _exit(1); | |
3949 | } | |
11fdf7f2 TL |
3950 | }); |
3951 | } | |
3952 | ||
9f95a23c TL |
3953 | init_default_smp_service_group(0); |
3954 | try { | |
3955 | allocate_reactor(0, backend_selector, reactor_cfg); | |
3956 | } catch (const std::exception& e) { | |
3957 | seastar_logger.error(e.what()); | |
3958 | _exit(1); | |
3959 | } | |
3960 | ||
11fdf7f2 TL |
3961 | _reactors[0] = &engine(); |
3962 | for (auto& dev_id : disk_config.device_ids()) { | |
3963 | alloc_io_queue(0, dev_id); | |
3964 | } | |
3965 | ||
3966 | #ifdef SEASTAR_HAVE_DPDK | |
3967 | if (_using_dpdk) { | |
3968 | auto it = _thread_loops.begin(); | |
3969 | RTE_LCORE_FOREACH_SLAVE(i) { | |
3970 | rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&*(it++)), i); | |
3971 | } | |
3972 | } | |
3973 | #endif | |
3974 | ||
3975 | reactors_registered.wait(); | |
3976 | smp::_qs = decltype(smp::_qs){new smp_message_queue* [smp::count], qs_deleter{}}; | |
3977 | for(unsigned i = 0; i < smp::count; i++) { | |
3978 | smp::_qs[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count)); | |
3979 | for (unsigned j = 0; j < smp::count; ++j) { | |
3980 | new (&smp::_qs[i][j]) smp_message_queue(_reactors[j], _reactors[i]); | |
3981 | } | |
3982 | } | |
3983 | alien::smp::_qs = alien::smp::create_qs(_reactors); | |
3984 | smp_queues_constructed.wait(); | |
3985 | start_all_queues(); | |
3986 | for (auto& dev_id : disk_config.device_ids()) { | |
3987 | assign_io_queue(0, dev_id); | |
3988 | } | |
3989 | inited.wait(); | |
3990 | ||
3991 | engine().configure(configuration); | |
3992 | // The raw `new` is necessary because of the private constructor of `lowres_clock_impl`. | |
3993 | engine()._lowres_clock_impl = std::unique_ptr<lowres_clock_impl>(new lowres_clock_impl); | |
3994 | } | |
3995 | ||
3996 | bool smp::poll_queues() { | |
3997 | size_t got = 0; | |
3998 | for (unsigned i = 0; i < count; i++) { | |
f67539c2 TL |
3999 | if (this_shard_id() != i) { |
4000 | auto& rxq = _qs[this_shard_id()][i]; | |
11fdf7f2 TL |
4001 | rxq.flush_response_batch(); |
4002 | got += rxq.has_unflushed_responses(); | |
4003 | got += rxq.process_incoming(); | |
f67539c2 | 4004 | auto& txq = _qs[i][this_shard_id()]; |
11fdf7f2 | 4005 | txq.flush_request_batch(); |
9f95a23c | 4006 | got += txq.process_completions(i); |
11fdf7f2 TL |
4007 | } |
4008 | } | |
4009 | return got != 0; | |
4010 | } | |
4011 | ||
4012 | bool smp::pure_poll_queues() { | |
4013 | for (unsigned i = 0; i < count; i++) { | |
f67539c2 TL |
4014 | if (this_shard_id() != i) { |
4015 | auto& rxq = _qs[this_shard_id()][i]; | |
11fdf7f2 | 4016 | rxq.flush_response_batch(); |
f67539c2 | 4017 | auto& txq = _qs[i][this_shard_id()]; |
11fdf7f2 TL |
4018 | txq.flush_request_batch(); |
4019 | if (rxq.pure_poll_rx() || txq.pure_poll_tx() || rxq.has_unflushed_responses()) { | |
4020 | return true; | |
4021 | } | |
4022 | } | |
4023 | } | |
4024 | return false; | |
4025 | } | |
4026 | ||
// Process-wide monitor object that g_need_preempt points at initially.
internal::preemption_monitor bootstrap_preemption_monitor{};
// Per-thread pointer to the preemption monitor in use; starts out pointing
// at bootstrap_preemption_monitor.
__thread const internal::preemption_monitor* g_need_preempt = &bootstrap_preemption_monitor;

// Per-thread pointer to a reactor.
// NOTE(review): presumably the object returned by engine() — confirm.
__thread reactor* local_engine;
4031 | ||
// Logs `eptr` at error level through seastar_logger, prefixed by `message`
// (formatted as "<message>: <exception>").
void report_exception(std::string_view message, std::exception_ptr eptr) noexcept {
    seastar_logger.error("{}: {}", message, eptr);
}
4035 | ||
f67539c2 | 4036 | future<> check_direct_io_support(std::string_view path) noexcept { |
11fdf7f2 TL |
4037 | struct w { |
4038 | sstring path; | |
4039 | open_flags flags; | |
4040 | std::function<future<>()> cleanup; | |
4041 | ||
f67539c2 | 4042 | static w parse(sstring path, std::optional<directory_entry_type> type) { |
11fdf7f2 TL |
4043 | if (!type) { |
4044 | throw std::invalid_argument(format("Could not open file at {}. Make sure it exists", path)); | |
4045 | } | |
4046 | ||
4047 | if (type == directory_entry_type::directory) { | |
4048 | auto fpath = path + "/.o_direct_test"; | |
4049 | return w{fpath, open_flags::wo | open_flags::create | open_flags::truncate, [fpath] { return remove_file(fpath); }}; | |
4050 | } else if ((type == directory_entry_type::regular) || (type == directory_entry_type::link)) { | |
4051 | return w{path, open_flags::ro, [] { return make_ready_future<>(); }}; | |
4052 | } else { | |
4053 | throw std::invalid_argument(format("{} neither a directory nor file. Can't be opened with O_DIRECT", path)); | |
4054 | } | |
4055 | }; | |
4056 | }; | |
4057 | ||
f67539c2 TL |
4058 | // Allocating memory for a sstring can throw, hence the futurize_invoke |
4059 | return futurize_invoke([path] { | |
4060 | return engine().file_type(path).then([path = sstring(path)] (auto type) { | |
4061 | auto w = w::parse(path, type); | |
4062 | return open_file_dma(w.path, w.flags).then_wrapped([path = w.path, cleanup = std::move(w.cleanup)] (future<file> f) { | |
4063 | try { | |
4064 | auto fd = f.get0(); | |
4065 | return cleanup().finally([fd = std::move(fd)] () mutable { | |
4066 | return fd.close(); | |
4067 | }); | |
4068 | } catch (std::system_error& e) { | |
4069 | if (e.code() == std::error_code(EINVAL, std::system_category())) { | |
4070 | report_exception(format("Could not open file at {}. Does your filesystem support O_DIRECT?", path), std::current_exception()); | |
4071 | } | |
4072 | throw; | |
11fdf7f2 | 4073 | } |
11fdf7f2 TL |
4074 | }); |
4075 | }); | |
4076 | }); | |
4077 | } | |
4078 | ||
11fdf7f2 TL |
4079 | server_socket listen(socket_address sa) { |
4080 | return engine().listen(sa); | |
4081 | } | |
4082 | ||
4083 | server_socket listen(socket_address sa, listen_options opts) { | |
4084 | return engine().listen(sa, opts); | |
4085 | } | |
4086 | ||
4087 | future<connected_socket> connect(socket_address sa) { | |
4088 | return engine().connect(sa); | |
4089 | } | |
4090 | ||
4091 | future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) { | |
4092 | return engine().connect(sa, local, proto); | |
4093 | } | |
4094 | ||
f67539c2 TL |
4095 | socket make_socket() { |
4096 | return engine().net().socket(); | |
4097 | } | |
4098 | ||
4099 | net::udp_channel make_udp_channel() { | |
4100 | return engine().net().make_udp_channel(); | |
4101 | } | |
4102 | ||
4103 | net::udp_channel make_udp_channel(const socket_address& local) { | |
4104 | return engine().net().make_udp_channel(local); | |
4105 | } | |
4106 | ||
9f95a23c TL |
// Queues `t` via add_urgent_task() and then requests preemption.
void reactor::add_high_priority_task(task* t) noexcept {
    add_urgent_task(t);
    // break .then() chains
    request_preemption();
}
4112 | ||
f67539c2 TL |
4113 | |
4114 | void set_idle_cpu_handler(idle_cpu_handler&& handler) { | |
4115 | engine().set_idle_cpu_handler(std::move(handler)); | |
4116 | } | |
4117 | ||
11fdf7f2 TL |
4118 | static |
4119 | bool | |
4120 | virtualized() { | |
9f95a23c | 4121 | return fs::exists("/sys/hypervisor/type"); |
11fdf7f2 TL |
4122 | } |
4123 | ||
4124 | std::chrono::nanoseconds | |
4125 | reactor::calculate_poll_time() { | |
4126 | // In a non-virtualized environment, select a poll time | |
4127 | // that is competitive with halt/unhalt. | |
4128 | // | |
4129 | // In a virutalized environment, IPIs are slow and dominate | |
4130 | // sleep/wake (mprotect/tgkill), so increase poll time to reduce | |
4131 | // so we don't sleep in a request/reply workload | |
4132 | return virtualized() ? 2000us : 200us; | |
4133 | } | |
4134 | ||
f67539c2 TL |
4135 | future<> later() noexcept { |
4136 | memory::scoped_critical_alloc_section _; | |
11fdf7f2 | 4137 | engine().force_poll(); |
f67539c2 TL |
4138 | auto tsk = make_task(default_scheduling_group(), [] {}); |
4139 | schedule(tsk); | |
4140 | return tsk->get_future(); | |
11fdf7f2 TL |
4141 | } |
4142 | ||
4143 | void add_to_flush_poller(output_stream<char>* os) { | |
4144 | engine()._flush_batching.emplace_back(os); | |
4145 | } | |
4146 | ||
// Returns the accumulated idle time stored in _total_idle.
reactor::sched_clock::duration reactor::total_idle_time() {
    return _total_idle;
}
4150 | ||
4151 | reactor::sched_clock::duration reactor::total_busy_time() { | |
4152 | return sched_clock::now() - _start_time - _total_idle; | |
4153 | } | |
4154 | ||
4155 | std::chrono::nanoseconds reactor::total_steal_time() { | |
4156 | // Steal time: this mimics the concept some Hypervisors have about Steal time. | |
4157 | // That is the time in which a VM has something to run, but is not running because some other | |
4158 | // process (another VM or the hypervisor itself) is in control. | |
4159 | // | |
4160 | // For us, we notice that during the time in which we were not sleeping (either running or busy | |
4161 | // polling while idle), we should be accumulating thread runtime. If we are not, that's because | |
4162 | // someone stole it from us. | |
4163 | // | |
4164 | // Because this is totally in userspace we can miss some events. For instance, if the seastar | |
4165 | // process is ready to run but the kernel hasn't scheduled us yet, that would be technically | |
4166 | // steal time but we have no ways to account it. | |
4167 | // | |
4168 | // But what we have here should be good enough and at least has a well defined meaning. | |
4169 | return std::chrono::duration_cast<std::chrono::nanoseconds>(sched_clock::now() - _start_time - _total_sleep) - | |
4170 | std::chrono::duration_cast<std::chrono::nanoseconds>(thread_cputime_clock::now().time_since_epoch()); | |
4171 | } | |
4172 | ||
// Bitmap of scheduling group ids currently in use; bit N set means id N is
// taken. Bits 0 and 1 start out set: 0=main, 1=atexit.
static std::atomic<unsigned long> s_used_scheduling_group_ids_bitmap{0b11ul};
// Next scheduling-group-specific key to hand out; starts at 0.
static std::atomic<unsigned long> s_next_scheduling_group_specific_key{0ul};
11fdf7f2 TL |
4175 | |
4176 | static | |
f67539c2 TL |
4177 | int |
4178 | allocate_scheduling_group_id() noexcept { | |
9f95a23c | 4179 | static_assert(max_scheduling_groups() <= std::numeric_limits<unsigned long>::digits, "more scheduling groups than available bits"); |
11fdf7f2 TL |
4180 | auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed); |
4181 | auto nb = b; | |
4182 | unsigned i = 0; | |
4183 | do { | |
4184 | if (__builtin_popcountl(b) == max_scheduling_groups()) { | |
f67539c2 | 4185 | return -1; |
11fdf7f2 TL |
4186 | } |
4187 | i = count_trailing_zeros(~b); | |
4188 | nb = b | (1ul << i); | |
4189 | } while (!s_used_scheduling_group_ids_bitmap.compare_exchange_weak(b, nb, std::memory_order_relaxed)); | |
4190 | return i; | |
4191 | } | |
4192 | ||
9f95a23c TL |
4193 | static |
4194 | unsigned long | |
f67539c2 | 4195 | allocate_scheduling_group_specific_key() noexcept { |
9f95a23c TL |
4196 | return s_next_scheduling_group_specific_key.fetch_add(1, std::memory_order_relaxed); |
4197 | } | |
4198 | ||
11fdf7f2 TL |
4199 | static |
4200 | void | |
f67539c2 | 4201 | deallocate_scheduling_group_id(unsigned id) noexcept { |
11fdf7f2 TL |
4202 | s_used_scheduling_group_ids_bitmap.fetch_and(~(1ul << id), std::memory_order_relaxed); |
4203 | } | |
4204 | ||
4205 | void | |
9f95a23c | 4206 | reactor::allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key) { |
f67539c2 TL |
4207 | auto& sg_data = _scheduling_group_specific_data; |
4208 | auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | |
4209 | this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key.id()+1)); | |
4210 | this_sg.specific_vals[key.id()] = | |
4211 | aligned_alloc(sg_data.scheduling_group_key_configs[key.id()].alignment, | |
4212 | sg_data.scheduling_group_key_configs[key.id()].allocation_size); | |
4213 | if (!this_sg.specific_vals[key.id()]) { | |
9f95a23c TL |
4214 | std::abort(); |
4215 | } | |
f67539c2 TL |
4216 | if (sg_data.scheduling_group_key_configs[key.id()].constructor) { |
4217 | sg_data.scheduling_group_key_configs[key.id()].constructor(this_sg.specific_vals[key.id()]); | |
9f95a23c TL |
4218 | } |
4219 | } | |
4220 | ||
4221 | future<> | |
11fdf7f2 | 4222 | reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, float shares) { |
f67539c2 TL |
4223 | auto& sg_data = _scheduling_group_specific_data; |
4224 | auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | |
4225 | this_sg.queue_is_initialized = true; | |
11fdf7f2 TL |
4226 | _task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1)); |
4227 | _task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shares); | |
9f95a23c TL |
4228 | unsigned long num_keys = s_next_scheduling_group_specific_key.load(std::memory_order_relaxed); |
4229 | ||
4230 | return with_scheduling_group(sg, [this, num_keys, sg] () { | |
4231 | for (unsigned long key_id = 0; key_id < num_keys; key_id++) { | |
4232 | allocate_scheduling_group_specific_data(sg, scheduling_group_key(key_id)); | |
4233 | } | |
4234 | }); | |
11fdf7f2 TL |
4235 | } |
4236 | ||
9f95a23c TL |
4237 | future<> |
4238 | reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) { | |
f67539c2 TL |
4239 | auto& sg_data = _scheduling_group_specific_data; |
4240 | sg_data.scheduling_group_key_configs.resize(std::max<size_t>(sg_data.scheduling_group_key_configs.size(), key.id() + 1)); | |
4241 | sg_data.scheduling_group_key_configs[key.id()] = cfg; | |
9f95a23c TL |
4242 | return parallel_for_each(_task_queues, [this, cfg, key] (std::unique_ptr<task_queue>& tq) { |
4243 | if (tq) { | |
4244 | scheduling_group sg = scheduling_group(tq->_id); | |
4245 | return with_scheduling_group(sg, [this, key, sg] () { | |
4246 | allocate_scheduling_group_specific_data(sg, key); | |
4247 | }); | |
4248 | } | |
4249 | return make_ready_future(); | |
4250 | }); | |
4251 | } | |
4252 | ||
4253 | future<> | |
11fdf7f2 | 4254 | reactor::destroy_scheduling_group(scheduling_group sg) { |
9f95a23c | 4255 | return with_scheduling_group(sg, [this, sg] () { |
f67539c2 TL |
4256 | auto& sg_data = _scheduling_group_specific_data; |
4257 | auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | |
4258 | for (unsigned long key_id = 0; key_id < sg_data.scheduling_group_key_configs.size(); key_id++) { | |
4259 | void* val = this_sg.specific_vals[key_id]; | |
9f95a23c | 4260 | if (val) { |
f67539c2 TL |
4261 | if (sg_data.scheduling_group_key_configs[key_id].destructor) { |
4262 | sg_data.scheduling_group_key_configs[key_id].destructor(val); | |
9f95a23c TL |
4263 | } |
4264 | free(val); | |
f67539c2 | 4265 | this_sg.specific_vals[key_id] = nullptr; |
9f95a23c TL |
4266 | } |
4267 | } | |
4268 | }).then( [this, sg] () { | |
f67539c2 TL |
4269 | auto& sg_data = _scheduling_group_specific_data; |
4270 | auto& this_sg = sg_data.per_scheduling_group_data[sg._id]; | |
4271 | this_sg.queue_is_initialized = false; | |
9f95a23c TL |
4272 | _task_queues[sg._id].reset(); |
4273 | }); | |
4274 | ||
4275 | } | |
4276 | ||
4277 | void | |
f67539c2 TL |
4278 | internal::no_such_scheduling_group(scheduling_group sg) { |
4279 | throw std::invalid_argument(format("The scheduling group does not exist ({})", internal::scheduling_group_index(sg))); | |
11fdf7f2 TL |
4280 | } |
4281 | ||
4282 | const sstring& | |
f67539c2 | 4283 | scheduling_group::name() const noexcept { |
11fdf7f2 TL |
4284 | return engine()._task_queues[_id]->_name; |
4285 | } | |
4286 | ||
4287 | void | |
f67539c2 | 4288 | scheduling_group::set_shares(float shares) noexcept { |
11fdf7f2 TL |
4289 | engine()._task_queues[_id]->set_shares(shares); |
4290 | } | |
4291 | ||
4292 | future<scheduling_group> | |
f67539c2 TL |
4293 | create_scheduling_group(sstring name, float shares) noexcept { |
4294 | auto aid = allocate_scheduling_group_id(); | |
4295 | if (aid < 0) { | |
4296 | return make_exception_future<scheduling_group>(std::runtime_error("Scheduling group limit exceeded")); | |
4297 | } | |
4298 | auto id = static_cast<unsigned>(aid); | |
11fdf7f2 TL |
4299 | assert(id < max_scheduling_groups()); |
4300 | auto sg = scheduling_group(id); | |
4301 | return smp::invoke_on_all([sg, name, shares] { | |
9f95a23c | 4302 | return engine().init_scheduling_group(sg, name, shares); |
11fdf7f2 TL |
4303 | }).then([sg] { |
4304 | return make_ready_future<scheduling_group>(sg); | |
4305 | }); | |
4306 | } | |
4307 | ||
9f95a23c | 4308 | future<scheduling_group_key> |
f67539c2 | 4309 | scheduling_group_key_create(scheduling_group_key_config cfg) noexcept { |
9f95a23c TL |
4310 | scheduling_group_key key = allocate_scheduling_group_specific_key(); |
4311 | return smp::invoke_on_all([key, cfg] { | |
4312 | return engine().init_new_scheduling_group_key(key, cfg); | |
4313 | }).then([key] { | |
4314 | return make_ready_future<scheduling_group_key>(key); | |
4315 | }); | |
4316 | } | |
4317 | ||
// Free-function wrapper that forwards to reactor::rename_priority_class().
future<>
rename_priority_class(io_priority_class pc, sstring new_name) {
    return reactor::rename_priority_class(pc, new_name);
}
4322 | ||
11fdf7f2 | 4323 | future<> |
f67539c2 | 4324 | destroy_scheduling_group(scheduling_group sg) noexcept { |
11fdf7f2 | 4325 | if (sg == default_scheduling_group()) { |
f67539c2 | 4326 | return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to destroy the default scheduling group")); |
11fdf7f2 TL |
4327 | } |
4328 | if (sg == current_scheduling_group()) { | |
f67539c2 | 4329 | return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to destroy the current scheduling group")); |
11fdf7f2 TL |
4330 | } |
4331 | return smp::invoke_on_all([sg] { | |
9f95a23c | 4332 | return engine().destroy_scheduling_group(sg); |
11fdf7f2 TL |
4333 | }).then([sg] { |
4334 | deallocate_scheduling_group_id(sg._id); | |
4335 | }); | |
4336 | } | |
4337 | ||
9f95a23c | 4338 | future<> |
f67539c2 | 4339 | rename_scheduling_group(scheduling_group sg, sstring new_name) noexcept { |
9f95a23c | 4340 | if (sg == default_scheduling_group()) { |
f67539c2 | 4341 | return make_exception_future<>(make_backtraced_exception_ptr<std::runtime_error>("Attempt to rename the default scheduling group")); |
9f95a23c TL |
4342 | } |
4343 | return smp::invoke_on_all([sg, new_name] { | |
4344 | engine()._task_queues[sg._id]->rename(new_name); | |
4345 | }); | |
4346 | } | |
11fdf7f2 TL |
4347 | |
4348 | namespace internal { | |
4349 | ||
// Converts a ::timeval (seconds plus microseconds) into a
// std::chrono::steady_clock::duration.
inline
std::chrono::steady_clock::duration
timeval_to_duration(::timeval tv) {
    const std::chrono::seconds whole_seconds(tv.tv_sec);
    const std::chrono::microseconds fraction(tv.tv_usec);
    return whole_seconds + fraction;
}
4355 | ||
4356 | class reactor_stall_sampler : public reactor::pollfn { | |
4357 | std::chrono::steady_clock::time_point _run_start; | |
4358 | ::rusage _run_start_rusage; | |
4359 | uint64_t _kernel_stalls = 0; | |
4360 | std::chrono::steady_clock::duration _nonsleep_cpu_time = {}; | |
4361 | std::chrono::steady_clock::duration _nonsleep_wall_time = {}; | |
4362 | private: | |
4363 | static ::rusage get_rusage() { | |
4364 | struct ::rusage ru; | |
4365 | ::getrusage(RUSAGE_THREAD, &ru); | |
4366 | return ru; | |
4367 | } | |
4368 | static std::chrono::steady_clock::duration cpu_time(const ::rusage& ru) { | |
4369 | return timeval_to_duration(ru.ru_stime) + timeval_to_duration(ru.ru_utime); | |
4370 | } | |
4371 | void mark_run_start() { | |
4372 | _run_start = std::chrono::steady_clock::now(); | |
4373 | _run_start_rusage = get_rusage(); | |
4374 | } | |
4375 | void mark_run_end() { | |
4376 | auto start_nvcsw = _run_start_rusage.ru_nvcsw; | |
4377 | auto start_cpu_time = cpu_time(_run_start_rusage); | |
4378 | auto start_time = _run_start; | |
4379 | _run_start = std::chrono::steady_clock::now(); | |
4380 | _run_start_rusage = get_rusage(); | |
4381 | _kernel_stalls += _run_start_rusage.ru_nvcsw - start_nvcsw; | |
4382 | _nonsleep_cpu_time += cpu_time(_run_start_rusage) - start_cpu_time; | |
4383 | _nonsleep_wall_time += _run_start - start_time; | |
4384 | } | |
4385 | public: | |
4386 | reactor_stall_sampler() { mark_run_start(); } | |
4387 | virtual bool poll() override { return false; } | |
4388 | virtual bool pure_poll() override { return false; } | |
4389 | virtual bool try_enter_interrupt_mode() override { | |
4390 | // try_enter_interrupt_mode marks the end of a reactor run that should be context-switch free | |
4391 | mark_run_end(); | |
4392 | return true; | |
4393 | } | |
4394 | virtual void exit_interrupt_mode() override { | |
4395 | // start a reactor run that should be context switch free | |
4396 | mark_run_start(); | |
4397 | } | |
4398 | stall_report report() const { | |
4399 | stall_report r; | |
4400 | // mark_run_end() with an immediate mark_run_start() is logically a no-op, | |
4401 | // but each one of them has an effect, so they can't be marked const | |
4402 | const_cast<reactor_stall_sampler*>(this)->mark_run_end(); | |
4403 | r.kernel_stalls = _kernel_stalls; | |
4404 | r.run_wall_time = _nonsleep_wall_time; | |
4405 | r.stall_time = _nonsleep_wall_time - _nonsleep_cpu_time; | |
4406 | const_cast<reactor_stall_sampler*>(this)->mark_run_start(); | |
4407 | return r; | |
4408 | } | |
4409 | }; | |
4410 | ||
4411 | future<stall_report> | |
4412 | report_reactor_stalls(noncopyable_function<future<> ()> uut) { | |
4413 | auto reporter = std::make_unique<reactor_stall_sampler>(); | |
4414 | auto p_reporter = reporter.get(); | |
4415 | auto poller = reactor::poller(std::move(reporter)); | |
4416 | return uut().then([poller = std::move(poller), p_reporter] () mutable { | |
4417 | return p_reporter->report(); | |
4418 | }); | |
4419 | } | |
4420 | ||
4421 | std::ostream& operator<<(std::ostream& os, const stall_report& sr) { | |
4422 | auto to_ms = [] (std::chrono::steady_clock::duration d) -> float { | |
4423 | return std::chrono::duration<float>(d) / 1ms; | |
4424 | }; | |
4425 | return os << format("{} stalls, {} ms stall time, {} ms run time", sr.kernel_stalls, to_ms(sr.stall_time), to_ms(sr.run_wall_time)); | |
4426 | } | |
4427 | ||
4428 | } | |
4429 | ||
f67539c2 TL |
4430 | #ifdef SEASTAR_TASK_BACKTRACE |
4431 | ||
4432 | void task::make_backtrace() noexcept { | |
4433 | memory::disable_backtrace_temporarily dbt; | |
4434 | try { | |
4435 | _bt = make_lw_shared<simple_backtrace>(current_backtrace_tasklocal()); | |
4436 | } catch (...) { | |
4437 | _bt = nullptr; | |
4438 | } | |
4439 | } | |
4440 | ||
4441 | #endif | |
4442 | ||
11fdf7f2 | 4443 | } |