]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/sharded.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / sharded.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22 #pragma once
23
24 #include <seastar/core/smp.hh>
25 #include <seastar/core/loop.hh>
26 #include <seastar/core/map_reduce.hh>
27 #include <seastar/util/is_smart_ptr.hh>
28 #include <seastar/util/tuple_utils.hh>
29 #include <seastar/core/do_with.hh>
30 #include <seastar/util/concepts.hh>
31 #include <seastar/util/log.hh>
32 #include <boost/iterator/counting_iterator.hpp>
33 #include <functional>
34 #if __has_include(<concepts>)
35 #include <concepts>
36 #endif
37
38 /// \defgroup smp-module Multicore
39 ///
40 /// \brief Support for exploiting multiple cores on a server.
41 ///
42 /// Seastar supports multicore servers by using *sharding*. Each logical
43 /// core (lcore) runs a separate event loop, with its own memory allocator,
44 /// TCP/IP stack, and other services. Shards communicate by explicit message
45 /// passing, rather than using locks and condition variables as with traditional
46 /// threaded programming.
47
48 namespace seastar {
49
50 template <typename Func, typename... Param>
51 class sharded_parameter;
52
53 template <typename Service>
54 class sharded;
55
56 namespace internal {
57
58 template <typename Func, typename... Param>
59 auto unwrap_sharded_arg(sharded_parameter<Func, Param...> sp);
60
61 using on_each_shard_func = std::function<future<> (unsigned shard)>;
62
63 future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard) noexcept(std::is_nothrow_move_constructible_v<on_each_shard_func>);
64
65 template <typename Service>
66 class either_sharded_or_local {
67 sharded<Service>& _sharded;
68 public:
69 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
70 operator sharded<Service>& ();
71 operator Service& ();
72 };
73
74 template <typename T>
75 struct sharded_unwrap {
76 using evaluated_type = T;
77 using type = T;
78 };
79
80 template <typename T>
81 struct sharded_unwrap<std::reference_wrapper<sharded<T>>> {
82 using evaluated_type = T&;
83 using type = either_sharded_or_local<T>;
84 };
85
86 template <typename Func, typename... Param>
87 struct sharded_unwrap<sharded_parameter<Func, Param...>> {
88 using type = std::invoke_result_t<Func, Param...>;
89 };
90
91 template <typename T>
92 using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
93
94 template <typename T>
95 using sharded_unwrap_t = typename sharded_unwrap<T>::type;
96
97 } // internal
98
99
100 /// \addtogroup smp-module
101 /// @{
102
103 template <typename T>
104 class sharded;
105
106 /// If a sharded service inherits from this class, sharded::stop() will wait
107 /// until all references to this service on each shard are released before
108 /// returning. It is still service's own responsibility to track its references
109 /// in asynchronous code by calling shared_from_this() and keeping returned smart
110 /// pointer as long as object is in use.
111 template<typename T>
112 class async_sharded_service : public enable_shared_from_this<T> {
113 protected:
114 std::function<void()> _delete_cb;
115 async_sharded_service() noexcept = default;
116 virtual ~async_sharded_service() {
117 if (_delete_cb) {
118 _delete_cb();
119 }
120 }
121 template <typename Service> friend class sharded;
122 };
123
124
125 /// \brief Provide a sharded service with access to its peers
126 ///
127 /// If a service class inherits from this, it will gain a \code container()
128 /// \endcode method that provides access to the \ref sharded object, with which
129 /// it can call its peers.
130 template <typename Service>
131 class peering_sharded_service {
132 sharded<Service>* _container = nullptr;
133 private:
134 template <typename T> friend class sharded;
135 void set_container(sharded<Service>* container) noexcept { _container = container; }
136 public:
137 peering_sharded_service() noexcept = default;
138 peering_sharded_service(peering_sharded_service<Service>&&) noexcept = default;
139 peering_sharded_service(const peering_sharded_service<Service>&) = delete;
140 peering_sharded_service& operator=(const peering_sharded_service<Service>&) = delete;
141 sharded<Service>& container() noexcept { return *_container; }
142 const sharded<Service>& container() const noexcept { return *_container; }
143 };
144
145
146 /// Exception thrown when a \ref sharded object does not exist
147 class no_sharded_instance_exception : public std::exception {
148 sstring _msg;
149 public:
150 no_sharded_instance_exception() : _msg("sharded instance does not exist") {}
151 explicit no_sharded_instance_exception(sstring type_info)
152 : _msg("sharded instance does not exist: " + type_info) {}
153 virtual const char* what() const noexcept override {
154 return _msg.c_str();
155 }
156 };
157
158 /// Template helper to distribute a service across all logical cores.
159 ///
160 /// The \c sharded template manages a sharded service, by creating
161 /// a copy of the service on each logical core, providing mechanisms to communicate
162 /// with each shard's copy, and a way to stop the service.
163 ///
164 /// \tparam Service a class to be instantiated on each core. Must expose
165 /// a \c stop() method that returns a \c future<>, to be called when
166 /// the service is stopped.
167 template <typename Service>
168 class sharded {
169 struct entry {
170 shared_ptr<Service> service;
171 promise<> freed;
172 };
173 std::vector<entry> _instances;
174 private:
175 using invoke_on_all_func_type = std::function<future<> (Service&)>;
176 private:
177 void service_deleted() noexcept {
178 _instances[this_shard_id()].freed.set_value();
179 }
180 template <typename U, bool async>
181 friend struct shared_ptr_make_helper;
182
183 template <typename T>
184 std::enable_if_t<std::is_base_of<peering_sharded_service<T>, T>::value>
185 set_container(T& service) noexcept {
186 service.set_container(this);
187 }
188
189 template <typename T>
190 std::enable_if_t<!std::is_base_of<peering_sharded_service<T>, T>::value>
191 set_container(T&) noexcept {
192 }
193
194 future<>
195 sharded_parallel_for_each(internal::on_each_shard_func func) noexcept(std::is_nothrow_move_constructible_v<internal::on_each_shard_func>) {
196 return internal::sharded_parallel_for_each(_instances.size(), std::move(func));
197 }
198 public:
199 /// Constructs an empty \c sharded object. No instances of the service are
200 /// created.
201 sharded() noexcept {}
202 sharded(const sharded& other) = delete;
203 sharded& operator=(const sharded& other) = delete;
204 /// Sharded object with T that inherits from peering_sharded_service
205 /// cannot be moved safely, so disable move operations.
206 sharded(sharded&& other) = delete;
207 sharded& operator=(sharded&& other) = delete;
208 /// Destroyes a \c sharded object. Must not be in a started state.
209 ~sharded();
210
211 /// Starts \c Service by constructing an instance on every logical core
212 /// with a copy of \c args passed to the constructor.
213 ///
214 /// \param args Arguments to be forwarded to \c Service constructor
215 /// \return a \ref seastar::future<> that becomes ready when all instances have been
216 /// constructed.
217 template <typename... Args>
218 future<> start(Args&&... args) noexcept;
219
220 /// Starts \c Service by constructing an instance on a single logical core
221 /// with a copy of \c args passed to the constructor.
222 ///
223 /// \param args Arguments to be forwarded to \c Service constructor
224 /// \return a \ref seastar::future<> that becomes ready when the instance has been
225 /// constructed.
226 template <typename... Args>
227 future<> start_single(Args&&... args) noexcept;
228
229 /// Stops all started instances and destroys them.
230 ///
231 /// For every started instance, its \c stop() method is called, and then
232 /// it is destroyed.
233 future<> stop() noexcept;
234
235 /// Invoke a type-erased function on all instances of `Service`.
236 /// The return value becomes ready when all instances have processed
237 /// the message.
238 ///
239 /// \param options the options to forward to the \ref smp::submit_to()
240 /// called behind the scenes.
241 /// \param func Function to be invoked on all shards
242 /// \return Future that becomes ready once all calls have completed
243 future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept;
244
245 /// Invoke a type-erased function on all instances of `Service`.
246 /// The return value becomes ready when all instances have processed
247 /// the message.
248 /// Passes the default \ref smp_submit_to_options to the
249 /// \ref smp::submit_to() called behind the scenes.
250 future<> invoke_on_all(std::function<future<> (Service&)> func) noexcept {
251 try {
252 return invoke_on_all(smp_submit_to_options{}, std::move(func));
253 } catch (...) {
254 return current_exception_as_future();
255 }
256 }
257
258 /// Invoke a function on all instances of `Service`.
259 /// The return value becomes ready when all instances have processed
260 /// the message. The function can be a member pointer to function,
261 /// a free function, or a functor. The first argument of the function
262 /// will be a reference to the local service on the shard.
263 ///
264 /// For a non-static pointer-to-member-function, the first argument
265 /// becomes `this`, not the first declared parameter.
266 ///
267 /// \param options the options to forward to the \ref smp::submit_to()
268 /// called behind the scenes.
269 /// \param func invocable accepting a `Service&` as the first parameter
270 /// to be invoked on all shards
271 /// \return Future that becomes ready once all calls have completed
272 template <typename Func, typename... Args>
273 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
274 future<> invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept;
275
276 /// Invoke a function on all instances of `Service`.
277 /// The return value becomes ready when all instances have processed
278 /// the message.
279 /// Passes the default \ref smp_submit_to_options to the
280 /// \ref smp::submit_to() called behind the scenes.
281 template <typename Func, typename... Args>
282 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
283 future<> invoke_on_all(Func func, Args... args) noexcept {
284 try {
285 return invoke_on_all(smp_submit_to_options{}, std::move(func), std::move(args)...);
286 } catch (...) {
287 return current_exception_as_future();
288 }
289 }
290
291 /// Invoke a callable on all instances of \c Service except the instance
292 /// which is allocated on current shard.
293 ///
294 /// \param options the options to forward to the \ref smp::submit_to()
295 /// called behind the scenes.
296 /// \param func a callable with the signature `void (Service&)`
297 /// or `future<> (Service&)`, to be called on each core
298 /// with the local instance as an argument.
299 /// \return a `future<>` that becomes ready when all cores but the current one have
300 /// processed the message.
301 template <typename Func, typename... Args>
302 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
303 future<> invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept;
304
305 /// Invoke a callable on all instances of \c Service except the instance
306 /// which is allocated on current shard.
307 ///
308 /// \param func a callable with the signature `void (Service&)`
309 /// or `future<> (Service&)`, to be called on each core
310 /// with the local instance as an argument.
311 /// \return a `future<>` that becomes ready when all cores but the current one have
312 /// processed the message.
313 ///
314 /// Passes the default \ref smp_submit_to_options to the
315 /// \ref smp::submit_to() called behind the scenes.
316 template <typename Func, typename... Args>
317 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
318 future<> invoke_on_others(Func func, Args... args) noexcept {
319 try {
320 return invoke_on_others(smp_submit_to_options{}, std::move(func), std::move(args)...);
321 } catch (...) {
322 return current_exception_as_future();
323 }
324 }
325
326 /// Invoke a callable on all instances of `Service` and reduce the results using
327 /// `Reducer`.
328 ///
329 /// \see map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
330 template <typename Reducer, typename Func, typename... Args>
331 inline
332 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) -> typename reducer_traits<Reducer>::future_type
333 {
334 return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
335 boost::make_counting_iterator<unsigned>(_instances.size()),
336 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
337 return smp::submit_to(c, [this, &func, args] () mutable {
338 return std::apply([this, &func] (Args&&... args) mutable {
339 auto inst = get_local_service();
340 return std::invoke(func, *inst, std::forward<Args>(args)...);
341 }, std::move(args));
342 });
343 }, std::forward<Reducer>(r));
344 }
345
346 /// The const version of \ref map_reduce(Reducer&& r, Func&& func)
347 template <typename Reducer, typename Func, typename... Args>
348 inline
349 auto map_reduce(Reducer&& r, Func&& func, Args&&... args) const -> typename reducer_traits<Reducer>::future_type
350 {
351 return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
352 boost::make_counting_iterator<unsigned>(_instances.size()),
353 [this, func = std::forward<Func>(func), args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) {
354 return smp::submit_to(c, [this, &func, args] () {
355 return std::apply([this, &func] (Args&&... args) {
356 auto inst = get_local_service();
357 return std::invoke(func, *inst, std::forward<Args>(args)...);
358 }, std::move(args));
359 });
360 }, std::forward<Reducer>(r));
361 }
362
363 /// Applies a map function to all shards, then reduces the output by calling a reducer function.
364 ///
365 /// \param map callable with the signature `Value (Service&)` or
366 /// `future<Value> (Service&)` (for some `Value` type).
367 /// used as the second input to \c reduce
368 /// \param initial initial value used as the first input to \c reduce.
369 /// \param reduce binary function used to left-fold the return values of \c map
370 /// into \c initial .
371 ///
372 /// Each \c map invocation runs on the shard associated with the service.
373 ///
374 /// \tparam Mapper unary function taking `Service&` and producing some result.
375 /// \tparam Initial any value type
376 /// \tparam Reduce a binary function taking two Initial values and returning an Initial
377 /// \return Result of invoking `map` with each instance in parallel, reduced by calling
378 /// `reduce()` on each adjacent pair of results.
379 template <typename Mapper, typename Initial, typename Reduce>
380 inline
381 future<Initial>
382 map_reduce0(Mapper map, Initial initial, Reduce reduce) {
383 auto wrapped_map = [this, map] (unsigned c) {
384 return smp::submit_to(c, [this, map] {
385 auto inst = get_local_service();
386 return std::invoke(map, *inst);
387 });
388 };
389 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
390 std::move(wrapped_map),
391 std::move(initial),
392 std::move(reduce));
393 }
394
395 /// The const version of \ref map_reduce0(Mapper map, Initial initial, Reduce reduce)
396 template <typename Mapper, typename Initial, typename Reduce>
397 inline
398 future<Initial>
399 map_reduce0(Mapper map, Initial initial, Reduce reduce) const {
400 auto wrapped_map = [this, map] (unsigned c) {
401 return smp::submit_to(c, [this, map] {
402 auto inst = get_local_service();
403 return std::invoke(map, *inst);
404 });
405 };
406 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
407 std::move(wrapped_map),
408 std::move(initial),
409 std::move(reduce));
410 }
411
412 /// Applies a map function to all shards, and return a vector of the result.
413 ///
414 /// \param mapper callable with the signature `Value (Service&)` or
415 /// `future<Value> (Service&)` (for some `Value` type).
416 ///
417 /// Each \c map invocation runs on the shard associated with the service.
418 ///
419 /// \tparam Mapper unary function taking `Service&` and producing some result.
420 /// \return Result vector of invoking `map` with each instance in parallel
421 template <typename Mapper, typename Future = futurize_t<std::invoke_result_t<Mapper,Service&>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::tuple_type>()))>
422 inline future<std::vector<return_type>> map(Mapper mapper) {
423 return do_with(std::vector<return_type>(), std::move(mapper),
424 [this] (std::vector<return_type>& vec, Mapper& mapper) mutable {
425 vec.resize(_instances.size());
426 return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [this, &vec, &mapper] (unsigned c) {
427 return smp::submit_to(c, [this, &mapper] {
428 auto inst = get_local_service();
429 return mapper(*inst);
430 }).then([&vec, c] (auto&& res) {
431 vec[c] = std::move(res);
432 });
433 }).then([&vec] {
434 return make_ready_future<std::vector<return_type>>(std::move(vec));
435 });
436 });
437 }
438
439 /// Invoke a callable on a specific instance of `Service`.
440 ///
441 /// \param id shard id to call
442 /// \param options the options to forward to the \ref smp::submit_to()
443 /// called behind the scenes.
444 /// \param func a callable with signature `Value (Service&, Args...)` or
445 /// `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
446 /// to a member function of Service
447 /// \param args parameters to the callable; will be copied or moved. To pass by reference,
448 /// use std::ref().
449 ///
450 /// \return result of calling `func(instance)` on the designated instance
451 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
452 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
453 Ret
454 invoke_on(unsigned id, smp_submit_to_options options, Func&& func, Args&&... args) {
455 return smp::submit_to(id, options, [this, func = std::forward<Func>(func), args = std::tuple(std::move(args)...)] () mutable {
456 auto inst = get_local_service();
457 return std::apply(std::forward<Func>(func), std::tuple_cat(std::forward_as_tuple(*inst), std::move(args)));
458 });
459 }
460
461 /// Invoke a callable on a specific instance of `Service`.
462 ///
463 /// \param id shard id to call
464 /// \param func a callable with signature `Value (Service&)` or
465 /// `future<Value> (Service&)` (for some `Value` type), or a pointer
466 /// to a member function of Service
467 /// \param args parameters to the callable
468 /// \return result of calling `func(instance)` on the designated instance
469 template <typename Func, typename... Args, typename Ret = futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
470 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args&&...>)
471 Ret
472 invoke_on(unsigned id, Func&& func, Args&&... args) {
473 return invoke_on(id, smp_submit_to_options(), std::forward<Func>(func), std::forward<Args>(args)...);
474 }
475
476 /// Gets a reference to the local instance.
477 const Service& local() const noexcept;
478
479 /// Gets a reference to the local instance.
480 Service& local() noexcept;
481
482 /// Gets a shared pointer to the local instance.
483 shared_ptr<Service> local_shared() noexcept;
484
485 /// Checks whether the local instance has been initialized.
486 bool local_is_initialized() const noexcept;
487
488 private:
489 void track_deletion(shared_ptr<Service>&, std::false_type) noexcept {
490 // do not wait for instance to be deleted since it is not going to notify us
491 service_deleted();
492 }
493
494 void track_deletion(shared_ptr<Service>& s, std::true_type) {
495 s->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted), this);
496 }
497
498 template <typename... Args>
499 shared_ptr<Service> create_local_service(Args&&... args) {
500 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
501 set_container(*s);
502 track_deletion(s, std::is_base_of<async_sharded_service<Service>, Service>());
503 return s;
504 }
505
506 shared_ptr<Service> get_local_service() {
507 auto inst = _instances[this_shard_id()].service;
508 if (!inst) {
509 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
510 }
511 return inst;
512 }
513
514 shared_ptr<const Service> get_local_service() const {
515 auto inst = _instances[this_shard_id()].service;
516 if (!inst) {
517 throw no_sharded_instance_exception(pretty_type_name(typeid(Service)));
518 }
519 return inst;
520 }
521 };
522
523
524 /// \brief Helper to pass a parameter to a `sharded<>` object that depends
525 /// on the shard. It is evaluated on the shard, just before being
526 /// passed to the local instance. It is useful when passing
527 /// parameters to sharded::start().
528 template <typename Func, typename... Params>
529 class sharded_parameter {
530 Func _func;
531 std::tuple<Params...> _params;
532 public:
533 /// Creates a sharded parameter, which evaluates differently based on
534 /// the shard it is executed on.
535 ///
536 /// \param func Function to be executed
537 /// \param params optional parameters to be passed to the function. Can
538 /// be std::ref(sharded<whatever>), in which case the local
539 /// instance will be passed. Anything else
540 /// will be passed by value unchanged.
541 explicit sharded_parameter(Func func, Params... params)
542 SEASTAR_CONCEPT(requires std::invocable<Func, internal::sharded_unwrap_evaluated_t<Params>...>)
543 : _func(std::move(func)), _params(std::make_tuple(std::move(params)...)) {
544 }
545 private:
546 auto evaluate() const;
547
548 template <typename Func_, typename... Param_>
549 friend auto internal::unwrap_sharded_arg(sharded_parameter<Func_, Param_...> sp);
550 };
551
552 /// \example sharded_parameter_demo.cc
553 ///
554 /// Example use of \ref sharded_parameter.
555
556 /// @}
557
558 template <typename Service>
559 sharded<Service>::~sharded() {
560 assert(_instances.empty());
561 }
562
563 namespace internal {
564
// Fallback overload: an ordinary argument is forwarded through untouched,
// keeping its value category.
template <typename T>
inline auto unwrap_sharded_arg(T&& arg) -> T&& {
    return std::forward<T>(arg);
}
571
572 template <typename Service>
573 either_sharded_or_local<Service>
574 unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
575 return either_sharded_or_local<Service>(arg);
576 }
577
578 template <typename Func, typename... Param>
579 auto
580 unwrap_sharded_arg(sharded_parameter<Func, Param...> sp) {
581 return sp.evaluate();
582 }
583
584 template <typename Service>
585 either_sharded_or_local<Service>::operator sharded<Service>& () { return _sharded; }
586
587 template <typename Service>
588 either_sharded_or_local<Service>::operator Service& () { return _sharded.local(); }
589
590 }
591
592 template <typename Func, typename... Param>
593 auto
594 sharded_parameter<Func, Param...>::evaluate() const {
595 auto unwrap_params_and_invoke = [this] (const auto&... params) {
596 return std::invoke(_func, internal::unwrap_sharded_arg(params)...);
597 };
598 return std::apply(unwrap_params_and_invoke, _params);
599 }
600
601 template <typename Service>
602 template <typename... Args>
603 future<>
604 sharded<Service>::start(Args&&... args) noexcept {
605 try {
606 _instances.resize(smp::count);
607 return sharded_parallel_for_each(
608 [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
609 return smp::submit_to(c, [this, args] () mutable {
610 _instances[this_shard_id()].service = std::apply([this] (Args... args) {
611 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
612 }, args);
613 });
614 }).then_wrapped([this] (future<> f) {
615 try {
616 f.get();
617 return make_ready_future<>();
618 } catch (...) {
619 return this->stop().then([e = std::current_exception()] () mutable {
620 std::rethrow_exception(e);
621 });
622 }
623 });
624 } catch (...) {
625 return current_exception_as_future();
626 }
627 }
628
629 template <typename Service>
630 template <typename... Args>
631 future<>
632 sharded<Service>::start_single(Args&&... args) noexcept {
633 try {
634 assert(_instances.empty());
635 _instances.resize(1);
636 return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
637 _instances[0].service = std::apply([this] (Args... args) {
638 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
639 }, args);
640 }).then_wrapped([this] (future<> f) {
641 try {
642 f.get();
643 return make_ready_future<>();
644 } catch (...) {
645 return this->stop().then([e = std::current_exception()] () mutable {
646 std::rethrow_exception(e);
647 });
648 }
649 });
650 } catch (...) {
651 return current_exception_as_future();
652 }
653 }
654
655 namespace internal {
656
// Compile-time probe: does Service have a member named "stop"?

struct sharded_has_stop {
    // Selected whenever `&Service::stop` is a valid expression, regardless of
    // the member's signature. A stop() with the wrong signature is therefore
    // not silently ignored: the later call fails to compile and the user has
    // to resolve the mismatch.
    template <typename Service, typename = decltype(&Service::stop)>
    constexpr static bool check(int) {
        return true;
    }

    // Chosen only when the overload above is removed by SFINAE, i.e. there
    // is no usable Service::stop.
    template <typename>
    static constexpr bool check(...) {
        return false;
    }
};
675
676 template <bool stop_exists>
677 struct sharded_call_stop {
678 template <typename Service>
679 static future<> call(Service& instance);
680 };
681
682 template <>
683 template <typename Service>
684 inline
685 future<> sharded_call_stop<true>::call(Service& instance) {
686 return instance.stop();
687 }
688
689 template <>
690 template <typename Service>
691 inline
692 future<> sharded_call_stop<false>::call(Service&) {
693 return make_ready_future<>();
694 }
695
696 template <typename Service>
697 inline
698 future<>
699 stop_sharded_instance(Service& instance) {
700 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
701 return internal::sharded_call_stop<has_stop>::call(instance);
702 }
703
704 }
705
706 template <typename Service>
707 future<>
708 sharded<Service>::stop() noexcept {
709 try {
710 return sharded_parallel_for_each([this] (unsigned c) mutable {
711 return smp::submit_to(c, [this] () mutable {
712 auto inst = _instances[this_shard_id()].service;
713 if (!inst) {
714 return make_ready_future<>();
715 }
716 return internal::stop_sharded_instance(*inst);
717 });
718 }).then_wrapped([this] (future<> fut) {
719 return sharded_parallel_for_each([this] (unsigned c) {
720 return smp::submit_to(c, [this] {
721 if (_instances[this_shard_id()].service == nullptr) {
722 return make_ready_future<>();
723 }
724 _instances[this_shard_id()].service = nullptr;
725 return _instances[this_shard_id()].freed.get_future();
726 });
727 }).finally([this, fut = std::move(fut)] () mutable {
728 _instances.clear();
729 _instances = std::vector<sharded<Service>::entry>();
730 return std::move(fut);
731 });
732 });
733 } catch (...) {
734 return current_exception_as_future();
735 }
736 }
737
738 template <typename Service>
739 future<>
740 sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) noexcept {
741 try {
742 return sharded_parallel_for_each([this, options, func = std::move(func)] (unsigned c) {
743 return smp::submit_to(c, options, [this, func] {
744 return func(*get_local_service());
745 });
746 });
747 } catch (...) {
748 return current_exception_as_future();
749 }
750 }
751
752 template <typename Service>
753 template <typename Func, typename... Args>
754 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>)
755 inline
756 future<>
757 sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
758 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
759 "invoke_on_all()'s func must return void or future<>");
760 try {
761 return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
762 return std::apply([&service, &func] (Args&&... args) mutable {
763 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
764 }, std::move(args));
765 }));
766 } catch (...) {
767 return current_exception_as_future();
768 }
769 }
770
771 template <typename Service>
772 template <typename Func, typename... Args>
773 SEASTAR_CONCEPT(requires std::invocable<Func, Service&, Args...>)
774 inline
775 future<>
776 sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
777 static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, Args...>>, future<>>,
778 "invoke_on_others()'s func must return void or future<>");
779 try {
780 return invoke_on_all(options, [orig = this_shard_id(), func = std::move(func), args = std::tuple(std::move(args)...)] (Service& s) -> future<> {
781 return this_shard_id() == orig ? make_ready_future<>() : futurize_apply(func, std::tuple_cat(std::forward_as_tuple(s), args));;
782 });
783 } catch (...) {
784 return current_exception_as_future();
785 }
786 }
787
788 template <typename Service>
789 const Service& sharded<Service>::local() const noexcept {
790 assert(local_is_initialized());
791 return *_instances[this_shard_id()].service;
792 }
793
794 template <typename Service>
795 Service& sharded<Service>::local() noexcept {
796 assert(local_is_initialized());
797 return *_instances[this_shard_id()].service;
798 }
799
800 template <typename Service>
801 shared_ptr<Service> sharded<Service>::local_shared() noexcept {
802 assert(local_is_initialized());
803 return _instances[this_shard_id()].service;
804 }
805
806 template <typename Service>
807 inline bool sharded<Service>::local_is_initialized() const noexcept {
808 return _instances.size() > this_shard_id() &&
809 _instances[this_shard_id()].service;
810 }
811
812 /// \addtogroup smp-module
813 /// @{
814
815 /// Smart pointer wrapper which makes it safe to move across CPUs.
816 ///
817 /// \c foreign_ptr<> is a smart pointer wrapper which, unlike
818 /// \ref shared_ptr and \ref lw_shared_ptr, is safe to move to a
819 /// different core.
820 ///
821 /// As seastar avoids locking, any but the most trivial objects must
822 /// be destroyed on the same core they were created on, so that,
823 /// for example, their destructors can unlink references to the
824 /// object from various containers. In addition, for performance
825 /// reasons, the shared pointer types do not use atomic operations
826 /// to manage their reference counts. As a result they cannot be
827 /// used on multiple cores in parallel.
828 ///
829 /// \c foreign_ptr<> provides a solution to that problem.
830 /// \c foreign_ptr<> wraps smart pointers -- \ref seastar::shared_ptr<>,
831 /// or similar, and remembers on what core this happened.
832 /// When the \c foreign_ptr<> object is destroyed, it sends a message to
833 /// the original core so that the wrapped object can be safely destroyed.
834 ///
835 /// \c foreign_ptr<> is a move-only object; it cannot be copied.
836 ///
837 template <typename PtrType>
838 SEASTAR_CONCEPT( requires (!std::is_pointer<PtrType>::value) )
839 class foreign_ptr {
840 private:
841 PtrType _value;
842 unsigned _cpu;
843 private:
844 void destroy(PtrType p, unsigned cpu) noexcept {
845 // `destroy()` is called from the destructor and other
846 // synchronous methods (like `reset()`), that have no way to
847 // wait for this future.
848 (void)destroy_on(std::move(p), cpu);
849 }
850
851 static future<> destroy_on(PtrType p, unsigned cpu) noexcept {
852 if (p) {
853 if (cpu != this_shard_id()) {
854 return smp::submit_to(cpu, [v = std::move(p)] () mutable {
855 // Destroy the contained pointer. We do this explicitly
856 // in the current shard, because the lambda is destroyed
857 // in the shard that submitted the task.
858 v = {};
859 });
860 } else {
861 p = {};
862 }
863 }
864 return make_ready_future<>();
865 }
866 public:
867 using element_type = typename std::pointer_traits<PtrType>::element_type;
868 using pointer = element_type*;
869
870 /// Constructs a null \c foreign_ptr<>.
871 foreign_ptr() noexcept(std::is_nothrow_default_constructible_v<PtrType>)
872 : _value(PtrType())
873 , _cpu(this_shard_id()) {
874 }
875 /// Constructs a null \c foreign_ptr<>.
876 foreign_ptr(std::nullptr_t) noexcept(std::is_nothrow_default_constructible_v<foreign_ptr>) : foreign_ptr() {}
877 /// Wraps a pointer object and remembers the current core.
878 foreign_ptr(PtrType value) noexcept(std::is_nothrow_move_constructible_v<PtrType>)
879 : _value(std::move(value))
880 , _cpu(this_shard_id()) {
881 }
882 // The type is intentionally non-copyable because copies
883 // are expensive because each copy requires across-CPU call.
884 foreign_ptr(const foreign_ptr&) = delete;
885 /// Moves a \c foreign_ptr<> to another object.
886 foreign_ptr(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible_v<PtrType>) = default;
887 /// Destroys the wrapped object on its original cpu.
888 ~foreign_ptr() {
889 destroy(std::move(_value), _cpu);
890 }
891 /// Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
892 future<foreign_ptr> copy() const noexcept {
893 return smp::submit_to(_cpu, [this] () mutable {
894 auto v = _value;
895 return make_foreign(std::move(v));
896 });
897 }
898 /// Accesses the wrapped object.
899 element_type& operator*() const noexcept(noexcept(*_value)) { return *_value; }
900 /// Accesses the wrapped object.
901 element_type* operator->() const noexcept(noexcept(&*_value)) { return &*_value; }
902 /// Access the raw pointer to the wrapped object.
903 pointer get() const noexcept(noexcept(&*_value)) { return &*_value; }
904 /// Return the owner-shard of this pointer.
905 ///
906 /// The owner shard of the pointer can change as a result of
907 /// move-assigment or a call to reset().
908 unsigned get_owner_shard() const noexcept { return _cpu; }
909 /// Checks whether the wrapped pointer is non-null.
910 operator bool() const noexcept(noexcept(static_cast<bool>(_value))) { return static_cast<bool>(_value); }
911 /// Move-assigns a \c foreign_ptr<>.
912 foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible<PtrType>::value) {
913 destroy(std::move(_value), _cpu);
914 _value = std::move(other._value);
915 _cpu = other._cpu;
916 return *this;
917 }
918 /// Releases the owned pointer
919 ///
920 /// Warning: the caller is now responsible for destroying the
921 /// pointer on its owner shard. This method is best called on the
922 /// owner shard to avoid accidents.
923 PtrType release() noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
924 return std::exchange(_value, {});
925 }
926 /// Replace the managed pointer with new_ptr.
927 ///
928 /// The previous managed pointer is destroyed on its owner shard.
929 void reset(PtrType new_ptr) noexcept(std::is_nothrow_move_constructible_v<PtrType>) {
930 auto old_ptr = std::move(_value);
931 auto old_cpu = _cpu;
932
933 _value = std::move(new_ptr);
934 _cpu = this_shard_id();
935
936 destroy(std::move(old_ptr), old_cpu);
937 }
938 /// Replace the managed pointer with a null value.
939 ///
940 /// The previous managed pointer is destroyed on its owner shard.
941 void reset(std::nullptr_t = nullptr) noexcept(std::is_nothrow_default_constructible_v<PtrType>) {
942 reset(PtrType());
943 }
944
945 /// Destroy the managed pointer.
946 ///
947 /// \returns a future that is resolved when managed pointer is destroyed on its owner shard.
948 future<> destroy() noexcept {
949 return destroy_on(std::move(_value), _cpu);
950 }
951 };
952
953 /// Wraps a raw or smart pointer object in a \ref foreign_ptr<>.
954 ///
955 /// \relates foreign_ptr
956 template <typename T>
957 foreign_ptr<T> make_foreign(T ptr) {
958 return foreign_ptr<T>(std::move(ptr));
959 }
960
961 /// @}
962
963 template<typename T>
964 struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
965
966 }