]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/map_reduce.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / map_reduce.hh
CommitLineData
f67539c2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * Copyright (C) 2020 ScyllaDB.
21 */
22
23#pragma once
24
25#include <iterator>
26
27#include <seastar/core/future.hh>
28#include <seastar/core/shared_ptr.hh>
29
30namespace seastar {
31
32/// \addtogroup future-util
33/// @{
34
35/// \cond internal
36
37template <typename T, bool IsFuture>
38struct reducer_with_get_traits;
39
40template <typename T>
41struct reducer_with_get_traits<T, false> {
42 using result_type = decltype(std::declval<T>().get());
43 using future_type = future<result_type>;
44 static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
45 return f.then([r = std::move(r)] () mutable {
46 return make_ready_future<result_type>(std::move(*r).get());
47 });
48 }
49};
50
51template <typename T>
52struct reducer_with_get_traits<T, true> {
53 using future_type = decltype(std::declval<T>().get());
54 static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
55 return f.then([r = std::move(r)] () mutable {
56 return r->get();
57 }).then_wrapped([r] (future_type f) {
58 return f;
59 });
60 }
61};
62
63template <typename T, typename V = void>
64struct reducer_traits {
65 using future_type = future<>;
66 static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) {
67 return f.then([r = std::move(r)] {});
68 }
69};
70
71template <typename T>
20effc67 72struct reducer_traits<T, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
f67539c2
TL
73
74/// \endcond
75
76/// Map a function over a range and reduce the result.
77///
78/// \param begin an \c InputIterator designating the beginning of the range
79/// \param end an \c InputIterator designating the end of the range
80/// \param mapper is a callable which transforms values from the iterator range into a future<T>
81/// \param r is an object which can be called with T as parameter and yields a future<>
82/// It may have a get() method which returns a value of type U which holds the result of reduction.
/// \return The reduced value wrapped in a future.
84/// If the reducer has no get() method then this function returns future<>.
85
86// TODO: specialize for non-deferring reducer
87template <typename Iterator, typename Mapper, typename Reducer>
88SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Reducer reduce) {
89 *i++;
90 { i != i } -> std::convertible_to<bool>;
91 mapper(*i);
92 reduce(futurize_invoke(mapper, *i).get0());
93} )
94inline
95auto
96map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
97 -> typename reducer_traits<Reducer>::future_type
98{
99 auto r_ptr = make_lw_shared(std::forward<Reducer>(r));
100 future<> ret = make_ready_future<>();
101 while (begin != end) {
102 ret = futurize_invoke(mapper, *begin++).then_wrapped([ret = std::move(ret), r_ptr] (auto f) mutable {
103 return ret.then_wrapped([f = std::move(f), r_ptr] (auto rf) mutable {
104 if (rf.failed()) {
105 f.ignore_ready_future();
106 return std::move(rf);
107 } else {
108 return futurize_invoke(*r_ptr, std::move(f.get0()));
109 }
110 });
111 });
112 }
113 return reducer_traits<Reducer>::maybe_call_get(std::move(ret), r_ptr);
114}
115
116/// Asynchronous map/reduce transformation.
117///
118/// Given a range of objects, an asynchronous unary function
119/// operating on these objects, an initial value, and a
120/// binary function for reducing, map_reduce() will
121/// transform each object in the range, then invoke
/// the reducing function with the result.
123///
124/// Example:
125///
126/// Calculate the total size of several files:
127///
128/// \code
129/// map_reduce(files.begin(), files.end(),
130/// std::mem_fn(file::size),
131/// size_t(0),
132/// std::plus<size_t>())
133/// \endcode
134///
135/// Requirements:
136/// - Iterator: an InputIterator.
137/// - Mapper: unary function taking Iterator::value_type and producing a future<...>.
138/// - Initial: any value type
139/// - Reduce: a binary function taking two Initial values and returning an Initial
140///
141/// Return type:
142/// - future<Initial>
143///
144/// \param begin beginning of object range to operate on
145/// \param end end of object range to operate on
146/// \param mapper map function to call on each object, returning a future
147/// \param initial initial input value to reduce function
148/// \param reduce binary function for merging two result values from \c mapper
149///
150/// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ...
151template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
152SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
153 *i++;
154 { i != i} -> std::convertible_to<bool>;
155 mapper(*i);
156 requires is_future<decltype(mapper(*i))>::value;
157 { reduce(std::move(initial), mapper(*i).get0()) } -> std::convertible_to<Initial>;
158} )
159inline
160future<Initial>
161map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
162 struct state {
163 Initial result;
164 Reduce reduce;
165 };
166 auto s = make_lw_shared(state{std::move(initial), std::move(reduce)});
167 future<> ret = make_ready_future<>();
168 while (begin != end) {
169 ret = futurize_invoke(mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
170 try {
171 s->result = s->reduce(std::move(s->result), std::move(f.get0()));
172 return std::move(ret);
173 } catch (...) {
174 return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) {
175 f.ignore_ready_future();
176 return make_exception_future<>(ex);
177 });
178 }
179 });
180 }
181 return ret.then([s] {
182 return make_ready_future<Initial>(std::move(s->result));
183 });
184}
185
186/// Asynchronous map/reduce transformation (range version).
187///
188/// Given a range of objects, an asynchronous unary function
189/// operating on these objects, an initial value, and a
190/// binary function for reducing, map_reduce() will
191/// transform each object in the range, then invoke
/// the reducing function with the result.
193///
194/// Example:
195///
196/// Calculate the total size of several files:
197///
198/// \code
199/// std::vector<file> files = ...;
200/// map_reduce(files,
201/// std::mem_fn(file::size),
202/// size_t(0),
203/// std::plus<size_t>())
204/// \endcode
205///
206/// Requirements:
207/// - Iterator: an InputIterator.
208/// - Mapper: unary function taking Iterator::value_type and producing a future<...>.
209/// - Initial: any value type
210/// - Reduce: a binary function taking two Initial values and returning an Initial
211///
212/// Return type:
213/// - future<Initial>
214///
215/// \param range object range to operate on
216/// \param mapper map function to call on each object, returning a future
217/// \param initial initial input value to reduce function
218/// \param reduce binary function for merging two result values from \c mapper
219///
220/// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ...
221template <typename Range, typename Mapper, typename Initial, typename Reduce>
222SEASTAR_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) {
223 std::begin(range);
224 std::end(range);
225 mapper(*std::begin(range));
226 requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value;
227 { reduce(std::move(initial), mapper(*std::begin(range)).get0()) } -> std::convertible_to<Initial>;
228} )
229inline
230future<Initial>
231map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) {
232 return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
233 std::move(initial), std::move(reduce));
234}
235
236/// Implements @Reducer concept. Calculates the result by
237/// adding elements to the accumulator.
238template <typename Result, typename Addend = Result>
239class adder {
240private:
241 Result _result;
242public:
243 future<> operator()(const Addend& value) {
244 _result += value;
245 return make_ready_future<>();
246 }
247 Result get() && {
248 return std::move(_result);
249 }
250};
251
252/// @}
253
254} // namespace seastar