]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/include/seastar/core/map_reduce.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / map_reduce.hh
index 502a73ed93b198673665f96588c98c5dc4617ae0..96de11fe174c5af2b15568f876f4c8ea19eff31b 100644 (file)
@@ -34,42 +34,42 @@ namespace seastar {
 
 /// \cond internal
 
-template <typename T, bool IsFuture>
+template <typename T, typename Ptr, bool IsFuture>
 struct reducer_with_get_traits;
 
-template <typename T>
-struct reducer_with_get_traits<T, false> {
+template <typename T, typename Ptr>
+struct reducer_with_get_traits<T, Ptr, false> {
     using result_type = decltype(std::declval<T>().get());
     using future_type = future<result_type>;
-    static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
+    static future_type maybe_call_get(future<> f, Ptr r) {
         return f.then([r = std::move(r)] () mutable {
-            return make_ready_future<result_type>(std::move(*r).get());
+            return make_ready_future<result_type>(std::move(r->reducer).get());
         });
     }
 };
 
-template <typename T>
-struct reducer_with_get_traits<T, true> {
+template <typename T, typename Ptr>
+struct reducer_with_get_traits<T, Ptr, true> {
     using future_type = decltype(std::declval<T>().get());
-    static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
+    static future_type maybe_call_get(future<> f, Ptr r) {
         return f.then([r = std::move(r)] () mutable {
-            return r->get();
+            return r->reducer.get();
         }).then_wrapped([r] (future_type f) {
             return f;
         });
     }
 };
 
-template <typename T, typename V = void>
+template <typename T, typename Ptr = lw_shared_ptr<T>, typename V = void>
 struct reducer_traits {
     using future_type = future<>;
-    static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
+    static future_type maybe_call_get(future<> f, Ptr r) {
         return f.then([r = std::move(r)] {});
     }
 };
 
-template <typename T>
-struct reducer_traits<T, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
+template <typename T, typename Ptr>
+struct reducer_traits<T, Ptr, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, Ptr, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
 
 /// \endcond
 
@@ -82,6 +82,12 @@ struct reducer_traits<T, decltype(std::declval<T>().get(), void())> : public red
 ///     It may have a get() method which returns a value of type U which holds the result of reduction.
 /// \return Th reduced value wrapped in a future.
 ///     If the reducer has no get() method then this function returns future<>.
+/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
+///       on the current shard. If you want to run a function on all shards in
+///       parallel, have a look at \ref smp::invoke_on_all() instead, or combine
+///       map_reduce() with \ref smp::submit_to().
+///       Sharded services have their own \ref sharded::map_reduce() which
+///       map-reduces across all shards.
 
 // TODO: specialize for non-deferring reducer
 template <typename Iterator, typename Mapper, typename Reducer>
@@ -96,21 +102,25 @@ auto
 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
     -> typename reducer_traits<Reducer>::future_type
 {
-    auto r_ptr = make_lw_shared(std::forward<Reducer>(r));
+    struct state {
+        Mapper mapper;
+        Reducer reducer;
+    };
+    auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::forward<Reducer>(r)});
     future<> ret = make_ready_future<>();
     while (begin != end) {
-        ret = futurize_invoke(mapper, *begin++).then_wrapped([ret = std::move(ret), r_ptr] (auto f) mutable {
-            return ret.then_wrapped([f = std::move(f), r_ptr] (auto rf) mutable {
+        ret = futurize_invoke(s->mapper, *begin++).then_wrapped([ret = std::move(ret), s] (auto f) mutable {
+            return ret.then_wrapped([f = std::move(f), s] (auto rf) mutable {
                 if (rf.failed()) {
                     f.ignore_ready_future();
-                    return std::move(rf);
+                    return rf;
                 } else {
-                    return futurize_invoke(*r_ptr, std::move(f.get0()));
+                    return futurize_invoke(s->reducer, std::move(f.get0()));
                 }
             });
         });
     }
-    return reducer_traits<Reducer>::maybe_call_get(std::move(ret), r_ptr);
+    return reducer_traits<Reducer, lw_shared_ptr<state>>::maybe_call_get(std::move(ret), s);
 }
 
 /// Asynchronous map/reduce transformation.
@@ -148,6 +158,13 @@ map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
 /// \param reduce binary function for merging two result values from \c mapper
 ///
 /// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ...
+///
+/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
+///       on the current shard. If you want to run a function on all shards in
+///       parallel, have a look at \ref smp::invoke_on_all() instead, or combine
+///       map_reduce() with \ref smp::submit_to().
+///       Sharded services have their own \ref sharded::map_reduce() which
+///       map-reduces across all shards.
 template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
 SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
      *i++;
@@ -160,13 +177,14 @@ inline
 future<Initial>
 map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
     struct state {
+        Mapper mapper;
         Initial result;
         Reduce reduce;
     };
-    auto s = make_lw_shared(state{std::move(initial), std::move(reduce)});
+    auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce)});
     future<> ret = make_ready_future<>();
     while (begin != end) {
-        ret = futurize_invoke(mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
+        ret = futurize_invoke(s->mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
             try {
                 s->result = s->reduce(std::move(s->result), std::move(f.get0()));
                 return std::move(ret);
@@ -218,6 +236,13 @@ map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduc
 /// \param reduce binary function for merging two result values from \c mapper
 ///
 /// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ...
+///
+/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
+///       on the current shard. If you want to run a function on all shards in
+///       parallel, have a look at \ref smp::invoke_on_all() instead, or combine
+///       map_reduce() with \ref smp::submit_to().
+///       Sharded services have their own \ref sharded::map_reduce() which
+///       map-reduces across all shards.
 template <typename Range, typename Mapper, typename Initial, typename Reduce>
 SEASTAR_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
      std::begin(range);