/// \cond internal
-template <typename T, bool IsFuture>
+template <typename T, typename Ptr, bool IsFuture>
struct reducer_with_get_traits;
-template <typename T>
-struct reducer_with_get_traits<T, false> {
+template <typename T, typename Ptr>
+struct reducer_with_get_traits<T, Ptr, false> {
using result_type = decltype(std::declval<T>().get());
using future_type = future<result_type>;
- static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
+ static future_type maybe_call_get(future<> f, Ptr r) {
return f.then([r = std::move(r)] () mutable {
- return make_ready_future<result_type>(std::move(*r).get());
+ return make_ready_future<result_type>(std::move(r->reducer).get());
});
}
};
-template <typename T>
-struct reducer_with_get_traits<T, true> {
+template <typename T, typename Ptr>
+struct reducer_with_get_traits<T, Ptr, true> {
using future_type = decltype(std::declval<T>().get());
- static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
+ static future_type maybe_call_get(future<> f, Ptr r) {
return f.then([r = std::move(r)] () mutable {
- return r->get();
+ return r->reducer.get();
}).then_wrapped([r] (future_type f) {
return f;
});
}
};
-template <typename T, typename V = void>
+template <typename T, typename Ptr = lw_shared_ptr<T>, typename V = void>
struct reducer_traits {
using future_type = future<>;
- static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
+ static future_type maybe_call_get(future<> f, Ptr r) {
return f.then([r = std::move(r)] {});
}
};
-template <typename T>
-struct reducer_traits<T, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
+template <typename T, typename Ptr>
+struct reducer_traits<T, Ptr, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, Ptr, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
/// \endcond
/// It may have a get() method which returns a value of type U which holds the result of reduction.
/// \return Th reduced value wrapped in a future.
/// If the reducer has no get() method then this function returns future<>.
+/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
+/// on the current shard. If you want to run a function on all shards in
+/// parallel, have a look at \ref smp::invoke_on_all() instead, or combine
+/// map_reduce() with \ref smp::submit_to().
+/// Sharded services have their own \ref sharded::map_reduce() which
+/// map-reduces across all shards.
// TODO: specialize for non-deferring reducer
template <typename Iterator, typename Mapper, typename Reducer>
map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
-> typename reducer_traits<Reducer>::future_type
{
- auto r_ptr = make_lw_shared(std::forward<Reducer>(r));
+ struct state {
+ Mapper mapper;
+ Reducer reducer;
+ };
+ auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::forward<Reducer>(r)});
future<> ret = make_ready_future<>();
while (begin != end) {
- ret = futurize_invoke(mapper, *begin++).then_wrapped([ret = std::move(ret), r_ptr] (auto f) mutable {
- return ret.then_wrapped([f = std::move(f), r_ptr] (auto rf) mutable {
+ ret = futurize_invoke(s->mapper, *begin++).then_wrapped([ret = std::move(ret), s] (auto f) mutable {
+ return ret.then_wrapped([f = std::move(f), s] (auto rf) mutable {
if (rf.failed()) {
f.ignore_ready_future();
- return std::move(rf);
+ return rf;
} else {
- return futurize_invoke(*r_ptr, std::move(f.get0()));
+ return futurize_invoke(s->reducer, std::move(f.get0()));
}
});
});
}
- return reducer_traits<Reducer>::maybe_call_get(std::move(ret), r_ptr);
+ return reducer_traits<Reducer, lw_shared_ptr<state>>::maybe_call_get(std::move(ret), s);
}
/// Asynchronous map/reduce transformation.
/// \param reduce binary function for merging two result values from \c mapper
///
/// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ...
+///
+/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
+/// on the current shard. If you want to run a function on all shards in
+/// parallel, have a look at \ref smp::invoke_on_all() instead, or combine
+/// map_reduce() with \ref smp::submit_to().
+/// Sharded services have their own \ref sharded::map_reduce() which
+/// map-reduces across all shards.
template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
*i++;
future<Initial>
map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
struct state {
+ Mapper mapper;
Initial result;
Reduce reduce;
};
- auto s = make_lw_shared(state{std::move(initial), std::move(reduce)});
+ auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce)});
future<> ret = make_ready_future<>();
while (begin != end) {
- ret = futurize_invoke(mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
+ ret = futurize_invoke(s->mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
try {
s->result = s->reduce(std::move(s->result), std::move(f.get0()));
return std::move(ret);
/// \param reduce binary function for merging two result values from \c mapper
///
/// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ...
+///
+/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
+/// on the current shard. If you want to run a function on all shards in
+/// parallel, have a look at \ref smp::invoke_on_all() instead, or combine
+/// map_reduce() with \ref smp::submit_to().
+/// Sharded services have their own \ref sharded::map_reduce() which
+/// map-reduces across all shards.
template <typename Range, typename Mapper, typename Initial, typename Reduce>
SEASTAR_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
std::begin(range);