]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2020 ScyllaDB. | |
21 | */ | |
22 | ||
23 | #pragma once | |
24 | ||
25 | #include <iterator> | |
26 | ||
27 | #include <seastar/core/future.hh> | |
28 | #include <seastar/core/shared_ptr.hh> | |
29 | ||
30 | namespace seastar { | |
31 | ||
32 | /// \addtogroup future-util | |
33 | /// @{ | |
34 | ||
35 | /// \cond internal | |
36 | ||
// Traits adapting a reducer's get() method to the future type that
// map_reduce() ultimately returns.
//   T   - the reducer type
//   Ptr - shared-pointer-like handle to map_reduce()'s state; it must expose a
//         `reducer` member of type T (see the state structs in map_reduce()).
//   IsFuture - whether T::get() itself returns a future; dispatched by the
//         two specializations below.
template <typename T, typename Ptr, bool IsFuture>
struct reducer_with_get_traits;

// Specialization for reducers whose get() returns a plain value: once the
// reduction chain `f` resolves, wrap the extracted value in a ready future.
template <typename T, typename Ptr>
struct reducer_with_get_traits<T, Ptr, false> {
    using result_type = decltype(std::declval<T>().get());
    using future_type = future<result_type>;
    static future_type maybe_call_get(future<> f, Ptr r) {
        // Move `r` into the continuation so the state (and the reducer inside
        // it) stays alive until the chain completes.
        return f.then([r = std::move(r)] () mutable {
            // Invoke get() on an rvalue reducer so it may move its result out.
            return make_ready_future<result_type>(std::move(r->reducer).get());
        });
    }
};
50 | ||
// Specialization for reducers whose get() already returns a future: forward
// that future directly instead of wrapping it again.
template <typename T, typename Ptr>
struct reducer_with_get_traits<T, Ptr, true> {
    using future_type = decltype(std::declval<T>().get());
    static future_type maybe_call_get(future<> f, Ptr r) {
        return f.then([r = std::move(r)] () mutable {
            return r->reducer.get();
        }).then_wrapped([r] (future_type f) {
            // NOTE(review): under C++17 sequencing, the then() argument above
            // is evaluated first, so `r` here copies an already-moved-from
            // pointer. Presumably this trailing continuation was meant to keep
            // the state alive while get()'s future runs — verify upstream.
            return f;
        });
    }
};
62 | ||
// Fallback traits for reducers that have no get() method: map_reduce() then
// returns a bare future<> and completion merely releases the state.
template <typename T, typename Ptr = lw_shared_ptr<T>, typename V = void>
struct reducer_traits {
    using future_type = future<>;
    static future_type maybe_call_get(future<> f, Ptr r) {
        // The empty continuation exists only to hold `r` (and thus the
        // mapper/reducer state) alive until the reduction chain resolves.
        return f.then([r = std::move(r)] {});
    }
};
70 | ||
// Detection specialization: enabled (via the comma-operator/void_t trick in
// the third template argument) only when T has a callable get().  Dispatches
// to reducer_with_get_traits according to whether get() returns a future.
template <typename T, typename Ptr>
struct reducer_traits<T, Ptr, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, Ptr, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {};
f67539c2 TL |
73 | |
74 | /// \endcond | |
75 | ||
76 | /// Map a function over a range and reduce the result. | |
77 | /// | |
78 | /// \param begin an \c InputIterator designating the beginning of the range | |
79 | /// \param end an \c InputIterator designating the end of the range | |
80 | /// \param mapper is a callable which transforms values from the iterator range into a future<T> | |
81 | /// \param r is an object which can be called with T as parameter and yields a future<> | |
82 | /// It may have a get() method which returns a value of type U which holds the result of reduction. | |
83 | /// \return The reduced value wrapped in a future. | |
84 | /// If the reducer has no get() method then this function returns future<>. | |
1e59de90 TL |
85 | /// \note map-reduce() schedules all invocations of both \c mapper and \c reduce |
86 | /// on the current shard. If you want to run a function on all shards in | |
87 | /// parallel, have a look at \ref smp::invoke_on_all() instead, or combine | |
88 | /// map_reduce() with \ref smp::submit_to(). | |
89 | /// Sharded services have their own \ref sharded::map_reduce() which | |
90 | /// map-reduces across all shards. | |
f67539c2 TL |
91 | |
// TODO: specialize for non-deferring reducer
template <typename Iterator, typename Mapper, typename Reducer>
SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Reducer reduce) {
    *i++;
    { i != i } -> std::convertible_to<bool>;
    mapper(*i);
    reduce(futurize_invoke(mapper, *i).get0());
} )
inline
auto
map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
    -> typename reducer_traits<Reducer>::future_type
{
    // Shared state keeping the (possibly move-only) mapper and reducer alive
    // for as long as any continuation still references them.
    struct state {
        Mapper mapper;
        Reducer reducer;
    };
    auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::forward<Reducer>(r)});
    future<> ret = make_ready_future<>();
    // All mapper invocations are launched eagerly in this synchronous loop;
    // each iteration chains one "reduce this mapper result" step onto `ret`.
    while (begin != end) {
        ret = futurize_invoke(s->mapper, *begin++).then_wrapped([ret = std::move(ret), s] (auto f) mutable {
            // `f`  - this element's mapper result (value or failure).
            // `rf` - the accumulated chain so far.
            return ret.then_wrapped([f = std::move(f), s] (auto rf) mutable {
                if (rf.failed()) {
                    // An earlier step already failed: discard this mapper
                    // result (marking any exception in it as handled) and
                    // propagate the first failure.
                    f.ignore_ready_future();
                    return rf;
                } else {
                    // get0() re-throws if this mapper failed; otherwise feed
                    // the value to the reducer.
                    return futurize_invoke(s->reducer, std::move(f.get0()));
                }
            });
        });
    }
    // If the reducer has a get(), convert the final chain into a future of
    // its result; otherwise just return the chain as future<>.
    return reducer_traits<Reducer, lw_shared_ptr<state>>::maybe_call_get(std::move(ret), s);
}
125 | ||
126 | /// Asynchronous map/reduce transformation. | |
127 | /// | |
128 | /// Given a range of objects, an asynchronous unary function | |
129 | /// operating on these objects, an initial value, and a | |
130 | /// binary function for reducing, map_reduce() will | |
131 | /// transform each object in the range, then invoke | |
132 | /// the reducing function with the result. | |
133 | /// | |
134 | /// Example: | |
135 | /// | |
136 | /// Calculate the total size of several files: | |
137 | /// | |
138 | /// \code | |
139 | /// map_reduce(files.begin(), files.end(), | |
140 | /// std::mem_fn(file::size), | |
141 | /// size_t(0), | |
142 | /// std::plus<size_t>()) | |
143 | /// \endcode | |
144 | /// | |
145 | /// Requirements: | |
146 | /// - Iterator: an InputIterator. | |
147 | /// - Mapper: unary function taking Iterator::value_type and producing a future<...>. | |
148 | /// - Initial: any value type | |
149 | /// - Reduce: a binary function taking two Initial values and returning an Initial | |
150 | /// | |
151 | /// Return type: | |
152 | /// - future<Initial> | |
153 | /// | |
154 | /// \param begin beginning of object range to operate on | |
155 | /// \param end end of object range to operate on | |
156 | /// \param mapper map function to call on each object, returning a future | |
157 | /// \param initial initial input value to reduce function | |
158 | /// \param reduce binary function for merging two result values from \c mapper | |
159 | /// | |
160 | /// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ... | |
1e59de90 TL |
161 | /// |
162 | /// \note map-reduce() schedules all invocations of both \c mapper and \c reduce | |
163 | /// on the current shard. If you want to run a function on all shards in | |
164 | /// parallel, have a look at \ref smp::invoke_on_all() instead, or combine | |
165 | /// map_reduce() with \ref smp::submit_to(). | |
166 | /// Sharded services have their own \ref sharded::map_reduce() which | |
167 | /// map-reduces across all shards. | |
f67539c2 TL |
template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) {
    *i++;
    { i != i} -> std::convertible_to<bool>;
    mapper(*i);
    requires is_future<decltype(mapper(*i))>::value;
    { reduce(std::move(initial), mapper(*i).get0()) } -> std::convertible_to<Initial>;
} )
inline
future<Initial>
map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) {
    // Shared state holding the mapper, the running accumulator and the
    // binary reduce function.
    struct state {
        Mapper mapper;
        Initial result;
        Reduce reduce;
    };
    auto s = make_lw_shared(state{std::forward<Mapper>(mapper), std::move(initial), std::move(reduce)});
    future<> ret = make_ready_future<>();
    // Launch every mapper invocation eagerly; chain the reduction steps so
    // each folds its result into s->result after the previous step.
    while (begin != end) {
        // Capture a raw pointer here; the lw_shared_ptr captured by the final
        // continuation below keeps the state alive for the whole chain.
        ret = futurize_invoke(s->mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable {
            try {
                // get0() re-throws if this mapper failed.
                s->result = s->reduce(std::move(s->result), std::move(f.get0()));
                return std::move(ret);
            } catch (...) {
                // Wait for the rest of the chain (suppressing its outcome)
                // and then propagate this exception.
                return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) {
                    f.ignore_ready_future();
                    return make_exception_future<>(ex);
                });
            }
        });
    }
    // Hand out the accumulated result once every step has completed.
    return ret.then([s] {
        return make_ready_future<Initial>(std::move(s->result));
    });
}
203 | ||
204 | /// Asynchronous map/reduce transformation (range version). | |
205 | /// | |
206 | /// Given a range of objects, an asynchronous unary function | |
207 | /// operating on these objects, an initial value, and a | |
208 | /// binary function for reducing, map_reduce() will | |
209 | /// transform each object in the range, then invoke | |
210 | /// the reducing function with the result. | |
211 | /// | |
212 | /// Example: | |
213 | /// | |
214 | /// Calculate the total size of several files: | |
215 | /// | |
216 | /// \code | |
217 | /// std::vector<file> files = ...; | |
218 | /// map_reduce(files, | |
219 | /// std::mem_fn(file::size), | |
220 | /// size_t(0), | |
221 | /// std::plus<size_t>()) | |
222 | /// \endcode | |
223 | /// | |
224 | /// Requirements: | |
225 | /// - Iterator: an InputIterator. | |
226 | /// - Mapper: unary function taking Iterator::value_type and producing a future<...>. | |
227 | /// - Initial: any value type | |
228 | /// - Reduce: a binary function taking two Initial values and returning an Initial | |
229 | /// | |
230 | /// Return type: | |
231 | /// - future<Initial> | |
232 | /// | |
233 | /// \param range object range to operate on | |
234 | /// \param mapper map function to call on each object, returning a future | |
235 | /// \param initial initial input value to reduce function | |
236 | /// \param reduce binary function for merging two result values from \c mapper | |
237 | /// | |
238 | /// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ... | |
1e59de90 TL |
239 | /// |
240 | /// \note map-reduce() schedules all invocations of both \c mapper and \c reduce | |
241 | /// on the current shard. If you want to run a function on all shards in | |
242 | /// parallel, have a look at \ref smp::invoke_on_all() instead, or combine | |
243 | /// map_reduce() with \ref smp::submit_to(). | |
244 | /// Sharded services have their own \ref sharded::map_reduce() which | |
245 | /// map-reduces across all shards. | |
f67539c2 TL |
246 | template <typename Range, typename Mapper, typename Initial, typename Reduce> |
247 | SEASTAR_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) { | |
248 | std::begin(range); | |
249 | std::end(range); | |
250 | mapper(*std::begin(range)); | |
251 | requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value; | |
252 | { reduce(std::move(initial), mapper(*std::begin(range)).get0()) } -> std::convertible_to<Initial>; | |
253 | } ) | |
254 | inline | |
255 | future<Initial> | |
256 | map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) { | |
257 | return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper), | |
258 | std::move(initial), std::move(reduce)); | |
259 | } | |
260 | ||
261 | /// Implements @Reducer concept. Calculates the result by | |
262 | /// adding elements to the accumulator. | |
263 | template <typename Result, typename Addend = Result> | |
264 | class adder { | |
265 | private: | |
266 | Result _result; | |
267 | public: | |
268 | future<> operator()(const Addend& value) { | |
269 | _result += value; | |
270 | return make_ready_future<>(); | |
271 | } | |
272 | Result get() && { | |
273 | return std::move(_result); | |
274 | } | |
275 | }; | |
276 | ||
277 | /// @} | |
278 | ||
279 | } // namespace seastar |