]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/smp.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / smp.hh
CommitLineData
9f95a23c
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2019 ScyllaDB
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
f67539c2 25#include <seastar/core/loop.hh>
9f95a23c 26#include <seastar/core/semaphore.hh>
1e59de90 27#include <seastar/core/metrics_registration.hh>
9f95a23c
TL
28#include <seastar/core/posix.hh>
29#include <seastar/core/reactor_config.hh>
20effc67 30#include <seastar/core/resource.hh>
9f95a23c
TL
31#include <boost/lockfree/spsc_queue.hpp>
32#include <boost/thread/barrier.hpp>
33#include <boost/range/irange.hpp>
9f95a23c
TL
34#include <deque>
35#include <thread>
36
37/// \file
38
39namespace seastar {
40
41using shard_id = unsigned;
42
43class smp_service_group;
44class reactor_backend_selector;
45
20effc67
TL
46namespace alien {
47
48class instance;
49
50}
51
9f95a23c
TL
namespace internal {

// Accessor for smp_service_group's private id; declared here so the class
// below can befriend it. Defined inline after the class definition.
unsigned smp_service_group_id(smp_service_group ssg) noexcept;

// Returns the address of the calling thread's shard id. Each thread owns
// one zero-initialized thread_local shard_id; presumably the reactor writes
// it during shard startup — callers read it via this_shard_id().
inline shard_id* this_shard_id_ptr() noexcept {
    static thread_local shard_id g_this_shard_id;
    return &g_this_shard_id;
}

}
62
63/// Returns shard_id of the of the current shard.
f67539c2
TL
64inline shard_id this_shard_id() noexcept {
65 return *internal::this_shard_id_ptr();
66}
9f95a23c
TL
67
/// Configuration for smp_service_group objects.
///
/// \see create_smp_service_group()
struct smp_service_group_config {
    /// The maximum number of non-local requests that execute on a shard concurrently
    ///
    /// Will be adjusted upwards to allow at least one request per non-local shard.
    unsigned max_nonlocal_requests = 0;
    /// An optional name for this smp group
    ///
    /// If this optional is engaged, timeout exception messages of the group's
    /// semaphores will indicate the group's name.
    std::optional<sstring> group_name;
};
82
/// A resource controller for cross-shard calls.
///
/// An smp_service_group allows you to limit the concurrency of
/// smp::submit_to() and similar calls. While it's easy to limit
/// the caller's concurrency (for example, by using a semaphore),
/// the concurrency at the remote end can be multiplied by a factor
/// of smp::count-1, which can be large.
///
/// The class is called a service _group_ because it can be used
/// to group similar calls that share resource usage characteristics,
/// need not be isolated from each other, but do need to be isolated
/// from other groups. Calls in a group should not nest; doing so
/// can result in ABA deadlocks.
///
/// Nested submit_to() calls must form a directed acyclic graph
/// when considering their smp_service_groups as nodes. For example,
/// if a call using ssg1 then invokes another call using ssg2, the
/// internal call may not call again via either ssg1 or ssg2, as that
/// would form a cycle (and risk an ABBA deadlock). Create a
/// new smp_service_group instead.
class smp_service_group {
    // Group id; read back via internal::smp_service_group_id() and used to
    // select the group's per-shard semaphores.
    unsigned _id;
#ifdef SEASTAR_DEBUG
    // Debug-only field; presumably tracks create/destroy generations of the
    // group id — TODO confirm against smp.cc.
    unsigned _version = 0;
#endif
private:
    // Construction is private: instances are only minted by the friend
    // factory functions below.
    explicit smp_service_group(unsigned id) noexcept : _id(id) {}

    friend unsigned internal::smp_service_group_id(smp_service_group ssg) noexcept;
    friend smp_service_group default_smp_service_group() noexcept;
    friend future<smp_service_group> create_smp_service_group(smp_service_group_config ssgc) noexcept;
    friend future<> destroy_smp_service_group(smp_service_group) noexcept;
};
116
// Out-of-class definition of the friend accessor declared in namespace
// internal above; exposes the group's private id without widening the
// class's public interface.
inline
unsigned
internal::smp_service_group_id(smp_service_group ssg) noexcept {
    return ssg._id;
}
122
/// Returns the default smp_service_group. This smp_service_group
/// does not impose any limits on concurrency in the target shard.
/// This makes it deadlock-safe, but it can consume unbounded resources,
/// and should therefore only be used when initiator concurrency is
/// very low (e.g. administrative tasks).
smp_service_group default_smp_service_group() noexcept;

/// Creates an smp_service_group with the specified configuration.
///
/// The smp_service_group is global, and after this call completes,
/// the returned value can be used on any shard.
future<smp_service_group> create_smp_service_group(smp_service_group_config ssgc) noexcept;

/// Destroy an smp_service_group.
///
/// Frees all resources used by an smp_service_group. It must not
/// be used again once this function is called.
future<> destroy_smp_service_group(smp_service_group ssg) noexcept;

inline
smp_service_group default_smp_service_group() noexcept {
    // Id 0 is reserved for the default (unlimited) group.
    return smp_service_group(0);
}
146
// Clock used for cross-shard submission timeouts.
using smp_timeout_clock = lowres_clock;
// Semaphore type guarding per-group, per-shard concurrency; the named
// exception factory lets timeout errors carry the group's name.
using smp_service_group_semaphore = basic_semaphore<named_semaphore_exception_factory, smp_timeout_clock>;
using smp_service_group_semaphore_units = semaphore_units<named_semaphore_exception_factory, smp_timeout_clock>;

// Sentinel timeout meaning "wait indefinitely" for submit_to().
static constexpr smp_timeout_clock::time_point smp_no_timeout = smp_timeout_clock::time_point::max();
152
/// Options controlling the behaviour of \ref smp::submit_to().
struct smp_submit_to_options {
    /// Controls resource allocation.
    smp_service_group service_group = default_smp_service_group();
    /// The timeout is relevant only to the time the call spends waiting to be
    /// processed by the remote shard, and *not* to the time it takes to be
    /// executed there.
    smp_timeout_clock::time_point timeout = smp_no_timeout;

    // Implicit by design: lets callers pass just an smp_service_group (or
    // nothing) where smp_submit_to_options is expected.
    smp_submit_to_options(smp_service_group service_group = default_smp_service_group(), smp_timeout_clock::time_point timeout = smp_no_timeout) noexcept
        : service_group(service_group)
        , timeout(timeout) {
    }
};

// Per-shard initialization hook for the default smp_service_group
// (implementation elsewhere).
void init_default_smp_service_group(shard_id cpu);

// Returns the semaphore limiting concurrency of group \c ssg_id towards
// shard \c t (implementation elsewhere).
smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) noexcept;
9f95a23c
TL
171
// A one-directional cross-shard message channel: work items submitted on
// one shard are executed on another, and completions flow back. Built on
// single-producer/single-consumer lock-free queues, so each instance links
// exactly one sender shard to one receiver shard.
class smp_message_queue {
    static constexpr size_t queue_length = 128;
    static constexpr size_t batch_size = 16;
    static constexpr size_t prefetch_cnt = 2;
    struct work_item;
    struct lf_queue_remote {
        reactor* remote;
    };
    using lf_queue_base = boost::lockfree::spsc_queue<work_item*,
                            boost::lockfree::capacity<queue_length>>;
    // use inheritance to control placement order
    struct lf_queue : lf_queue_remote, lf_queue_base {
        lf_queue(reactor* remote) : lf_queue_remote{remote} {}
        void maybe_wakeup();
        ~lf_queue();
    };
    lf_queue _pending;    // requests travelling to the remote shard
    lf_queue _completed;  // finished items travelling back to the sender
    // Sender-side statistics, aligned to their own cache line to avoid
    // false sharing with the receiver-side counters below.
    struct alignas(seastar::cache_line_size) {
        size_t _sent = 0;
        size_t _compl = 0;
        size_t _last_snt_batch = 0;
        size_t _last_cmpl_batch = 0;
        size_t _current_queue_length = 0;
    };
    // keep this between two structures with statistics
    // this makes sure that they have at least one cache line
    // between them, so hw prefetcher will not accidentally prefetch
    // cache line used by another cpu.
    metrics::metric_groups _metrics;
    // Receiver-side statistics (see alignment note above).
    struct alignas(seastar::cache_line_size) {
        size_t _received = 0;
        size_t _last_rcv_batch = 0;
    };
    // A unit of cross-shard work. It is a task so the remote shard can run
    // it through its normal scheduling machinery, tagged with the sender's
    // current scheduling group.
    struct work_item : public task {
        explicit work_item(smp_service_group ssg) : task(current_scheduling_group()), ssg(ssg) {}
        smp_service_group ssg;
        virtual ~work_item() {}
        virtual void fail_with(std::exception_ptr) = 0;
        void process();
        virtual void complete() = 0;
    };
    // Wraps a concrete callable. Runs _func on the remote shard, stashes
    // the result (or exception), and fulfills _promise back on the
    // originating shard via complete().
    template <typename Func>
    struct async_work_item : work_item {
        smp_message_queue& _queue;
        Func _func;
        using futurator = futurize<std::invoke_result_t<Func>>;
        using future_type = typename futurator::type;
        using value_type = typename future_type::value_type;
        std::optional<value_type> _result;
        std::exception_ptr _ex; // if !_result
        typename futurator::promise_type _promise; // used on local side
        async_work_item(smp_message_queue& queue, smp_service_group ssg, Func&& func) : work_item(ssg), _queue(queue), _func(std::move(func)) {}
        virtual void fail_with(std::exception_ptr ex) override {
            _promise.set_exception(std::move(ex));
        }
        virtual task* waiting_task() noexcept override {
            // FIXME: waiting_task across shards is not implemented. Unsynchronized task access is unsafe.
            return nullptr;
        }
        virtual void run_and_dispose() noexcept override {
            // _queue.respond() below forwards the continuation chain back to the
            // calling shard.
            (void)futurator::invoke(this->_func).then_wrapped([this] (auto f) {
                if (f.failed()) {
                    _ex = f.get_exception();
                } else {
                    _result = f.get();
                }
                _queue.respond(this);
            });
            // We don't delete the task here as the creator of the work item will
            // delete it on the origin shard.
        }
        virtual void complete() override {
            if (_result) {
                _promise.set_value(std::move(*_result));
            } else {
                // FIXME: _ex was allocated on another cpu
                _promise.set_exception(std::move(_ex));
            }
        }
        future_type get_future() { return _promise.get_future(); }
    };
    // Sender-side staging area for items not yet pushed to _pending.
    // A union with manual init() so construction is deferred; presumably
    // the owning shard constructs it explicitly — TODO confirm in smp.cc.
    union tx_side {
        tx_side() {}
        ~tx_side() {}
        void init() { new (&a) aa; }
        struct aa {
            std::deque<work_item*> pending_fifo;
        } a;
    } _tx;
    std::vector<work_item*> _completed_fifo;  // responses staged before batch flush
public:
    smp_message_queue(reactor* from, reactor* to);
    ~smp_message_queue();
    // Queues \c func for execution on shard \c t; returns a future for its
    // result, resolved back on the submitting shard.
    template <typename Func>
    futurize_t<std::invoke_result_t<Func>> submit(shard_id t, smp_submit_to_options options, Func&& func) noexcept {
        // Allocation here must not fail mid-submission.
        memory::scoped_critical_alloc_section _;
        auto wi = std::make_unique<async_work_item<Func>>(*this, options.service_group, std::forward<Func>(func));
        auto fut = wi->get_future();
        submit_item(t, options.timeout, std::move(wi));
        return fut;
    }
    void start(unsigned cpuid);
    template<size_t PrefetchCnt, typename Func>
    size_t process_queue(lf_queue& q, Func process);
    size_t process_incoming();
    size_t process_completions(shard_id t);
    void stop();
private:
    void work();
    void submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<work_item> wi);
    void respond(work_item* wi);
    void move_pending();
    void flush_request_batch();
    void flush_response_batch();
    bool has_unflushed_responses() const;
    bool pure_poll_rx() const;
    bool pure_poll_tx() const;

    friend class smp;
};
295
1e59de90 296class smp_message_queue;
20effc67 297struct reactor_options;
1e59de90 298struct smp_options;
20effc67
TL
299
// Owns the per-shard threads and the all-pairs grid of cross-shard message
// queues, and provides the submit_to()/invoke_on_*() entry points for
// running work on other shards.
class smp : public std::enable_shared_from_this<smp> {
    alien::instance& _alien;
    std::vector<posix_thread> _threads;
    std::vector<std::function<void ()>> _thread_loops; // for dpdk
    std::optional<boost::barrier> _all_event_loops_done;
    struct qs_deleter {
        void operator()(smp_message_queue** qs) const;
    };
    // _qs_owner holds the queue grid; _qs is the thread-local alias every
    // shard reads in submit_to().
    std::unique_ptr<smp_message_queue*[], qs_deleter> _qs_owner;
    static thread_local smp_message_queue**_qs;
    static thread_local std::thread::id _tmain;
    bool _using_dpdk = false;

