]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/sharded.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / sharded.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
f67539c2
TL
24#include <seastar/core/smp.hh>
25#include <seastar/core/loop.hh>
26#include <seastar/core/map_reduce.hh>
11fdf7f2 27#include <seastar/util/is_smart_ptr.hh>
9f95a23c 28#include <seastar/util/tuple_utils.hh>
11fdf7f2 29#include <seastar/core/do_with.hh>
f67539c2 30#include <seastar/util/concepts.hh>
20effc67 31#include <seastar/util/log.hh>
11fdf7f2
TL
32#include <boost/iterator/counting_iterator.hpp>
33#include <functional>
f67539c2
TL
34#if __has_include(<concepts>)
35#include <concepts>
36#endif
11fdf7f2 37
9f95a23c
TL
38/// \defgroup smp-module Multicore
39///
40/// \brief Support for exploiting multiple cores on a server.
41///
42/// Seastar supports multicore servers by using *sharding*. Each logical
43/// core (lcore) runs a separate event loop, with its own memory allocator,
44/// TCP/IP stack, and other services. Shards communicate by explicit message
45/// passing, rather than using locks and condition variables as with traditional
46/// threaded programming.
47
11fdf7f2
TL
48namespace seastar {
49
f67539c2
TL
50template <typename Func, typename... Param>
51class sharded_parameter;
52
1e59de90
TL
53template <typename Service>
54class sharded;
55
f67539c2
TL
56namespace internal {
57
58template <typename Func, typename... Param>
59auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
60
61using on_each_shard_func = std::function<future<> (unsigned shard)>;
62
63future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
64
1e59de90
TL
65template <typename Service>
66class either_sharded_or_local {
67 sharded<Service>& _sharded;
68public:
69 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
70 operator sharded<Service>& ();
71 operator Service& ();
72};
73
74template <typename T>
75struct sharded_unwrap {
76 using evaluated_type = T;
77 using type = T;
78};
79
80template <typename T>
81struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
82 using evaluated_type = T&;
83 using type = either_sharded_or_local<T>;
84};
85
86template <typename Func, typename... Param>
87struct sharded_unwrap<sharded_parameter<Func, Param...>> {
88 using type = std::invoke_result_t<Func, Param...>;
89};
90
91template <typename T>
92using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
93
94template <typename T>
95using sharded_unwrap_t = typename sharded_unwrap<T>::type;
96
97} // internal
98
f67539c2 99
9f95a23c
TL
100/// \addtogroup smp-module
101/// @{
102
11fdf7f2
TL
103template <typename T>
104class sharded;
105
1e59de90
TL
106/// If a sharded service inherits from this class, sharded::stop() will wait
107/// until all references to this service on each shard are released before
9f95a23c
TL
108/// returning. It is still service's own responsibility to track its references
109/// in asynchronous code by calling shared_from_this() and keeping returned smart
11fdf7f2
TL
110/// pointer as long as object is in use.
111template<typename T>
112class async_sharded_service : public enable_shared_from_this<T> {
113protected:
114 std::function<void()> _delete_cb;
f67539c2 115 async_sharded_service() noexcept = default;
11fdf7f2
TL
116 virtual ~async_sharded_service() {
117 if (_delete_cb) {
118 _delete_cb();
119 }
120 }
121 template <typename Service> friend class sharded;
122};
123
124
125/// \brief Provide a sharded service with access to its peers
126///
127/// If a service class inherits from this, it will gain a \code container()
9f95a23c
TL
128/// \endcode method that provides access to the \ref sharded object, with which
129/// it can call its peers.
11fdf7f2
TL
130template <typename Service>
131class peering_sharded_service {
132 sharded<Service>* _container = nullptr;
133private:
134 template <typename T> friend class sharded;
f67539c2 135 void set_container(sharded<Service>* container) noexcept { _container = container; }
11fdf7f2 136public:
f67539c2
TL
137 peering_sharded_service() noexcept = default;
138 peering_sharded_service(peering_sharded_service<Service>&&) noexcept = default;
11fdf7f2
TL
139 peering_sharded_service(const peering_sharded_service<Service>&) = delete;
140 peering_sharded_service& operator=(const peering_sharded_service<Service>&) = delete;
f67539c2
TL
141 sharded<Service>& container() noexcept { return *_container; }
142 const sharded<Service>& container() const noexcept { return *_container; }
11fdf7f2
TL
143};
144
145
146/// Exception thrown when a \ref sharded object does not exist
147class no_sharded_instance_exception : public std::exception {
20effc67 148 sstring _msg;
11fdf7f2 149public:
20effc67
TL
150 no_sharded_instance_exception() : _msg("sharded instance does not exist") {}
151 explicit no_sharded_instance_exception(sstring type_info)
152 : _msg("sharded instance does not exist: " + type_info) {}
11fdf7f2 153 virtual const char* what() const noexcept override {
20effc67 154 return _msg.c_str();
11fdf7f2
TL
155 }
156};
157
11fdf7f2
TL
158/// Template helper to distribute a service across all logical cores.
159///
160/// The \c sharded template manages a sharded service, by creating
161/// a copy of the service on each logical core, providing mechanisms to communicate
162/// with each shard's copy, and a way to stop the service.
163///
164/// \tparam Service a class to be instantiated on each core. Must expose
165/// a \c stop() method that returns a \c future<>, to be called when
166/// the service is stopped.
167template <typename Service>
168class sharded {
169 struct entry {
170 shared_ptr<Service> service;
171 promise<> freed;
172 };
173 std::vector<entry> _instances;
174private:
175 using invoke_on_all_func_type = std::function<future<> (Service&)>;
176private:
f67539c2
TL
177 void service_deleted() noexcept {
178 _instances[this_shard_id()].freed.set_value();
11fdf7f2
TL
179 }
180 template <typename U, bool async>
181 friend struct shared_ptr_make_helper;
182
183 template <typename T>
184 std::enable_if_t<std::is_base_of<peering_sharded_service<T>, T>::value>
f67539c2 185 set_container(T& service) noexcept {
11fdf7f2
TL
186 service.set_container(this);
187 }
188
189 template <typename T>
190 std::enable_if_t<!std::is_base_of<peering_sharded_service<T>, T>::value>
1e59de90 191 set_container(T&) noexcept {
f67539c2
TL
192 }
193
194 future<>
195 sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
196 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
11fdf7f2
TL
197 }
198public:
199 /// Constructs an empty \c sharded object. No instances of the service are
200 /// created.
f67539c2 201 sharded() noexcept {}
11fdf7f2 202 sharded(const sharded& other) = delete;
11fdf7f2 203 sharded& operator=(const sharded& other) = delete;
f67539c2
TL
204 /// Sharded object with T that inherits from peering_sharded_service
205 /// cannot be moved safely, so disable move operations.
206 sharded(sharded&& other) = delete;
207 sharded& operator=(sharded&& other) = delete;
11fdf7f2
TL
    /// Destroys a \c sharded object. Must not be in a started state.
