]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/map_reduce.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / map_reduce.hh
CommitLineData
f67539c2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
25#include <iterator>
26
27#include <seastar/core/future.hh>
28#include <seastar/core/shared_ptr.hh>
29
30namespace seastar {
31
32/// \addtogroup future-util
33/// @{
34
35/// \cond internal
36
1e59de90 37template <typename T, typename Ptr, bool IsFuture>
f67539c2
TL
38struct reducer_with_get_traits;
39
1e59de90
TL
40template <typename T, typename Ptr>
41struct reducer_with_get_traits<T, Ptr, false> {
f67539c2
TL
42 using result_type = decltype(std::declval<T>().get());
43 using future_type = future<result_type>;
1e59de90 44 static future_type maybe_call_get(future<> f, Ptr r) {
f67539c2 45 return f.then([r = std::move(r)] () mutable {
1e59de90 46 return make_ready_future<result_type>(std::move(r->reducer).get());
f67539c2
TL
47 });
48 }
49};
50
1e59de90
TL
51template <typename T, typename Ptr>
52struct reducer_with_get_traits<T, Ptr, true> {
f67539c2 53 using future_type = decltype(std::declval<T>().get());
1e59de90 54 static future_type maybe_call_get(future<> f, Ptr r) {
f67539c2 55 return f.then([r = std::move(r)] () mutable {
1e59de90 56 return r->reducer.get();
f67539c2
TL
57 }).then_wrapped([r] (future_type f) {
58 return f;
59 });
60 }
61};
62
1e59de90 63template <typename T, typename Ptr = lw_shared_ptr<T>, typename V = void>
f67539c2
TL
64struct reducer_traits {
65 using future_type = future<>;
1e59de90 66 static future_type maybe_call_get(future<> f, Ptr r) {
f67539c2
TL
67 return f.then([r = std::move(r)] {});
68 }
69};
70
1e59de90
TL
71template <typename T, typename Ptr>
72struct reducer_traits<T, Ptr, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, Ptr, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
f67539c2
TL
73
74/// \endcond
75
76/// Map a function over a range and reduce the result.
77///
78/// \param begin an \c InputIterator designating the beginning of the range
79/// \param end an \c InputIterator designating the end of the range
80/// \param mapper is a callable which transforms values from the iterator range into a future<T>
81/// \param r is an object which can be called with T as parameter and yields a future<>
82/// It may have a get() method which returns a value of type U which holds the result of reduction.
83/// \return The reduced value wrapped in a future.
84/// If the reducer has no get() method then this function returns future<>.
1e59de90
TL
85/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
86/// on the current shard. If you want to run a function on all shards in
87/// parallel, have a look at \ref smp::invoke_on_all() instead, or combine
88/// map_reduce() with \ref smp::submit_to().
89/// Sharded services have their own \ref sharded::map_reduce() which
90/// map-reduces across all shards.
f67539c2
TL
91
92// TODO: specialize for non-deferring reducer
93template <typename Iterator, typename Mapper, typename Reducer>
94SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Reducer reduce) {
95 *i++;
96 { i != i } -> std::convertible_to<bool>;
97 mapper(*i);
98 reduce(futurize_invoke(mapper, *i).get0());
99} )
100inline
101auto
102map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
103 -> typename reducer_traits<Reducer>::future_type
104{
1e59de90
TL
105 struct state {
106 Mapper mapper;
107 Reducer reducer;
108 };
109 auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::forward<Reducer>(r)});
f67539c2
TL
110 future<> ret = make_ready_future<>();
111 while (begin != end) {
1e59de90
TL
112 ret = futurize_invoke(s->mapper, *begin++).then_wrapped([ret = std::move(ret), s] (auto f) mutable {
113 return ret.then_wrapped([f = std::move(f), s] (auto rf) mutable {
f67539c2
TL
114 if (rf.failed()) {
115 f.ignore_ready_future();
1e59de90 116 return rf;
f67539c2 117 } else {
1e59de90 118 return futurize_invoke(s->reducer, std::move(f.get0()));
f67539c2
TL
119 }
120 });
121 });
122 }
1e59de90 123 return reducer_traits<Reducer, lw_shared_ptr<state>>::maybe_call_get(std::move(ret), s);
f67539c2
TL
124}
125
126/// Asynchronous map/reduce transformation.
127///
128/// Given a range of objects, an asynchronous unary function
129/// operating on these objects, an initial value, and a
130/// binary function for reducing, map_reduce() will
131/// transform each object in the range, then invoke
132/// the reducing function with the result.
133///
134/// Example:
135///
136/// Calculate the total size of several files:
137///
138/// \code
139/// map_reduce(files.begin(), files.end(),
140/// std::mem_fn(&file::size),
141/// size_t(0),
142/// std::plus<size_t>())
143/// \endcode
144///
145/// Requirements:
146/// - Iterator: an InputIterator.
147/// - Mapper: unary function taking Iterator::value_type and producing a future<...>.
148/// - Initial: any value type
149/// - Reduce: a binary function taking two Initial values and returning an Initial
150///
151/// Return type:
152/// - future<Initial>
153///
154/// \param begin beginning of object range to operate on
155/// \param end end of object range to operate on
156/// \param mapper map function to call on each object, returning a future
157/// \param initial initial input value to reduce function
158/// \param reduce binary function for merging two result values from \c mapper
159///
160/// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ...
1e59de90
TL
161///
162/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
163/// on the current shard. If you want to run a function on all shards in
164/// parallel, have a look at \ref smp::invoke_on_all() instead, or combine
165/// map_reduce() with \ref smp::submit_to().
166/// Sharded services have their own \ref sharded::map_reduce() which
167/// map-reduces across all shards.
f67539c2
TL
168template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
169SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
170 *i++;
171 { i != i} -> std::convertible_to<bool>;
172 mapper(*i);
173 requires is_future<decltype(mapper(*i))>::value;
174 { reduce(std::move(initial), mapper(*i).get0()) } -> std::convertible_to<Initial>;
175} )
176inline
177future<Initial>
178map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
179 struct state {
1e59de90 180 Mapper mapper;
f67539c2
TL
181 Initial result;
182 Reduce reduce;
183 };
1e59de90 184 auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce)});
f67539c2
TL
185 future<> ret = make_ready_future<>();
186 while (begin != end) {
1e59de90 187 ret = futurize_invoke(s->mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
f67539c2
TL
188 try {
189 s->result = s->reduce(std::move(s->result), std::move(f.get0()));
190 return std::move(ret);
191 } catch (...) {
192 return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) {
193 f.ignore_ready_future();
194 return make_exception_future<>(ex);
195 });
196 }
197 });
198 }
199 return ret.then([s] {
200 return make_ready_future<Initial>(std::move(s->result));
201 });
202}
203
204/// Asynchronous map/reduce transformation (range version).
205///
206/// Given a range of objects, an asynchronous unary function
207/// operating on these objects, an initial value, and a
208/// binary function for reducing, map_reduce() will
209/// transform each object in the range, then invoke
210/// the reducing function with the result.
211///
212/// Example:
213///
214/// Calculate the total size of several files:
215///
216/// \code
217/// std::vector<file> files = ...;
218/// map_reduce(files,
219/// std::mem_fn(&file::size),
220/// size_t(0),
221/// std::plus<size_t>())
222/// \endcode
223///
224/// Requirements:
225/// - Iterator: an InputIterator.
226/// - Mapper: unary function taking Iterator::value_type and producing a future<...>.
227/// - Initial: any value type
228/// - Reduce: a binary function taking two Initial values and returning an Initial
229///
230/// Return type:
231/// - future<Initial>
232///
233/// \param range object range to operate on
234/// \param mapper map function to call on each object, returning a future
235/// \param initial initial input value to reduce function
236/// \param reduce binary function for merging two result values from \c mapper
237///
238/// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ...
1e59de90
TL
239///
240/// \note map-reduce() schedules all invocations of both \c mapper and \c reduce
241/// on the current shard. If you want to run a function on all shards in
242/// parallel, have a look at \ref smp::invoke_on_all() instead, or combine
243/// map_reduce() with \ref smp::submit_to().
244/// Sharded services have their own \ref sharded::map_reduce() which
245/// map-reduces across all shards.
f67539c2
TL
246template <typename Range, typename Mapper, typename Initial, typename Reduce>
247SEASTAR_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
248 std::begin(range);
249 std::end(range);
250 mapper(*std::begin(range));
251 requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value;
252 { reduce(std::move(initial), mapper(*std::begin(range)).get0()) } -> std::convertible_to<Initial>;
253} )
254inline
255future<Initial>
256map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) {
257 return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
258 std::move(initial), std::move(reduce));
259}
260
261/// Implements @Reducer concept. Calculates the result by
262/// adding elements to the accumulator.
263template <typename Result, typename Addend = Result>
264class adder {
265private:
266 Result _result;
267public:
268 future<> operator()(const Addend& value) {
269 _result += value;
270 return make_ready_future<>();
271 }
272 Result get() && {
273 return std::move(_result);
274 }
275};
276
277/// @}
278
279} // namespace seastar