]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/sharded.hh
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / include / seastar / core / sharded.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22 #pragma once
23
24 #include <seastar/core/reactor.hh>
25 #include <seastar/core/future-util.hh>
26 #include <seastar/util/is_smart_ptr.hh>
27 #include <seastar/util/tuple_utils.hh>
28 #include <seastar/core/do_with.hh>
29 #include <boost/iterator/counting_iterator.hpp>
30 #include <functional>
31
32 /// \defgroup smp-module Multicore
33 ///
34 /// \brief Support for exploiting multiple cores on a server.
35 ///
36 /// Seastar supports multicore servers by using *sharding*. Each logical
37 /// core (lcore) runs a separate event loop, with its own memory allocator,
38 /// TCP/IP stack, and other services. Shards communicate by explicit message
39 /// passing, rather than using locks and condition variables as with traditional
40 /// threaded programming.
41
42 namespace seastar {
43
44 /// \addtogroup smp-module
45 /// @{
46
47 template <typename T>
48 class sharded;
49
50 /// If sharded service inherits from this class sharded::stop() will wait
51 /// until all references to a service on each shard will disappear before
52 /// returning. It is still service's own responsibility to track its references
53 /// in asynchronous code by calling shared_from_this() and keeping returned smart
54 /// pointer as long as object is in use.
55 template<typename T>
56 class async_sharded_service : public enable_shared_from_this<T> {
57 protected:
58 std::function<void()> _delete_cb;
59 virtual ~async_sharded_service() {
60 if (_delete_cb) {
61 _delete_cb();
62 }
63 }
64 template <typename Service> friend class sharded;
65 };
66
67
68 /// \brief Provide a sharded service with access to its peers
69 ///
70 /// If a service class inherits from this, it will gain a \code container()
71 /// \endcode method that provides access to the \ref sharded object, with which
72 /// it can call its peers.
73 template <typename Service>
74 class peering_sharded_service {
75 sharded<Service>* _container = nullptr;
76 private:
77 template <typename T> friend class sharded;
78 void set_container(sharded<Service>* container) { _container = container; }
79 public:
80 peering_sharded_service() = default;
81 peering_sharded_service(peering_sharded_service<Service>&&) = default;
82 peering_sharded_service(const peering_sharded_service<Service>&) = delete;
83 peering_sharded_service& operator=(const peering_sharded_service<Service>&) = delete;
84 sharded<Service>& container() { return *_container; }
85 const sharded<Service>& container() const { return *_container; }
86 };
87
88
/// Exception thrown when the local instance of a \ref sharded service does not exist
class no_sharded_instance_exception : public std::exception {
public:
    /// Returns a fixed, human-readable description of the error.
    const char* what() const noexcept override {
        return "sharded instance does not exist";
    }
};
96
97 /// Template helper to distribute a service across all logical cores.
98 ///
99 /// The \c sharded template manages a sharded service, by creating
100 /// a copy of the service on each logical core, providing mechanisms to communicate
101 /// with each shard's copy, and a way to stop the service.
102 ///
103 /// \tparam Service a class to be instantiated on each core. Must expose
104 /// a \c stop() method that returns a \c future<>, to be called when
105 /// the service is stopped.
106 template <typename Service>
107 class sharded {
108 struct entry {
109 shared_ptr<Service> service;
110 promise<> freed;
111 };
112 std::vector<entry> _instances;
113 private:
114 using invoke_on_all_func_type = std::function<future<> (Service&)>;
115 private:
116 void service_deleted() {
117 _instances[engine().cpu_id()].freed.set_value();
118 }
119 template <typename U, bool async>
120 friend struct shared_ptr_make_helper;
121
122 template <typename T>
123 std::enable_if_t<std::is_base_of<peering_sharded_service<T>, T>::value>
124 set_container(T& service) {
125 service.set_container(this);
126 }
127
128 template <typename T>
129 std::enable_if_t<!std::is_base_of<peering_sharded_service<T>, T>::value>
130 set_container(T& service) {
131 }
132 public:
133 /// Constructs an empty \c sharded object. No instances of the service are
134 /// created.
135 sharded() {}
136 sharded(const sharded& other) = delete;
137 /// Moves a \c sharded object.
138 sharded(sharded&& other) noexcept;
139 sharded& operator=(const sharded& other) = delete;
140 /// Moves a \c sharded object.
141 sharded& operator=(sharded&& other) = default;
142 /// Destroyes a \c sharded object. Must not be in a started state.
143 ~sharded();
144
145 /// Starts \c Service by constructing an instance on every logical core
146 /// with a copy of \c args passed to the constructor.
147 ///
148 /// \param args Arguments to be forwarded to \c Service constructor
149 /// \return a \ref future<> that becomes ready when all instances have been
150 /// constructed.
151 template <typename... Args>
152 future<> start(Args&&... args);
153
154 /// Starts \c Service by constructing an instance on a single logical core
155 /// with a copy of \c args passed to the constructor.
156 ///
157 /// \param args Arguments to be forwarded to \c Service constructor
158 /// \return a \ref future<> that becomes ready when the instance has been
159 /// constructed.
160 template <typename... Args>
161 future<> start_single(Args&&... args);
162
163 /// Stops all started instances and destroys them.
164 ///
165 /// For every started instance, its \c stop() method is called, and then
166 /// it is destroyed.
167 future<> stop();
168
169 /// Invoke a type-erased function on all instances of @Service.
170 /// The return value becomes ready when all instances have processed
171 /// the message.
172 ///
173 /// \param options the options to forward to the \ref smp::submit_to()
174 /// called behind the scenes.
175 /// \param func Function to be invoked on all shards
176 /// \return Future that becomes ready once all calls have completed
177 future<> invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func);
178
179 /// Invoke a type-erased function on all instances of @Service.
180 /// The return value becomes ready when all instances have processed
181 /// the message.
182 /// Passes the default \ref smp_submit_to_options to the
183 /// \ref smp::submit_to() called behind the scenes.
184 future<> invoke_on_all(std::function<future<> (Service&)> func) {
185 return invoke_on_all(smp_submit_to_options{}, std::move(func));
186 }
187
188 /// Invoke a method on all instances of @Service.
189 /// The return value becomes ready when all instances have processed
190 /// the message.
191 ///
192 /// \param options the options to forward to the \ref smp::submit_to()
193 /// called behind the scenes.
194 /// \param func Member function of \c Service to be invoked on all shards
195 /// \return Future that becomes ready once all calls have completed
196 template <typename... Args>
197 future<> invoke_on_all(smp_submit_to_options options, future<> (Service::*func)(Args...), Args... args);
198
199 /// Invoke a method on all instances of @Service.
200 /// The return value becomes ready when all instances have processed
201 /// the message.
202 /// Passes the default \ref smp_submit_to_options to the
203 /// \ref smp::submit_to() called behind the scenes.
204 template <typename... Args>
205 future<> invoke_on_all(future<> (Service::*func)(Args...), Args... args) {
206 return invoke_on_all(smp_submit_to_options{}, func, std::move(args)...);
207 }
208
209 /// Invoke a method on all \c Service instances in parallel.
210 ///
211 /// \param options the options to forward to the \ref smp::submit_to()
212 /// called behind the scenes.
213 /// \param func member function to be called. Must return \c void or
214 /// \c future<>.
215 /// \param args arguments to be passed to \c func.
216 /// \return future that becomes ready when the method has been invoked
217 /// on all instances.
218 template <typename... Args>
219 future<> invoke_on_all(smp_submit_to_options options, void (Service::*func)(Args...), Args... args);
220
221 /// Invoke a method on all \c Service instances in parallel.
222 ///
223 /// \param func member function to be called. Must return \c void or
224 /// \c future<>.
225 /// \param args arguments to be passed to \c func.
226 /// \return future that becomes ready when the method has been invoked
227 /// on all instances.
228 ///
229 /// Passes the default \ref smp_submit_to_options to the
230 /// \ref smp::submit_to() called behind the scenes.
231 template <typename... Args>
232 future<> invoke_on_all(void (Service::*func)(Args...), Args... args) {
233 return invoke_on_all(smp_submit_to_options{}, func, std::move(args)...);
234 }
235
236 /// Invoke a callable on all instances of \c Service.
237 ///
238 /// \param options the options to forward to the \ref smp::submit_to()
239 /// called behind the scenes.
240 /// \param func a callable with the signature `void (Service&)`
241 /// or `future<> (Service&)`, to be called on each core
242 /// with the local instance as an argument.
243 /// \return a `future<>` that becomes ready when all cores have
244 /// processed the message.
245 template <typename Func>
246 future<> invoke_on_all(smp_submit_to_options options, Func&& func);
247
248 /// Invoke a callable on all instances of \c Service.
249 ///
250 /// \param func a callable with the signature `void (Service&)`
251 /// or `future<> (Service&)`, to be called on each core
252 /// with the local instance as an argument.
253 /// \return a `future<>` that becomes ready when all cores have
254 /// processed the message.
255 ///
256 /// Passes the default \ref smp_submit_to_options to the
257 /// \ref smp::submit_to() called behind the scenes.
258 template <typename Func>
259 future<> invoke_on_all(Func&& func) {
260 return invoke_on_all(smp_submit_to_options{}, std::forward<Func>(func));
261 }
262
263 /// Invoke a callable on all instances of \c Service except the instance
264 /// which is allocated on current shard.
265 ///
266 /// \param options the options to forward to the \ref smp::submit_to()
267 /// called behind the scenes.
268 /// \param func a callable with the signature `void (Service&)`
269 /// or `future<> (Service&)`, to be called on each core
270 /// with the local instance as an argument.
271 /// \return a `future<>` that becomes ready when all cores but the current one have
272 /// processed the message.
273 template <typename Func>
274 future<> invoke_on_others(smp_submit_to_options options, Func&& func);
275
276 /// Invoke a callable on all instances of \c Service except the instance
277 /// which is allocated on current shard.
278 ///
279 /// \param func a callable with the signature `void (Service&)`
280 /// or `future<> (Service&)`, to be called on each core
281 /// with the local instance as an argument.
282 /// \return a `future<>` that becomes ready when all cores but the current one have
283 /// processed the message.
284 ///
285 /// Passes the default \ref smp_submit_to_options to the
286 /// \ref smp::submit_to() called behind the scenes.
287 template <typename Func>
288 future<> invoke_on_others(Func&& func) {
289 return invoke_on_others(smp_submit_to_options{}, std::forward<Func>(func));
290 }
291
292 /// Invoke a method on all instances of `Service` and reduce the results using
293 /// `Reducer`.
294 ///
295 /// \see map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
296 template <typename Reducer, typename Ret, typename... FuncArgs, typename... Args>
297 inline
298 auto
299 map_reduce(Reducer&& r, Ret (Service::*func)(FuncArgs...), Args&&... args)
300 -> typename reducer_traits<Reducer>::future_type
301 {
302 return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
303 boost::make_counting_iterator<unsigned>(_instances.size()),
304 [this, func, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
305 return smp::submit_to(c, [this, func, args] () mutable {
306 return apply([this, func] (Args&&... args) mutable {
307 auto inst = _instances[engine().cpu_id()].service;
308 if (inst) {
309 return ((*inst).*func)(std::forward<Args>(args)...);
310 } else {
311 throw no_sharded_instance_exception();
312 }
313 }, std::move(args));
314 });
315 }, std::forward<Reducer>(r));
316 }
317
318 /// Invoke a callable on all instances of `Service` and reduce the results using
319 /// `Reducer`.
320 ///
321 /// \see map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
322 template <typename Reducer, typename Func>
323 inline
324 auto map_reduce(Reducer&& r, Func&& func) -> typename reducer_traits<Reducer>::future_type
325 {
326 return ::seastar::map_reduce(boost::make_counting_iterator<unsigned>(0),
327 boost::make_counting_iterator<unsigned>(_instances.size()),
328 [this, &func] (unsigned c) mutable {
329 return smp::submit_to(c, [this, func] () mutable {
330 auto inst = get_local_service();
331 return func(*inst);
332 });
333 }, std::forward<Reducer>(r));
334 }
335
336 /// Applies a map function to all shards, then reduces the output by calling a reducer function.
337 ///
338 /// \param map callable with the signature `Value (Service&)` or
339 /// `future<Value> (Service&)` (for some `Value` type).
340 /// used as the second input to \c reduce
341 /// \param initial initial value used as the first input to \c reduce.
342 /// \param reduce binary function used to left-fold the return values of \c map
343 /// into \c initial .
344 ///
345 /// Each \c map invocation runs on the shard associated with the service.
346 ///
347 /// \tparam Mapper unary function taking `Service&` and producing some result.
348 /// \tparam Initial any value type
349 /// \tparam Reduce a binary function taking two Initial values and returning an Initial
350 /// \return Result of applying `map` to each instance in parallel, reduced by calling
351 /// `reduce()` on each adjacent pair of results.
352 template <typename Mapper, typename Initial, typename Reduce>
353 inline
354 future<Initial>
355 map_reduce0(Mapper map, Initial initial, Reduce reduce) {
356 auto wrapped_map = [this, map] (unsigned c) {
357 return smp::submit_to(c, [this, map] {
358 auto inst = get_local_service();
359 return map(*inst);
360 });
361 };
362 return ::seastar::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
363 std::move(wrapped_map),
364 std::move(initial),
365 std::move(reduce));
366 }
367
368 /// Applies a map function to all shards, and return a vector of the result.
369 ///
370 /// \param mapper callable with the signature `Value (Service&)` or
371 /// `future<Value> (Service&)` (for some `Value` type).
372 ///
373 /// Each \c map invocation runs on the shard associated with the service.
374 ///
375 /// \tparam Mapper unary function taking `Service&` and producing some result.
376 /// \return Result vector of applying `map` to each instance in parallel
377 template <typename Mapper, typename Future = futurize_t<std::result_of_t<Mapper(Service&)>>, typename return_type = decltype(internal::untuple(std::declval<typename Future::value_type>()))>
378 inline future<std::vector<return_type>> map(Mapper mapper) {
379 return do_with(std::vector<return_type>(),
380 [&mapper, this] (std::vector<return_type>& vec) mutable {
381 vec.resize(smp::count);
382 return parallel_for_each(boost::irange<unsigned>(0, _instances.size()), [this, &vec, mapper] (unsigned c) {
383 return smp::submit_to(c, [this, mapper] {
384 auto inst = get_local_service();
385 return mapper(*inst);
386 }).then([&vec, c] (auto res) {
387 vec[c] = res;
388 });
389 }).then([&vec] {
390 return make_ready_future<std::vector<return_type>>(std::move(vec));
391 });
392 });
393 }
394
395 /// Invoke a method on a specific instance of `Service`.
396 ///
397 /// \param id shard id to call
398 /// \param options the options to forward to the \ref smp::submit_to()
399 /// called behind the scenes.
400 /// \param func a method of `Service`
401 /// \param args arguments to be passed to `func`
402 /// \return result of calling `func(args)` on the designated instance
403 template <typename Ret, typename... FuncArgs, typename... Args, typename FutureRet = futurize_t<Ret>>
404 FutureRet
405 invoke_on(unsigned id, smp_submit_to_options options, Ret (Service::*func)(FuncArgs...), Args&&... args) {
406 using futurator = futurize<Ret>;
407 return smp::submit_to(id, options, [this, func, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
408 auto inst = get_local_service();
409 return futurator::apply(std::mem_fn(func), std::tuple_cat(std::make_tuple<>(inst), std::move(args)));
410 });
411 }
412
413 /// Invoke a method on a specific instance of `Service`.
414 ///
415 /// \param id shard id to call
416 /// \param func a method of `Service`
417 /// \param args arguments to be passed to `func`
418 /// \return result of calling `func(args)` on the designated instance
419 template <typename Ret, typename... FuncArgs, typename... Args, typename FutureRet = futurize_t<Ret>>
420 FutureRet
421 invoke_on(unsigned id, Ret (Service::*func)(FuncArgs...), Args&&... args) {
422 return invoke_on(id, smp_submit_to_options{}, func, std::forward<Args>(args)...);
423 }
424
425 /// Invoke a callable on a specific instance of `Service`.
426 ///
427 /// \param id shard id to call
428 /// \param options the options to forward to the \ref smp::submit_to()
429 /// called behind the scenes.
430 /// \param func a callable with signature `Value (Service&)` or
431 /// `future<Value> (Service&)` (for some `Value` type)
432 /// \return result of calling `func(instance)` on the designated instance
433 template <typename Func, typename Ret = futurize_t<std::result_of_t<Func(Service&)>>>
434 Ret
435 invoke_on(unsigned id, smp_submit_to_options options, Func&& func) {
436 return smp::submit_to(id, options, [this, func = std::forward<Func>(func)] () mutable {
437 auto inst = get_local_service();
438 return func(*inst);
439 });
440 }
441
442 /// Invoke a callable on a specific instance of `Service`.
443 ///
444 /// \param id shard id to call
445 /// \param func a callable with signature `Value (Service&)` or
446 /// `future<Value> (Service&)` (for some `Value` type)
447 /// \return result of calling `func(instance)` on the designated instance
448 template <typename Func, typename Ret = futurize_t<std::result_of_t<Func(Service&)>>>
449 Ret
450 invoke_on(unsigned id, Func&& func) {
451 return invoke_on(id, smp_submit_to_options{}, std::forward<Func>(func));
452 }
453
454 /// Gets a reference to the local instance.
455 const Service& local() const;
456
457 /// Gets a reference to the local instance.
458 Service& local();
459
460 /// Gets a shared pointer to the local instance.
461 shared_ptr<Service> local_shared();
462
463 /// Checks whether the local instance has been initialized.
464 bool local_is_initialized() const;
465
466 private:
467 void track_deletion(shared_ptr<Service>& s, std::false_type) {
468 // do not wait for instance to be deleted since it is not going to notify us
469 service_deleted();
470 }
471
472 void track_deletion(shared_ptr<Service>& s, std::true_type) {
473 s->_delete_cb = std::bind(std::mem_fn(&sharded<Service>::service_deleted), this);
474 }
475
476 template <typename... Args>
477 shared_ptr<Service> create_local_service(Args&&... args) {
478 auto s = ::seastar::make_shared<Service>(std::forward<Args>(args)...);
479 set_container(*s);
480 track_deletion(s, std::is_base_of<async_sharded_service<Service>, Service>());
481 return s;
482 }
483
484 shared_ptr<Service> get_local_service() {
485 auto inst = _instances[engine().cpu_id()].service;
486 if (!inst) {
487 throw no_sharded_instance_exception();
488 }
489 return inst;
490 }
491 };
492
493 /// @}
494
495 template <typename Service>
496 sharded<Service>::~sharded() {
497 assert(_instances.empty());
498 }
499
500 namespace internal {
501
502 template <typename Service>
503 class either_sharded_or_local {
504 sharded<Service>& _sharded;
505 public:
506 either_sharded_or_local(sharded<Service>& s) : _sharded(s) {}
507 operator sharded<Service>& () { return _sharded; }
508 operator Service& () { return _sharded.local(); }
509 };
510
// Generic case: the argument is passed through untouched, preserving its
// value category.
template <typename T>
inline
auto
unwrap_sharded_arg(T&& arg) -> T&& {
    return static_cast<T&&>(arg);
}
517
518 template <typename Service>
519 either_sharded_or_local<Service>
520 unwrap_sharded_arg(std::reference_wrapper<sharded<Service>> arg) {
521 return either_sharded_or_local<Service>(arg);
522 }
523
524 }
525
526 template <typename Service>
527 sharded<Service>::sharded(sharded&& x) noexcept : _instances(std::move(x._instances)) {
528 for (auto&& e : _instances) {
529 set_container(*e.service);
530 }
531 }
532
533 namespace internal {
534
535 using on_each_shard_func = std::function<future<> (unsigned shard)>;
536
537 future<> sharded_parallel_for_each(unsigned nr_shards, on_each_shard_func on_each_shard);
538
539 }
540
541 template <typename Service>
542 template <typename... Args>
543 future<>
544 sharded<Service>::start(Args&&... args) {
545 _instances.resize(smp::count);
546 return internal::sharded_parallel_for_each(_instances.size(),
547 [this, args = std::make_tuple(std::forward<Args>(args)...)] (unsigned c) mutable {
548 return smp::submit_to(c, [this, args] () mutable {
549 _instances[engine().cpu_id()].service = apply([this] (Args... args) {
550 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
551 }, args);
552 });
553 }).then_wrapped([this] (future<> f) {
554 try {
555 f.get();
556 return make_ready_future<>();
557 } catch (...) {
558 return this->stop().then([e = std::current_exception()] () mutable {
559 std::rethrow_exception(e);
560 });
561 }
562 });
563 }
564
565 template <typename Service>
566 template <typename... Args>
567 future<>
568 sharded<Service>::start_single(Args&&... args) {
569 assert(_instances.empty());
570 _instances.resize(1);
571 return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
572 _instances[0].service = apply([this] (Args... args) {
573 return create_local_service(internal::unwrap_sharded_arg(std::forward<Args>(args))...);
574 }, args);
575 }).then_wrapped([this] (future<> f) {
576 try {
577 f.get();
578 return make_ready_future<>();
579 } catch (...) {
580 return this->stop().then([e = std::current_exception()] () mutable {
581 std::rethrow_exception(e);
582 });
583 }
584 });
585 }
586
587 namespace internal {
588
// Compile-time probe: does Service have a member named "stop"?

struct sharded_has_stop {
    // Preferred overload, viable whenever `&Service::stop` is a valid
    // expression. The signature of stop() is deliberately not inspected:
    // a member called stop() with the wrong signature should produce a
    // compile error at the call site rather than be silently ignored,
    // forcing the user to resolve the ambiguity.
    template <typename Service>
    static constexpr auto check(int) -> decltype((void)sizeof(&Service::stop), bool()) {
        return true;
    }

    // Catch-all used when Service has no member named stop.
    template <typename>
    static constexpr bool check(...) {
        return false;
    }
};
607
608 template <bool stop_exists>
609 struct sharded_call_stop {
610 template <typename Service>
611 static future<> call(Service& instance);
612 };
613
614 template <>
615 template <typename Service>
616 inline
617 future<> sharded_call_stop<true>::call(Service& instance) {
618 return instance.stop();
619 }
620
621 template <>
622 template <typename Service>
623 inline
624 future<> sharded_call_stop<false>::call(Service& instance) {
625 return make_ready_future<>();
626 }
627
628 template <typename Service>
629 inline
630 future<>
631 stop_sharded_instance(Service& instance) {
632 constexpr bool has_stop = internal::sharded_has_stop::check<Service>(0);
633 return internal::sharded_call_stop<has_stop>::call(instance);
634 }
635
636 }
637
638 template <typename Service>
639 future<>
640 sharded<Service>::stop() {
641 return internal::sharded_parallel_for_each(_instances.size(), [this] (unsigned c) mutable {
642 return smp::submit_to(c, [this] () mutable {
643 auto inst = _instances[engine().cpu_id()].service;
644 if (!inst) {
645 return make_ready_future<>();
646 }
647 return internal::stop_sharded_instance(*inst);
648 });
649 }).then([this] {
650 return internal::sharded_parallel_for_each(_instances.size(), [this] (unsigned c) {
651 return smp::submit_to(c, [this] {
652 _instances[engine().cpu_id()].service = nullptr;
653 return _instances[engine().cpu_id()].freed.get_future();
654 });
655 });
656 }).then([this] {
657 _instances.clear();
658 _instances = std::vector<sharded<Service>::entry>();
659 });
660 }
661
662 template <typename Service>
663 future<>
664 sharded<Service>::invoke_on_all(smp_submit_to_options options, std::function<future<> (Service&)> func) {
665 return internal::sharded_parallel_for_each(_instances.size(), [this, options, func = std::move(func)] (unsigned c) {
666 return smp::submit_to(c, options, [this, func] {
667 return func(*get_local_service());
668 });
669 });
670 }
671
672 template <typename Service>
673 template <typename... Args>
674 inline
675 future<>
676 sharded<Service>::invoke_on_all(smp_submit_to_options options, future<> (Service::*func)(Args...), Args... args) {
677 return invoke_on_all(options, invoke_on_all_func_type([func, args...] (Service& service) mutable {
678 return (service.*func)(args...);
679 }));
680 }
681
682 template <typename Service>
683 template <typename... Args>
684 inline
685 future<>
686 sharded<Service>::invoke_on_all(smp_submit_to_options options, void (Service::*func)(Args...), Args... args) {
687 return invoke_on_all(options, invoke_on_all_func_type([func, args...] (Service& service) mutable {
688 (service.*func)(args...);
689 return make_ready_future<>();
690 }));
691 }
692
693 template <typename Service>
694 template <typename Func>
695 inline
696 future<>
697 sharded<Service>::invoke_on_all(smp_submit_to_options options, Func&& func) {
698 static_assert(std::is_same<futurize_t<std::result_of_t<Func(Service&)>>, future<>>::value,
699 "invoke_on_all()'s func must return void or future<>");
700 return invoke_on_all(options, invoke_on_all_func_type([func] (Service& service) mutable {
701 return futurize<void>::apply(func, service);
702 }));
703 }
704
705 template <typename Service>
706 template <typename Func>
707 inline
708 future<>
709 sharded<Service>::invoke_on_others(smp_submit_to_options options, Func&& func) {
710 static_assert(std::is_same<futurize_t<std::result_of_t<Func(Service&)>>, future<>>::value,
711 "invoke_on_others()'s func must return void or future<>");
712 return invoke_on_all(options, [orig = engine().cpu_id(), func = std::forward<Func>(func)] (auto& s) -> future<> {
713 return engine().cpu_id() == orig ? make_ready_future<>() : futurize_apply(func, s);
714 });
715 }
716
717 template <typename Service>
718 const Service& sharded<Service>::local() const {
719 assert(local_is_initialized());
720 return *_instances[engine().cpu_id()].service;
721 }
722
723 template <typename Service>
724 Service& sharded<Service>::local() {
725 assert(local_is_initialized());
726 return *_instances[engine().cpu_id()].service;
727 }
728
729 template <typename Service>
730 shared_ptr<Service> sharded<Service>::local_shared() {
731 assert(local_is_initialized());
732 return _instances[engine().cpu_id()].service;
733 }
734
735 template <typename Service>
736 inline bool sharded<Service>::local_is_initialized() const {
737 return _instances.size() > engine().cpu_id() &&
738 _instances[engine().cpu_id()].service;
739 }
740
741 /// \addtogroup smp-module
742 /// @{
743
744 /// Smart pointer wrapper which makes it safe to move across CPUs.
745 ///
746 /// \c foreign_ptr<> is a smart pointer wrapper which, unlike
747 /// \ref shared_ptr and \ref lw_shared_ptr, is safe to move to a
748 /// different core.
749 ///
750 /// As seastar avoids locking, any but the most trivial objects must
751 /// be destroyed on the same core they were created on, so that,
752 /// for example, their destructors can unlink references to the
753 /// object from various containers. In addition, for performance
754 /// reasons, the shared pointer types do not use atomic operations
755 /// to manage their reference counts. As a result they cannot be
756 /// used on multiple cores in parallel.
757 ///
758 /// \c foreign_ptr<> provides a solution to that problem.
759 /// \c foreign_ptr<> wraps any pointer type -- raw pointer,
760 /// \ref shared_ptr<>, or similar, and remembers on what core this
761 /// happened. When the \c foreign_ptr<> object is destroyed, it
762 /// sends a message to the original core so that the wrapped object
763 /// can be safely destroyed.
764 ///
765 /// \c foreign_ptr<> is a move-only object; it cannot be copied.
766 ///
767 template <typename PtrType>
768 class foreign_ptr {
769 private:
770 PtrType _value;
771 unsigned _cpu;
772 private:
773 void destroy(PtrType p, unsigned cpu) {
774 if (p && engine().cpu_id() != cpu) {
775 // `destroy()` is called from the destructor and other
776 // synchronous methods (like `reset()`), that have no way to
777 // wait for this future.
778 (void)smp::submit_to(cpu, [v = std::move(p)] () mutable {
779 // Destroy the contained pointer. We do this explicitly
780 // in the current shard, because the lambda is destroyed
781 // in the shard that submitted the task.
782 v = {};
783 });
784 }
785 }
786 public:
787 using element_type = typename std::pointer_traits<PtrType>::element_type;
788 using pointer = element_type*;
789
790 /// Constructs a null \c foreign_ptr<>.
791 foreign_ptr()
792 : _value(PtrType())
793 , _cpu(engine().cpu_id()) {
794 }
795 /// Constructs a null \c foreign_ptr<>.
796 foreign_ptr(std::nullptr_t) : foreign_ptr() {}
797 /// Wraps a pointer object and remembers the current core.
798 foreign_ptr(PtrType value)
799 : _value(std::move(value))
800 , _cpu(engine().cpu_id()) {
801 }
802 // The type is intentionally non-copyable because copies
803 // are expensive because each copy requires across-CPU call.
804 foreign_ptr(const foreign_ptr&) = delete;
805 /// Moves a \c foreign_ptr<> to another object.
806 foreign_ptr(foreign_ptr&& other) = default;
807 /// Destroys the wrapped object on its original cpu.
808 ~foreign_ptr() {
809 destroy(std::move(_value), _cpu);
810 }
811 /// Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
812 future<foreign_ptr> copy() const {
813 return smp::submit_to(_cpu, [this] () mutable {
814 auto v = _value;
815 return make_foreign(std::move(v));
816 });
817 }
818 /// Accesses the wrapped object.
819 element_type& operator*() const { return *_value; }
820 /// Accesses the wrapped object.
821 element_type* operator->() const { return &*_value; }
822 /// Access the raw pointer to the wrapped object.
823 pointer get() const { return &*_value; }
824 /// Return the owner-shard of this pointer.
825 ///
826 /// The owner shard of the pointer can change as a result of
827 /// move-assigment or a call to reset().
828 unsigned get_owner_shard() const { return _cpu; }
829 /// Checks whether the wrapped pointer is non-null.
830 operator bool() const { return static_cast<bool>(_value); }
831 /// Move-assigns a \c foreign_ptr<>.
832 foreign_ptr& operator=(foreign_ptr&& other) noexcept(std::is_nothrow_move_constructible<PtrType>::value) {
833 destroy(std::move(_value), _cpu);
834 _value = std::move(other._value);
835 _cpu = other._cpu;
836 return *this;
837 }
838 /// Releases the owned pointer
839 ///
840 /// Warning: the caller is now responsible for destroying the
841 /// pointer on its owner shard. This method is best called on the
842 /// owner shard to avoid accidents.
843 PtrType release() {
844 return std::exchange(_value, {});
845 }
846 /// Replace the managed pointer with new_ptr.
847 ///
848 /// The previous managed pointer is destroyed on its owner shard.
849 void reset(PtrType new_ptr) {
850 auto old_ptr = std::move(_value);
851 auto old_cpu = _cpu;
852
853 _value = std::move(new_ptr);
854 _cpu = engine().cpu_id();
855
856 destroy(std::move(old_ptr), old_cpu);
857 }
858 /// Replace the managed pointer with a null value.
859 ///
860 /// The previous managed pointer is destroyed on its owner shard.
861 void reset(std::nullptr_t = nullptr) {
862 reset(PtrType());
863 }
864 };
865
866 /// Wraps a raw or smart pointer object in a \ref foreign_ptr<>.
867 ///
868 /// \relates foreign_ptr
869 template <typename T>
870 foreign_ptr<T> make_foreign(T ptr) {
871 return foreign_ptr<T>(std::move(ptr));
872 }
873
874 /// @}
875
876 template<typename T>
877 struct is_smart_ptr<foreign_ptr<T>> : std::true_type {};
878
879 }