    // Trait helpers over the callable's result type.
    template <typename Func>
    using returns_future = is_future<std::invoke_result_t<Func>>;
    template <typename Func>
    using returns_void = std::is_same<std::invoke_result_t<Func>, void>;
public:
    explicit smp(alien::instance& alien) : _alien(alien) {}
    void configure(const smp_options& smp_opts, const reactor_options& reactor_opts);
    void cleanup() noexcept;
    void cleanup_cpu();
    void arrive_at_event_loop_end();
    void join_all();
    // True when called from the thread that recorded itself as _tmain.
    static bool main_thread() { return std::this_thread::get_id() == _tmain; }

    /// Runs a function on a remote core.
    ///
    /// \param t designates the core to run the function on (may be a remote
    ///          core or the local core).
    /// \param options an \ref smp_submit_to_options that contains options for this call.
    /// \param func a callable to run on core \c t.
    ///          If \c func is a temporary object, its lifetime will be
    ///          extended by moving. This movement and the eventual
    ///          destruction of func are both done in the _calling_ core.
    ///          If \c func is a reference, the caller must guarantee that
    ///          it will survive the call.
    /// \return whatever \c func returns, as a future<> (if \c func does not return a future,
    ///         submit_to() will wrap it in a future<>).
    template <typename Func>
    static futurize_t<std::invoke_result_t<Func>> submit_to(unsigned t, smp_submit_to_options options, Func&& func) noexcept {
        using ret_type = std::invoke_result_t<Func>;
        if (t == this_shard_id()) {
            // Local-shard fast path: invoke directly, no queue round-trip.
            try {
                if (!is_future<ret_type>::value) {
                    // Non-deferring function, so don't worry about func lifetime
                    return futurize<ret_type>::invoke(std::forward<Func>(func));
                } else if (std::is_lvalue_reference<Func>::value) {
                    // func is an lvalue, so caller worries about its lifetime
                    return futurize<ret_type>::invoke(func);
                } else {
                    // Deferring call on rvalue function, make sure to preserve it across call
                    auto w = std::make_unique<std::decay_t<Func>>(std::move(func));
                    auto ret = futurize<ret_type>::invoke(*w);
                    return ret.finally([w = std::move(w)] {});
                }
            } catch (...) {
                // Consistently return a failed future rather than throwing, to simplify callers
                return futurize<std::invoke_result_t<Func>>::make_exception_future(std::current_exception());
            }
        } else {
            // Cross-shard path: enqueue on the message queue indexed by
            // (destination t, sending shard).
            return _qs[t][this_shard_id()].submit(t, options, std::forward<Func>(func));
        }
    }
    /// Runs a function on a remote core.
    ///
    /// Uses default_smp_service_group() to control resource allocation.
    ///
    /// \param t designates the core to run the function on (may be a remote
    ///          core or the local core).
    /// \param func a callable to run on core \c t.
    ///          If \c func is a temporary object, its lifetime will be
    ///          extended by moving. This movement and the eventual
    ///          destruction of func are both done in the _calling_ core.
    ///          If \c func is a reference, the caller must guarantee that
    ///          it will survive the call.
    /// \return whatever \c func returns, as a future<> (if \c func does not return a future,
    ///         submit_to() will wrap it in a future<>).
    template <typename Func>
    static futurize_t<std::invoke_result_t<Func>> submit_to(unsigned t, Func&& func) noexcept {
        return submit_to(t, default_smp_service_group(), std::forward<Func>(func));
    }
    static bool poll_queues();
    static bool pure_poll_queues();
    // The half-open range [0, smp::count) of all shard ids.
    static boost::integer_range<unsigned> all_cpus() noexcept {
        return boost::irange(0u, count);
    }
    /// Invokes func on all shards.
    ///
    /// \param options the options to forward to the \ref smp::submit_to()
    ///         called behind the scenes.
    /// \param func the function to be invoked on each shard. May return void or
    ///         future<>. Each async invocation will work with a separate copy
    ///         of \c func.
    /// \returns a future that resolves when all async invocations finish.
    template<typename Func>
    SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
    static future<> invoke_on_all(smp_submit_to_options options, Func&& func) noexcept {
        static_assert(std::is_same<future<>, typename futurize<std::invoke_result_t<Func>>::type>::value, "bad Func signature");
        static_assert(std::is_nothrow_move_constructible_v<Func>);
        // Each shard gets its own copy (Func(func)); the original is only
        // borrowed by reference for the duration of the parallel loop.
        return parallel_for_each(all_cpus(), [options, &func] (unsigned id) {
            return smp::submit_to(id, options, Func(func));
        });
    }
    /// Invokes func on all shards.
    ///
    /// \param func the function to be invoked on each shard. May return void or
    ///         future<>. Each async invocation will work with a separate copy
    ///         of \c func.
    /// \returns a future that resolves when all async invocations finish.
    ///
    /// Passes the default \ref smp_submit_to_options to the
    /// \ref smp::submit_to() called behind the scenes.
    template<typename Func>
    static future<> invoke_on_all(Func&& func) noexcept {
        return invoke_on_all(smp_submit_to_options{}, std::forward<Func>(func));
    }
    /// Invokes func on all other shards.
    ///
    /// \param cpu_id the cpu on which **not** to run the function.
    /// \param options the options to forward to the \ref smp::submit_to()
    ///         called behind the scenes.
    /// \param func the function to be invoked on each shard. May return void or
    ///         future<>. Each async invocation will work with a separate copy
    ///         of \c func.
    /// \returns a future that resolves when all async invocations finish.
    template<typename Func>
    SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> &&
            std::is_nothrow_copy_constructible_v<Func> )
    static future<> invoke_on_others(unsigned cpu_id, smp_submit_to_options options, Func func) noexcept {
        static_assert(std::is_same<future<>, typename futurize<std::invoke_result_t<Func>>::type>::value, "bad Func signature");
        static_assert(std::is_nothrow_move_constructible_v<Func>);
        return parallel_for_each(all_cpus(), [cpu_id, options, func = std::move(func)] (unsigned id) {
            // Skip the excluded shard; its slot resolves immediately.
            return id != cpu_id ? smp::submit_to(id, options, Func(func)) : make_ready_future<>();
        });
    }
    /// Invokes func on all other shards.
    ///
    /// \param cpu_id the cpu on which **not** to run the function.
    /// \param func the function to be invoked on each shard. May return void or
    ///         future<>. Each async invocation will work with a separate copy
    ///         of \c func.
    /// \returns a future that resolves when all async invocations finish.
    ///
    /// Passes the default \ref smp_submit_to_options to the
    /// \ref smp::submit_to() called behind the scenes.
    template<typename Func>
    SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
    static future<> invoke_on_others(unsigned cpu_id, Func func) noexcept {
        return invoke_on_others(cpu_id, smp_submit_to_options{}, std::move(func));
    }
    /// Invokes func on all shards but the current one
    ///
    /// \param func the function to be invoked on each shard. May return void or
    ///         future<>. Each async invocation will work with a separate copy
    ///         of \c func.
    /// \returns a future that resolves when all async invocations finish.
    template<typename Func>
    SEASTAR_CONCEPT( requires std::is_nothrow_move_constructible_v<Func> )
    static future<> invoke_on_others(Func func) noexcept {
        return invoke_on_others(this_shard_id(), std::move(func));
    }
private:
    void start_all_queues();
    void pin(unsigned cpu_id);
    void allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg);
    void create_thread(std::function<void ()> thread_loop);
    unsigned adjust_max_networking_aio_io_control_blocks(unsigned network_iocbs);
public:
    // Number of shards; set during configure() — readable from any shard.
    static unsigned count;
};
471
472}