/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */

#pragma once

#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/posix.hh>
#include <seastar/core/reactor_config.hh>
#include <seastar/core/resource.hh>
#include <seastar/util/program-options.hh>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/range/irange.hpp>
#include <boost/program_options.hpp>
#include <deque>
#include <thread>

/// \file

namespace seastar {

using shard_id = unsigned;

class smp_service_group;
class reactor_backend_selector;

namespace alien {

class instance;

}

namespace internal {

unsigned smp_service_group_id(smp_service_group ssg) noexcept;

inline shard_id* this_shard_id_ptr() noexcept {
    static thread_local shard_id g_this_shard_id;
    return &g_this_shard_id;
}

}

/// Returns the shard_id of the current shard.
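///
/// A minimal illustrative sketch (shard 0 is chosen arbitrarily):
/// \code
/// if (this_shard_id() == 0) {
///     // perform shard-0-only initialization here
/// }
/// \endcode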
inline shard_id this_shard_id() noexcept {
    return *internal::this_shard_id_ptr();
}

/// Configuration for smp_service_group objects.
///
/// \see create_smp_service_group()
struct smp_service_group_config {
    /// The maximum number of non-local requests that execute on a shard concurrently
    ///
    /// Will be adjusted upwards to allow at least one request per non-local shard.
    unsigned max_nonlocal_requests = 0;
    /// An optional name for this smp group
    ///
    /// If this optional is engaged, timeout exception messages of the group's
    /// semaphores will indicate the group's name.
    std::optional<sstring> group_name;
};

/// A resource controller for cross-shard calls.
///
/// An smp_service_group allows you to limit the concurrency of
/// smp::submit_to() and similar calls. While it's easy to limit
/// the caller's concurrency (for example, by using a semaphore),
/// the concurrency at the remote end can be multiplied by a factor
/// of smp::count-1, which can be large.
///
/// The class is called a service _group_ because it can be used
/// to group similar calls that share resource usage characteristics,
/// need not be isolated from each other, but do need to be isolated
/// from other groups. Calls in a group should not nest; doing so
/// can result in ABBA deadlocks.
///
/// Nested submit_to() calls must form a directed acyclic graph
/// when considering their smp_service_groups as nodes. For example,
/// if a call using ssg1 then invokes another call using ssg2, the
/// inner call must not submit further calls via either ssg1 or ssg2,
/// as that would form a cycle (risking an ABBA deadlock). Create a
/// new smp_service_group instead.
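///
/// An illustrative sketch of the acyclicity rule (ssg1 and ssg2 stand in for
/// groups previously obtained from create_smp_service_group()):
/// \code
/// // OK: nesting an ssg2 call inside an ssg1 call keeps the graph acyclic.
/// (void)smp::submit_to(1, smp_submit_to_options(ssg1), [ssg2] {
///     return smp::submit_to(2, smp_submit_to_options(ssg2), [] {});
/// });
/// // Wrong: submitting via ssg1 again from inside the ssg1 call above
/// // would close a cycle and risk an ABBA deadlock.
/// \endcode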
class smp_service_group {
    unsigned _id;
private:
    explicit smp_service_group(unsigned id) noexcept : _id(id) {}

    friend unsigned internal::smp_service_group_id(smp_service_group ssg) noexcept;
    friend smp_service_group default_smp_service_group() noexcept;
    friend future<smp_service_group> create_smp_service_group(smp_service_group_config ssgc) noexcept;
};

inline
unsigned
internal::smp_service_group_id(smp_service_group ssg) noexcept {
    return ssg._id;
}

/// Returns the default smp_service_group. This smp_service_group
/// does not impose any limits on concurrency in the target shard.
/// This makes it deadlock-safe, but can consume unbounded resources,
/// and should therefore only be used when initiator concurrency is
/// very low (e.g. administrative tasks).
smp_service_group default_smp_service_group() noexcept;

/// Creates an smp_service_group with the specified configuration.
///
/// The smp_service_group is global, and after this call completes,
/// the returned value can be used on any shard.
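///
/// A minimal usage sketch (the group name and request limit are illustrative):
/// \code
/// smp_service_group_config ssgc;
/// ssgc.max_nonlocal_requests = 1000;
/// ssgc.group_name = "my-group";
/// return create_smp_service_group(ssgc).then([] (smp_service_group ssg) {
///     return smp::submit_to(1, smp_submit_to_options(ssg), [] {
///         // runs on shard 1, with concurrency limited by ssg
///     }).finally([ssg] {
///         return destroy_smp_service_group(ssg);
///     });
/// });
/// \endcode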
future<smp_service_group> create_smp_service_group(smp_service_group_config ssgc) noexcept;

/// Destroy an smp_service_group.
///
/// Frees all resources used by an smp_service_group. It must not
/// be used again once this function is called.
future<> destroy_smp_service_group(smp_service_group ssg) noexcept;

inline
smp_service_group default_smp_service_group() noexcept {
    return smp_service_group(0);
}

using smp_timeout_clock = lowres_clock;
using smp_service_group_semaphore = basic_semaphore<named_semaphore_exception_factory, smp_timeout_clock>;
using smp_service_group_semaphore_units = semaphore_units<named_semaphore_exception_factory, smp_timeout_clock>;

static constexpr smp_timeout_clock::time_point smp_no_timeout = smp_timeout_clock::time_point::max();

/// Options controlling the behaviour of \ref smp::submit_to().
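///
/// For example (a sketch; the 100ms queueing budget is arbitrary):
/// \code
/// smp_submit_to_options opts(default_smp_service_group(),
///                            smp_timeout_clock::now() + std::chrono::milliseconds(100));
/// \endcode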
struct smp_submit_to_options {
    /// Controls resource allocation.
    smp_service_group service_group = default_smp_service_group();
    /// The timeout is relevant only to the time the call spends waiting to be
    /// processed by the remote shard, and *not* to the time it takes to be
    /// executed there.
    smp_timeout_clock::time_point timeout = smp_no_timeout;

    smp_submit_to_options(smp_service_group service_group = default_smp_service_group(), smp_timeout_clock::time_point timeout = smp_no_timeout) noexcept
        : service_group(service_group)
        , timeout(timeout) {
    }
};

void init_default_smp_service_group(shard_id cpu);

smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) noexcept;

class smp_message_queue {
    static constexpr size_t queue_length = 128;
    static constexpr size_t batch_size = 16;
    static constexpr size_t prefetch_cnt = 2;
    struct work_item;
    struct lf_queue_remote {
        reactor* remote;
    };
    using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
                                                      boost::lockfree::capacity<queue_length>>;
    // use inheritance to control placement order
    struct lf_queue : lf_queue_remote, lf_queue_base {
        lf_queue(reactor* remote) : lf_queue_remote{remote} {}
        void maybe_wakeup();
        ~lf_queue();
    };
    lf_queue _pending;
    lf_queue _completed;
    struct alignas(seastar::cache_line_size) {
        size_t _sent = 0;
        size_t _compl = 0;
        size_t _last_snt_batch = 0;
        size_t _last_cmpl_batch = 0;
        size_t _current_queue_length = 0;
    };
    // Keep this member between the two statistics structures above and below.
    // This guarantees at least one cache line between them, so the hardware
    // prefetcher will not accidentally prefetch a cache line used by
    // another cpu.
    metrics::metric_groups _metrics;
    struct alignas(seastar::cache_line_size) {
        size_t _received = 0;
        size_t _last_rcv_batch = 0;
    };
    struct work_item : public task {
        explicit work_item(smp_service_group ssg) : task(current_scheduling_group()), ssg(ssg) {}
        smp_service_group ssg;
        virtual ~work_item() {}
        virtual void fail_with(std::exception_ptr) = 0;
        void process();
        virtual void complete() = 0;
    };
    template <typename Func>
    struct async_work_item : work_item {
        smp_message_queue& _queue;
        Func _func;
        using futurator = futurize<std::invoke_result_t<Func>>;
        using future_type = typename futurator::type;
        using value_type = typename future_type::value_type;
        std::optional<value_type> _result;
        std::exception_ptr _ex; // if !_result
        typename futurator::promise_type _promise; // used on local side
        async_work_item(smp_message_queue& queue, smp_service_group ssg, Func&& func) : work_item(ssg), _queue(queue), _func(std::move(func)) {}
        virtual void fail_with(std::exception_ptr ex) override {
            _promise.set_exception(std::move(ex));
        }
        virtual task* waiting_task() noexcept override {
            // FIXME: waiting_task across shards is not implemented. Unsynchronized task access is unsafe.
            return nullptr;
        }
        virtual void run_and_dispose() noexcept override {
            // _queue.respond() below forwards the continuation chain back to the
            // calling shard.
            (void)futurator::invoke(this->_func).then_wrapped([this] (auto f) {
                if (f.failed()) {
                    _ex = f.get_exception();
                } else {
                    _result = f.get();
                }
                _queue.respond(this);
            });
            // We don't delete the task here as the creator of the work item will
            // delete it on the origin shard.
        }
        virtual void complete() override {
            if (_result) {
                _promise.set_value(std::move(*_result));
            } else {
                // FIXME: _ex was allocated on another cpu
                _promise.set_exception(std::move(_ex));
            }
        }
        future_type get_future() { return _promise.get_future(); }
    };
    union tx_side {
        tx_side() {}
        ~tx_side() {}
        void init() { new (&a) aa; }
        struct aa {
            std::deque<work_item*> pending_fifo;
        } a;
    } _tx;
    std::vector<work_item*> _completed_fifo;
public:
    smp_message_queue(reactor* from, reactor* to);
    ~smp_message_queue();
    template <typename Func>
    futurize_t<std::invoke_result_t<Func>> submit(shard_id t, smp_submit_to_options options, Func&& func) noexcept {
        memory::scoped_critical_alloc_section _;
        auto wi = std::make_unique<async_work_item<Func>>(*this, options.service_group, std::forward<Func>(func));
        auto fut = wi->get_future();
        submit_item(t, options.timeout, std::move(wi));
        return fut;
    }
    void start(unsigned cpuid);
    template<size_t PrefetchCnt, typename Func>
    size_t process_queue(lf_queue& q, Func process);
    size_t process_incoming();
    size_t process_completions(shard_id t);
    void stop();
private:
    void work();
    void submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<work_item> wi);
    void respond(work_item* wi);
    void move_pending();
    void flush_request_batch();
    void flush_response_batch();
    bool has_unflushed_responses() const;
    bool pure_poll_rx() const;
    bool pure_poll_tx() const;

    friend class smp;
};

/// Configuration for the multicore aspect of seastar.
struct smp_options : public program_options::option_group {
    /// Number of threads (default: one per CPU).
    program_options::value<unsigned> smp;
    /// CPUs to use (in cpuset(7) format; default: all).
    program_options::value<resource::cpuset> cpuset;
    /// Memory to use, in bytes (ex: 4G) (default: all).
    program_options::value<std::string> memory;
    /// Memory reserved to OS (if \ref memory not specified).
    program_options::value<std::string> reserve_memory;
    /// Path to accessible hugetlbfs mount (typically /dev/hugepages/something).
    program_options::value<std::string> hugepages;
    /// Lock all memory (prevents swapping).
    program_options::value<bool> lock_memory;
    /// Pin threads to their cpus (disable for overprovisioning).
    ///
    /// Default: \p true.
    program_options::value<bool> thread_affinity;
    /// \brief Number of IO queues.
    ///
    /// Each IO queue will be responsible for a fraction of the IO requests.
    /// Defaults to the number of threads.
    /// \note Unused when seastar is compiled without \p HWLOC support.
    program_options::value<unsigned> num_io_queues;
    /// \brief Number of IO groups.
    ///
    /// Each IO group will be responsible for a fraction of the IO requests.
    /// Defaults to the number of NUMA nodes.
    /// \note Unused when seastar is compiled without \p HWLOC support.
    program_options::value<unsigned> num_io_groups;
    /// \brief Maximum amount of concurrent requests to be sent to the disk.
    ///
    /// Defaults to 128 times the number of IO queues.
    program_options::value<unsigned> max_io_requests;
    /// Path to a YAML file describing the characteristics of the I/O Subsystem.
    program_options::value<std::string> io_properties_file;
    /// A YAML string describing the characteristics of the I/O Subsystem.
    program_options::value<std::string> io_properties;
    /// Enable mbind.
    ///
    /// Default: \p true.
    program_options::value<bool> mbind;
    /// Enable workaround for the glibc/gcc C++ exception scalability problem.
    ///
    /// Default: \p true.
    /// \note Unused when seastar is compiled without the exception scaling support.
    program_options::value<bool> enable_glibc_exception_scaling_workaround;
    /// If some CPUs are found not to have any local NUMA nodes, allow assigning
    /// them to remote ones.
    /// \note Unused when seastar is compiled without \p HWLOC support.
    program_options::value<bool> allow_cpus_in_remote_numa_nodes;

public:
    smp_options(program_options::option_group* parent_group);
};

struct reactor_options;

class smp : public std::enable_shared_from_this<smp> {
    alien::instance& _alien;
    std::vector<posix_thread> _threads;
    std::vector<std::function<void ()>> _thread_loops; // for dpdk
    std::optional<boost::barrier> _all_event_loops_done;
    struct qs_deleter {
        void operator()(smp_message_queue** qs) const;
    };
    std::unique_ptr<smp_message_queue*[], qs_deleter> _qs_owner;
    static thread_local smp_message_queue** _qs;
    static thread_local std::thread::id _tmain;
    bool _using_dpdk = false;

    template <typename Func>
    using returns_future = is_future<std::invoke_result_t<Func>>;
    template <typename Func>
    using returns_void = std::is_same<std::invoke_result_t<Func>, void>;
public:
    explicit smp(alien::instance& alien) : _alien(alien) {}
    void configure(const smp_options& smp_opts, const reactor_options& reactor_opts);
    void cleanup() noexcept;
    void cleanup_cpu();
    void arrive_at_event_loop_end();
    void join_all();
    static bool main_thread() { return std::this_thread::get_id() == _tmain; }

    /// Runs a function on a remote core.
    ///
    /// \param t designates the core to run the function on (may be a remote
    ///          core or the local core).
    /// \param options an \ref smp_submit_to_options that contains options for this call.
    /// \param func a callable to run on core \c t.
    ///          If \c func is a temporary object, its lifetime will be
    ///          extended by moving. This movement and the eventual
    ///          destruction of func are both done in the _calling_ core.
    ///          If \c func is a reference, the caller must guarantee that
    ///          it will survive the call.
    /// \return whatever \c func returns, as a future<> (if \c func does not return a future,
    ///         submit_to() will wrap it in a future<>).
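    ///
    /// An illustrative sketch (shard 1 and the returned value are arbitrary):
    /// \code
    /// future<int> f = smp::submit_to(1, smp_submit_to_options(), [] {
    ///     return 42; // runs on shard 1; the result is wrapped into a future<int>
    /// });
    /// \endcode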
    template <typename Func>
    static futurize_t<std::invoke_result_t<Func>> submit_to(unsigned t, smp_submit_to_options options, Func&& func) noexcept {
        using ret_type = std::invoke_result_t<Func>;
        if (t == this_shard_id()) {
            try {
                if (!is_future<ret_type>::value) {
                    // Non-deferring function, so don't worry about func lifetime
                    return futurize<ret_type>::invoke(std::forward<Func>(func));
                } else if (std::is_lvalue_reference<Func>::value) {
                    // func is an lvalue, so caller worries about its lifetime
                    return futurize<ret_type>::invoke(func);
                } else {
                    // Deferring call on rvalue function, make sure to preserve it across call
                    auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
                    auto ret = futurize<ret_type>::invoke(*w);
                    return ret.finally([w = std::move(w)] {});
                }
            } catch (...) {
                // Consistently return a failed future rather than throwing, to simplify callers
                return futurize<ret_type>::make_exception_future(std::current_exception());
            }
        } else {
            return _qs[t][this_shard_id()].submit(t, options, std::forward<Func>(func));
        }
    }
    /// Runs a function on a remote core.
    ///
    /// Uses default_smp_service_group() to control resource allocation.
    ///
    /// \param t designates the core to run the function on (may be a remote
    ///          core or the local core).
    /// \param func a callable to run on core \c t.
    ///          If \c func is a temporary object, its lifetime will be
    ///          extended by moving. This movement and the eventual
    ///          destruction of func are both done in the _calling_ core.
    ///          If \c func is a reference, the caller must guarantee that
    ///          it will survive the call.
    /// \return whatever \c func returns, as a future<> (if \c func does not return a future,
    ///         submit_to() will wrap it in a future<>).
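    ///
    /// A short sketch using the default options (the neighbouring shard is
    /// chosen arbitrarily):
    /// \code
    /// return smp::submit_to((this_shard_id() + 1) % smp::count, [] {
    ///     return make_ready_future<unsigned>(this_shard_id());
    /// });
    /// \endcode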
    template <typename Func>
    static futurize_t<std::invoke_result_t<Func>> submit_to(unsigned t, Func&& func) noexcept {
        return submit_to(t, default_smp_service_group(), std::forward<Func>(func));
    }
    static bool poll_queues();
    static bool pure_poll_queues();
    static boost::integer_range<unsigned> all_cpus() noexcept {
        return boost::irange(0u, count);
    }
    /// Invokes func on all shards.
    ///
    /// \param options the options to forward to the \ref smp::submit_to()
    ///        called behind the scenes.
    /// \param func the function to be invoked on each shard. May return void or
    ///        future<>. Each async invocation will work with a separate copy
    ///        of \c func.
    /// \returns a future that resolves when all async invocations finish.
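    ///
    /// A sketch running a per-shard reset under a dedicated service group
    /// (my_ssg and my_per_shard_state are assumed to exist):
    /// \code
    /// return smp::invoke_on_all(smp_submit_to_options(my_ssg), [] {
    ///     my_per_shard_state.clear(); // assumed thread-local state
    /// });
    /// \endcode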
    template<typename Func>
    SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
    static future<> invoke_on_all(smp_submit_to_options options, Func&& func) noexcept {
        static_assert(std::is_same<future<>, typename futurize<std::invoke_result_t<Func>>::type>::value, "bad Func signature");
        static_assert(std::is_nothrow_move_constructible_v<Func>);
        return parallel_for_each(all_cpus(), [options, &func] (unsigned id) {
            return smp::submit_to(id, options, Func(func));
        });
    }
    /// Invokes func on all shards.
    ///
    /// \param func the function to be invoked on each shard. May return void or
    ///        future<>. Each async invocation will work with a separate copy
    ///        of \c func.
    /// \returns a future that resolves when all async invocations finish.
    ///
    /// Passes the default \ref smp_submit_to_options to the
    /// \ref smp::submit_to() called behind the scenes.
    template<typename Func>
    static future<> invoke_on_all(Func&& func) noexcept {
        return invoke_on_all(smp_submit_to_options{}, std::forward<Func>(func));
    }
    /// Invokes func on all other shards.
    ///
    /// \param cpu_id the cpu on which **not** to run the function.
    /// \param options the options to forward to the \ref smp::submit_to()
    ///        called behind the scenes.
    /// \param func the function to be invoked on each shard. May return void or
    ///        future<>. Each async invocation will work with a separate copy
    ///        of \c func.
    /// \returns a future that resolves when all async invocations finish.
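    ///
    /// A sketch telling every other shard to drop its local cache
    /// (local_cache is assumed thread-local state):
    /// \code
    /// return smp::invoke_on_others(this_shard_id(), smp_submit_to_options(), [] {
    ///     local_cache.clear();
    /// });
    /// \endcode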
    template<typename Func>
    SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> &&
                              std::is_nothrow_copy_constructible_v<Func> )
    static future<> invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept {
        static_assert(std::is_same<future<>, typename futurize<std::invoke_result_t<Func>>::type>::value, "bad Func signature");
        static_assert(std::is_nothrow_move_constructible_v<Func>);
        return parallel_for_each(all_cpus(), [cpu_id, options, func = std::move(func)] (unsigned id) {
            return id != cpu_id ? smp::submit_to(id, options, Func(func)) : make_ready_future<>();
        });
    }
    /// Invokes func on all other shards.
    ///
    /// \param cpu_id the cpu on which **not** to run the function.
    /// \param func the function to be invoked on each shard. May return void or
    ///        future<>. Each async invocation will work with a separate copy
    ///        of \c func.
    /// \returns a future that resolves when all async invocations finish.
    ///
    /// Passes the default \ref smp_submit_to_options to the
    /// \ref smp::submit_to() called behind the scenes.
    template<typename Func>
    SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
    static future<> invoke_on_others(unsigned cpu_id, Func func) noexcept {
        return invoke_on_others(cpu_id, smp_submit_to_options{}, std::move(func));
    }
    /// Invokes func on all shards but the current one.
    ///
    /// \param func the function to be invoked on each shard. May return void or
    ///        future<>. Each async invocation will work with a separate copy
    ///        of \c func.
    /// \returns a future that resolves when all async invocations finish.
    template<typename Func>
    SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
    static future<> invoke_on_others(Func func) noexcept {
        return invoke_on_others(this_shard_id(), std::move(func));
    }
private:
    void start_all_queues();
    void pin(unsigned cpu_id);
    void allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg);
    void create_thread(std::function<void ()> thread_loop);
    unsigned adjust_max_networking_aio_io_control_blocks(unsigned network_iocbs);
public:
    static unsigned count;
};

}