209 ~sharded();
210
211 /// Starts \c Service by constructing an instance on every logical core
212 /// with a copy of \c args passed to the constructor.
213 ///
214 /// \param args Arguments to be forwarded to \c Service constructor
f67539c2 215 /// \return a \ref seastar::future<> that becomes ready when all instances have been
11fdf7f2
TL
216 /// constructed.
217 template <typename... Args>
f67539c2 218 future<> start(Args&&... args) noexcept;
11fdf7f2
TL
219
220 /// Starts \c Service by constructing an instance on a single logical core
221 /// with a copy of \c args passed to the constructor.
222 ///
223 /// \param args Arguments to be forwarded to \c Service constructor
f67539c2 224 /// \return a \ref seastar::future<> that becomes ready when the instance has been
11fdf7f2
TL
225 /// constructed.
226 template <typename... Args>
f67539c2 227 future<> start_single(Args&&... args) noexcept;
11fdf7f2
TL
228
229 /// Stops all started instances and destroys them.
230 ///
231 /// For every started instance, its \c stop() method is called, and then
232 /// it is destroyed.
f67539c2 233 future<> stop() noexcept;
11fdf7f2 234
f67539c2 235 /// Invoke a type-erased function on all instances of `Service`.
9f95a23c
TL
236 /// The return value becomes ready when all instances have processed
237 /// the message.
238 ///
239 /// \param options the options to forward to the \ref smp::submit_to()
240 /// called behind the scenes.
241 /// \param func Function to be invoked on all shards
242 /// \return Future that becomes ready once all calls have completed
f67539c2 243 future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
9f95a23c 244
f67539c2 245 /// Invoke a type-erased function on all instances of `Service`.
9f95a23c
TL
246 /// The return value becomes ready when all instances have processed
247 /// the message.
248 /// Passes the default \ref smp_submit_to_options to the
249 /// \ref smp::submit_to() called behind the scenes.
f67539c2
TL
250 future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept {
251 try {
9f95a23c 252 return invoke_on_all(smp_submit_to_options{}, std::move(func));
f67539c2
TL
253 } catch (...) {
254 return current_exception_as_future();
255 }
9f95a23c
TL
256 }
257
f67539c2 258 /// Invoke a function on all instances of `Service`.
9f95a23c 259 /// The return value becomes ready when all instances have processed
f67539c2
TL
260 /// the message. The function can be a member pointer to function,
261 /// a free function, or a functor. The first argument of the function
262 /// will be a reference to the local service on the shard.
263 ///
264 /// For a non-static pointer-to-member-function, the first argument
265 /// becomes `this`, not the first declared parameter.
9f95a23c
TL
266 ///
267 /// \param options the options to forward to the \ref smp::submit_to()
268 /// called behind the scenes.
f67539c2
TL
269 /// \param func invocable accepting a `Service&` as the first parameter
270 /// to be invoked on all shards
9f95a23c 271 /// \return Future that becomes ready once all calls have completed
f67539c2 272 template <typename Func, typename... Args>
1e59de90 273 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
f67539c2 274 future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
11fdf7f2 275
f67539c2 276 /// Invoke a function on all instances of `Service`.
9f95a23c
TL
277 /// The return value becomes ready when all instances have processed
278 /// the message.
279 /// Passes the default \ref smp_submit_to_options to the
280 /// \ref smp::submit_to() called behind the scenes.
f67539c2 281 template <typename Func, typename... Args>
1e59de90 282 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
f67539c2
TL
283 future<> invoke_on_all(Func func, Args... args) noexcept {
284 try {
285 return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
286 } catch (...) {
287 return current_exception_as_future();
288 }
9f95a23c 289 }
11fdf7f2
TL
290
291 /// Invoke a callable on all instances of \c Service except the instance
292 /// which is allocated on current shard.
293 ///
9f95a23c
TL
294 /// \param options the options to forward to the \ref smp::submit_to()
295 /// called behind the scenes.
11fdf7f2
TL
296 /// \param func a callable with the signature `void (Service&)`
297 /// or `future<> (Service&)`, to be called on each core
298 /// with the local instance as an argument.
299 /// \return a `future<>` that becomes ready when all cores but the current one have
300 /// processed the message.
f67539c2
TL
301 template <typename Func, typename... Args>
302 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
303 future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
9f95a23c
TL
304
305 /// Invoke a callable on all instances of \c Service except the instance
306 /// which is allocated on current shard.
307 ///
308 /// \param func a callable with the signature `void (Service&)`
309 /// or `future<> (Service&)`, to be called on each core
310 /// with the local instance as an argument.
311 /// \return a `future<>` that becomes ready when all cores but the current one have
312 /// processed the message.
313 ///
314 /// Passes the default \ref smp_submit_to_options to the
315 /// \ref smp::submit_to() called behind the scenes.
f67539c2
TL
316 template <typename Func, typename... Args>
317 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
318 future<> invoke_on_others(Func func, Args... args) noexcept {
319 try {
320 return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
321 } catch (...) {
322 return current_exception_as_future();
323 }
9f95a23c 324 }
11fdf7f2 325
11fdf7f2
TL
326 /// Invoke a callable on all instances of `Service` and reduce the results using
327 /// `Reducer`.
328 ///
329 /// \see map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
1e59de90 330 template <typename Reducer, typename Func, typename... Args>
11fdf7f2 331 inline
1e59de90 332 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
11fdf7f2
TL
333 {
334 return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
335 boost::make_counting_iterator<unsigned>(_instances.size()),
1e59de90
TL
336 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
337 return smp::submit_to(c, [this, &func, args] () mutable {
338 return std::apply([this, &func] (Args&&... args) mutable {
339 auto inst = get_local_service();
340 return std::invoke(func, *inst, std::forward<Args>(args)...);
341 }, std::move(args));
11fdf7f2
TL
342 });
343 }, std::forward<Reducer>(r));
344 }
345
20effc67 346 /// The const version of \ref map_reduce(Reducer&& r, Func&& func)
1e59de90 347 template <typename Reducer, typename Func, typename... Args>
20effc67 348 inline
1e59de90 349 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) const -> typename reducer_traits<Reducer>::future_type
20effc67
TL
350 {
351 return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
352 boost::make_counting_iterator<unsigned>(_instances.size()),
1e59de90
TL
353 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) {
354 return smp::submit_to(c, [this, &func, args] () {
355 return std::apply([this, &func] (Args&&... args) {
356 auto inst = get_local_service();
357 return std::invoke(func, *inst, std::forward<Args>(args)...);
358 }, std::move(args));
20effc67
TL
359 });
360 }, std::forward<Reducer>(r));
361 }
362
11fdf7f2
TL
363 /// Applies a map function to all shards, then reduces the output by calling a reducer function.
364 ///
365 /// \param map callable with the signature `Value (Service&)` or
366 /// `future<Value> (Service&)` (for some `Value` type).
367 /// used as the second input to \c reduce
368 /// \param initial initial value used as the first input to \c reduce.
369 /// \param reduce binary function used to left-fold the return values of \c map
370 /// into \c initial .
371 ///
372 /// Each \c map invocation runs on the shard associated with the service.
373 ///
374 /// \tparam Mapper unary function taking `Service&` and producing some result.
375 /// \tparam Initial any value type
376 /// \tparam Reduce a binary function taking two Initial values and returning an Initial
f67539c2 377 /// \return Result of invoking `map` with each instance in parallel, reduced by calling
11fdf7f2
TL
378 /// `reduce()` on each adjacent pair of results.
379 template <typename Mapper, typename Initial, typename Reduce>
380 inline
381 future<Initial>
382 map_reduce0(Mapper map, Initial initial, Reduce reduce) {
383 auto wrapped_map = [this, map] (unsigned c) {
384 return smp::submit_to(c, [this, map] {
385 auto inst = get_local_service();
1e59de90 386 return std::invoke(map, *inst);
11fdf7f2
TL
387 });
388 };
389 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
390 std::move(wrapped_map),
391 std::move(initial),
392 std::move(reduce));
393 }
394
20effc67
TL
395 /// The const version of \ref map_reduce0(Mapper map, Initial initial, Reduce reduce)
396 template <typename Mapper, typename Initial, typename Reduce>
397 inline
398 future<Initial>
399 map_reduce0(Mapper map, Initial initial, Reduce reduce) const {
400 auto wrapped_map = [this, map] (unsigned c) {
401 return smp::submit_to(c, [this, map] {
402 auto inst = get_local_service();
1e59de90 403 return std::invoke(map, *inst);
20effc67
TL
404 });
405 };
406 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
407 std::move(wrapped_map),
408 std::move(initial),
409 std::move(reduce));
410 }
411
11fdf7f2
TL
412 /// Applies a map function to all shards, and return a vector of the result.
413 ///
414 /// \param mapper callable with the signature `Value (Service&)` or
415 /// `future<Value> (Service&)` (for some `Value` type).
416 ///
417 /// Each \c map invocation runs on the shard associated with the service.
418 ///
419 /// \tparam Mapper unary function taking `Service&` and producing some result.
f67539c2 420 /// \return Result vector of invoking `map` with each instance in parallel
20effc67 421 template <typename Mapper, typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
11fdf7f2 422 inline future<std::vector<return_type>> map(Mapper mapper) {
1e59de90
TL
423 return do_with(std::vector<return_type>(), std::move(mapper),
424 [this] (std::vector<return_type>& vec, Mapper& mapper) mutable {
425 vec.resize(_instances.size());
426 return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [this, &vec, &mapper] (unsigned c) {
427 return smp::submit_to(c, [this, &mapper] {
11fdf7f2
TL
428 auto inst = get_local_service();
429 return mapper(*inst);
1e59de90
TL
430 }).then([&vec, c] (auto&& res) {
431 vec[c] = std::move(res);
11fdf7f2
TL
432 });
433 }).then([&vec] {
434 return make_ready_future<std::vector<return_type>>(std::move(vec));
435 });
436 });
437 }
438
11fdf7f2
TL
439 /// Invoke a callable on a specific instance of `Service`.
440 ///
441 /// \param id shard id to call
9f95a23c
TL
442 /// \param options the options to forward to the \ref smp::submit_to()
443 /// called behind the scenes.
f67539c2
TL
444 /// \param func a callable with signature `Value (Service&, Args...)` or
445 /// `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
446 /// to a member function of Service
447 /// \param args parameters to the callable; will be copied or moved. To pass by reference,
448 /// use std::ref().
449 ///
11fdf7f2 450 /// \return result of calling `func(instance)` on the designated instance
f67539c2
TL
451 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
452 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
11fdf7f2 453 Ret
f67539c2
TL
454 invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
455 return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
11fdf7f2 456 auto inst = get_local_service();
f67539c2 457 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
11fdf7f2
TL
458 });
459 }
460
9f95a23c
TL
461 /// Invoke a callable on a specific instance of `Service`.
462 ///
463 /// \param id shard id to call
464 /// \param func a callable with signature `Value (Service&)` or
f67539c2
TL
465 /// `future<Value> (Service&)` (for some `Value` type), or a pointer
466 /// to a member function of Service
467 /// \param args parameters to the callable
9f95a23c 468 /// \return result of calling `func(instance)` on the designated instance
f67539c2
TL
469 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
470 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
9f95a23c 471 Ret
f67539c2
TL
472 invoke_on(unsigned id, Func&& func, Args&&... args) {
473 return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
9f95a23c
TL
474 }
475
11fdf7f2 476 /// Gets a reference to the local instance.
f67539c2 477 const Service& local() const noexcept;
11fdf7f2
TL
478
479 /// Gets a reference to the local instance.
f67539c2 480 Service& local() noexcept;
11fdf7f2
TL
481
482 /// Gets a shared pointer to the local instance.
f67539c2 483 shared_ptr<Service> local_shared() noexcept;
11fdf7f2
TL
484
485 /// Checks whether the local instance has been initialized.
f67539c2 486 bool local_is_initialized() const noexcept;
11fdf7f2
TL
487
488private:
1e59de90 489 void track_deletion(shared_ptr<Service>&, std::false_type) noexcept {
11fdf7f2
TL
490 // do not wait for instance to be deleted since it is not going to notify us
491 service_deleted();
492 }
493
494 void track_deletion(shared_ptr<Service>& s, std::true_type) {
495 s->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted), this);
496 }
497
498 template <typename... Args>
499 shared_ptr<Service> create_local_service(Args&&... args) {
500 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
501 set_container(*s);
502 track_deletion(s, std::is_base_of<async_sharded_service<Service>, Service>());
503 return s;
504 }
505
506 shared_ptr<Service> get_local_service() {
f67539c2 507 auto inst = _instances[this_shard_id()].service;
11fdf7f2 508 if (!inst) {
20effc67
TL
509 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
510 }
511 return inst;
512 }
513
514 shared_ptr<const Service> get_local_service() const {
515 auto inst = _instances[this_shard_id()].service;
516 if (!inst) {
517 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
11fdf7f2
TL
518 }
519 return inst;
520 }
521};
522
f67539c2
TL
523
524/// \brief Helper to pass a parameter to a `sharded<>` object that depends
525/// on the shard. It is evaluated on the shard, just before being
526/// passed to the local instance. It is useful when passing
527/// parameters to sharded::start().
528template <typename Func, typename... Params>
529class sharded_parameter {
530 Func _func;
531 std::tuple<Params...> _params;
532public:
533 /// Creates a sharded parameter, which evaluates differently based on
534 /// the shard it is executed on.
535 ///
536 /// \param func Function to be executed
537 /// \param params optional parameters to be passed to the function. Can
538 /// be std::ref(sharded<whatever>), in which case the local
539 /// instance will be passed. Anything else
540 /// will be passed by value unchanged.
541 explicit sharded_parameter(Func func, Params... params)
1e59de90 542 SEASTAR_CONCEPT(requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>)
f67539c2
TL
543 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
544 }
545private:
546 auto evaluate() const;
547
548 template <typename Func_, typename... Param_>
549 friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
550};
551
552/// \example sharded_parameter_demo.cc
553///
554/// Example use of \ref sharded_parameter.
555
9f95a23c
TL
556/// @}
557
11fdf7f2
TL
558template <typename Service>
559sharded<Service>::~sharded() {
560 assert(_instances.empty());
561}
562
563namespace internal {
564
11fdf7f2
TL
// Generic overload: an argument that needs no unwrapping is passed through
// with its value category preserved.
template <typename T>
inline T&& unwrap_sharded_arg(T&& arg) {
    return static_cast<T&&>(arg);
}
571
572template <typename Service>
573either_sharded_or_local<Service>
574unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
575 return either_sharded_or_local<Service>(arg);
576}
577
f67539c2
TL
578template <typename Func, typename... Param>
579auto
580unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
581 return sp.evaluate();
11fdf7f2
TL
582}
583
1e59de90
TL
584template <typename Service>
585either_sharded_or_local<Service>::operator sharded<Service>& () { return _sharded; }
586
587template <typename Service>
588either_sharded_or_local<Service>::operator Service& () { return _sharded.local(); }
589
11fdf7f2
TL
590}
591
f67539c2
TL
592template <typename Func, typename... Param>
593auto
594sharded_parameter<Func, Param...>::evaluate() const {
595 auto unwrap_params_and_invoke = [this] (const auto&... params) {
596 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
597 };
598 return std::apply(unwrap_params_and_invoke, _params);
11fdf7f2
TL
599}
600
601template <typename Service>
602template <typename... Args>
603future<>
f67539c2
TL
604sharded<Service>::start(Args&&... args) noexcept {
605 try {
11fdf7f2 606 _instances.resize(smp::count);
f67539c2 607 return sharded_parallel_for_each(
11fdf7f2
TL
608 [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
609 return smp::submit_to(c, [this, args] () mutable {
f67539c2 610 _instances[this_shard_id()].service = std::apply([this] (Args... args) {
11fdf7f2
TL
611 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
612 }, args);
613 });
614 }).then_wrapped([this] (future<> f) {
615 try {
616 f.get();
617 return make_ready_future<>();
618 } catch (...) {
619 return this->stop().then([e = std::current_exception()] () mutable {
620 std::rethrow_exception(e);
621 });
622 }
623 });
f67539c2
TL
624 } catch (...) {
625 return current_exception_as_future();
626 }
11fdf7f2
TL
627}
628
629template <typename Service>
630template <typename... Args>
631future<>
f67539c2
TL
632sharded<Service>::start_single(Args&&... args) noexcept {
633 try {
11fdf7f2
TL
634 assert(_instances.empty());
635 _instances.resize(1);
636 return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
f67539c2 637 _instances[0].service = std::apply([this] (Args... args) {
11fdf7f2
TL
638 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
639 }, args);
640 }).then_wrapped([this] (future<> f) {
641 try {
642 f.get();
643 return make_ready_future<>();
644 } catch (...) {
645 return this->stop().then([e = std::current_exception()] () mutable {
646 std::rethrow_exception(e);
647 });
648 }
649 });
f67539c2
TL
650 } catch (...) {
651 return current_exception_as_future();
652 }
11fdf7f2
TL
653}
654
9f95a23c
TL
655namespace internal {
656
657// Helper check if Service::stop exists
658
/// Compile-time probe for a member named \c stop.
/// `check<Service>(0)` is true when `&Service::stop` is a valid expression.
struct sharded_has_stop {
    // If a member named "stop" exists, try to call it, even if it doesn't
    // have the correct signature. This is so that we don't ignore a function
    // named stop() just because the signature is incorrect, and instead
    // force the user to resolve the ambiguity.
    //
    // Preferred overload (an int argument is an exact match); it drops out
    // of overload resolution when `&Service::stop` is ill-formed.
    template <typename Service>
    constexpr static auto check(int) -> decltype(sizeof(&Service::stop), true) {
        return true;
    }

    // Fallback in case Service::stop doesn't exist.
    template <typename>
    static constexpr auto check(...) -> bool {
        return false;
    }
};
675
676template <bool stop_exists>
677struct sharded_call_stop {
678 template <typename Service>
679 static future<> call(Service& instance);
680};
681
682template <>
683template <typename Service>
684inline
685future<> sharded_call_stop<true>::call(Service& instance) {
686 return instance.stop();
687}
688
689template <>
690template <typename Service>
691inline
1e59de90 692future<> sharded_call_stop<false>::call(Service&) {
9f95a23c
TL
693 return make_ready_future<>();
694}
695
696template <typename Service>
697inline
698future<>
699stop_sharded_instance(Service& instance) {
700 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
701 return internal::sharded_call_stop<has_stop>::call(instance);
702}
703
704}
705
11fdf7f2
TL
706template <typename Service>
707future<>
f67539c2
TL
708sharded<Service>::stop() noexcept {
709 try {
710 return sharded_parallel_for_each([this] (unsigned c) mutable {
11fdf7f2 711 return smp::submit_to(c, [this] () mutable {
f67539c2 712 auto inst = _instances[this_shard_id()].service;
11fdf7f2
TL
713 if (!inst) {
714 return make_ready_future<>();
715 }
9f95a23c
TL
716 return internal::stop_sharded_instance(*inst);
717 });
f67539c2
TL
718 }).then_wrapped([this] (future<> fut) {
719 return sharded_parallel_for_each([this] (unsigned c) {
9f95a23c 720 return smp::submit_to(c, [this] {
f67539c2
TL
721 if (_instances[this_shard_id()].service == nullptr) {
722 return make_ready_future<>();
723 }
724 _instances[this_shard_id()].service = nullptr;
725 return _instances[this_shard_id()].freed.get_future();
11fdf7f2 726 });
f67539c2
TL
727 }).finally([this, fut = std::move(fut)] () mutable {
728 _instances.clear();
729 _instances = std::vector<sharded<Service>::entry>();
730 return std::move(fut);
11fdf7f2 731 });
11fdf7f2 732 });
f67539c2
TL
733 } catch (...) {
734 return current_exception_as_future();
735 }
11fdf7f2
TL
736}
737
738template <typename Service>
739future<>
f67539c2
TL
740sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
741 try {
742 return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
9f95a23c 743 return smp::submit_to(c, options, [this, func] {
11fdf7f2
TL
744 return func(*get_local_service());
745 });
746 });
f67539c2
TL
747 } catch (...) {
748 return current_exception_as_future();
749 }
11fdf7f2
TL
750}
751
752template <typename Service>
f67539c2 753template <typename Func, typename... Args>
1e59de90 754SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
11fdf7f2
TL
755inline
756future<>
f67539c2 757sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
1e59de90 758 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
11fdf7f2 759 "invoke_on_all()'s func must return void or future<>");
f67539c2 760 try {
1e59de90
TL
761 return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
762 return std::apply([&service, &func] (Args&&... args) mutable {
763 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
764 }, std::move(args));
11fdf7f2 765 }));
f67539c2
TL
766 } catch (...) {
767 return current_exception_as_future();
768 }
11fdf7f2
TL
769}
770
771template <typename Service>
f67539c2
TL
772template <typename Func, typename... Args>
773SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
11fdf7f2
TL
774inline
775future<>
f67539c2
TL
776sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
777 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>,
11fdf7f2 778 "invoke_on_others()'s func must return void or future<>");
f67539c2
TL
779 try {
780 return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) -> future<> {
781 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
11fdf7f2 782 });
f67539c2
TL
783 } catch (...) {
784 return current_exception_as_future();
785 }
11fdf7f2
TL
786}
787
788template <typename Service>
f67539c2 789const Service& sharded<Service>::local() const noexcept {
11fdf7f2 790 assert(local_is_initialized());
f67539c2 791 return *_instances[this_shard_id()].service;
11fdf7f2
TL
792}
793
794template <typename Service>
f67539c2 795Service& sharded<Service>::local() noexcept {
11fdf7f2 796 assert(local_is_initialized());
f67539c2 797 return *_instances[this_shard_id()].service;
11fdf7f2
TL
798}
799
800template <typename Service>
f67539c2 801shared_ptr<Service> sharded<Service>::local_shared() noexcept {
11fdf7f2 802 assert(local_is_initialized());
f67539c2 803 return _instances[this_shard_id()].service;
11fdf7f2
TL
804}
805
806template <typename Service>
f67539c2
TL
807inline bool sharded<Service>::local_is_initialized() const noexcept {
808 return _instances.size() > this_shard_id() &&
809 _instances[this_shard_id()].service;
11fdf7f2
TL
810}
811
9f95a23c
TL
812/// \addtogroup smp-module
813/// @{
814
11fdf7f2
TL
815/// Smart pointer wrapper which makes it safe to move across CPUs.
816///
817/// \c foreign_ptr<> is a smart pointer wrapper which, unlike
818/// \ref shared_ptr and \ref lw_shared_ptr, is safe to move to a
819/// different core.
820///
821/// As seastar avoids locking, any but the most trivial objects must
822/// be destroyed on the same core they were created on, so that,
823/// for example, their destructors can unlink references to the
824/// object from various containers. In addition, for performance
825/// reasons, the shared pointer types do not use atomic operations
826/// to manage their reference counts. As a result they cannot be
827/// used on multiple cores in parallel.
828///
829/// \c foreign_ptr<> provides a solution to that problem.
1e59de90
TL
830/// \c foreign_ptr<> wraps smart pointers -- \ref seastar::shared_ptr<>,
831/// or similar, and remembers on what core this happened.
832/// When the \c foreign_ptr<> object is destroyed, it sends a message to
833/// the original core so that the wrapped object can be safely destroyed.
11fdf7f2
TL
834///
835/// \c foreign_ptr<> is a move-only object; it cannot be copied.
836///
837template <typename PtrType>
20effc67 838SEASTAR_CONCEPT( requires (!std::is_pointer<PtrType>::value) )
11fdf7f2
TL
839class foreign_ptr {
840private:
841 PtrType _value;
842 unsigned _cpu;
843private:
f67539c2 844 void destroy(PtrType p, unsigned cpu) noexcept {
20effc67
TL
845 // `destroy()` is called from the destructor and other
846 // synchronous methods (like `reset()`), that have no way to
847 // wait for this future.
848 (void)destroy_on(std::move(p), cpu);
849 }
850
851 static future<> destroy_on(PtrType p, unsigned cpu) noexcept {
852 if (p) {
853 if (cpu != this_shard_id()) {
854 return smp::submit_to(cpu, [v = std::move(p)] () mutable {
855 // Destroy the contained pointer. We do this explicitly
856 // in the current shard, because the lambda is destroyed
857 // in the shard that submitted the task.
858 v = {};
859 });
860 } else {
861 p = {};
862 }
11fdf7f2 863 }
20effc67 864 return make_ready_future<>();
11fdf7f2
TL
865 }
866public:
867 using element_type = typename std::pointer_traits<PtrType>::element_type;
868 using pointer = element_type*;
869
870 /// Constructs a null \c foreign_ptr<>.
f67539c2 871 foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
11fdf7f2 872 : _value(PtrType())
f67539c2 873 , _cpu(this_shard_id()) {
11fdf7f2
TL
874 }
875 /// Constructs a null \c foreign_ptr<>.
f67539c2 876 foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
11fdf7f2 877 /// Wraps a pointer object and remembers the current core.
f67539c2 878 foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
11fdf7f2 879 : _value(std::move(value))
f67539c2 880 , _cpu(this_shard_id()) {
11fdf7f2
TL
881 }
882 // The type is intentionally non-copyable because copies
883 // are expensive because each copy requires across-CPU call.
884 foreign_ptr(const foreign_ptr&) = delete;
885 /// Moves a \c foreign_ptr<> to another object.
f67539c2 886 foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
11fdf7f2
TL
887 /// Destroys the wrapped object on its original cpu.
888 ~foreign_ptr() {
889 destroy(std::move(_value), _cpu);
890 }
891 /// Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
f67539c2 892 future<foreign_ptr> copy() const noexcept {
11fdf7f2
TL
893 return smp::submit_to(_cpu, [this] () mutable {
894 auto v = _value;
895 return make_foreign(std::move(v));
896 });
897 }
898 /// Accesses the wrapped object.
f67539c2 899 element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
11fdf7f2 900 /// Accesses the wrapped object.
f67539c2 901 element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
11fdf7f2 902 /// Access the raw pointer to the wrapped object.
f67539c2 903 pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
11fdf7f2
TL
904 /// Return the owner-shard of this pointer.
905 ///
906 /// The owner shard of the pointer can change as a result of
907 /// move-assigment or a call to reset().
f67539c2 908 unsigned get_owner_shard() const noexcept { return _cpu; }
11fdf7f2 909 /// Checks whether the wrapped pointer is non-null.
f67539c2 910 operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
11fdf7f2 911 /// Move-assigns a \c foreign_ptr<>.
9f95a23c
TL
912 foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible<PtrType>::value) {
913 destroy(std::move(_value), _cpu);
914 _value = std::move(other._value);
915 _cpu = other._cpu;
916 return *this;
917 }
11fdf7f2
TL
918 /// Releases the owned pointer
919 ///
920 /// Warning: the caller is now responsible for destroying the
921 /// pointer on its owner shard. This method is best called on the
922 /// owner shard to avoid accidents.
f67539c2 923 PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
11fdf7f2
TL
924 return std::exchange(_value, {});
925 }
926 /// Replace the managed pointer with new_ptr.
927 ///
928 /// The previous managed pointer is destroyed on its owner shard.
f67539c2 929 void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
11fdf7f2
TL
930 auto old_ptr = std::move(_value);
931 auto old_cpu = _cpu;
932
933 _value = std::move(new_ptr);
f67539c2 934 _cpu = this_shard_id();
11fdf7f2
TL
935
936 destroy(std::move(old_ptr), old_cpu);
937 }
938 /// Replace the managed pointer with a null value.
939 ///
940 /// The previous managed pointer is destroyed on its owner shard.
f67539c2 941 void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
11fdf7f2
TL
942 reset(PtrType());
943 }
20effc67
TL
944
945 /// Destroy the managed pointer.
946 ///
947 /// \returns a future that is resolved when managed pointer is destroyed on its owner shard.
948 future<> destroy() noexcept {
949 return destroy_on(std::move(_value), _cpu);
950 }
11fdf7f2
TL
951};
952
/// Wraps a smart pointer object in a \ref foreign_ptr<>.
954///
955/// \relates foreign_ptr
956template <typename T>
957foreign_ptr<T> make_foreign(T ptr) {
958 return foreign_ptr<T>(std::move(ptr));
959}
960
9f95a23c
TL
961/// @}
962
11fdf7f2
TL
963template<typename T>
964struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
965
966}