]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2020 ScyllaDB. | |
21 | */ | |
22 | ||
23 | #pragma once | |
24 | ||
25 | #include <iterator> | |
26 | ||
27 | #include <seastar/core/future.hh> | |
28 | #include <seastar/core/shared_ptr.hh> | |
29 | ||
30 | namespace seastar { | |
31 | ||
32 | /// \addtogroup future-util | |
33 | /// @{ | |
34 | ||
35 | /// \cond internal | |
36 | ||
37 | template <typename T, bool IsFuture> | |
38 | struct reducer_with_get_traits; | |
39 | ||
40 | template <typename T> | |
41 | struct reducer_with_get_traits<T, false> { | |
42 | using result_type = decltype(std::declval<T>().get()); | |
43 | using future_type = future<result_type>; | |
44 | static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) { | |
45 | return f.then([r = std::move(r)] () mutable { | |
46 | return make_ready_future<result_type>(std::move(*r).get()); | |
47 | }); | |
48 | } | |
49 | }; | |
50 | ||
51 | template <typename T> | |
52 | struct reducer_with_get_traits<T, true> { | |
53 | using future_type = decltype(std::declval<T>().get()); | |
54 | static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) { | |
55 | return f.then([r = std::move(r)] () mutable { | |
56 | return r->get(); | |
57 | }).then_wrapped([r] (future_type f) { | |
58 | return f; | |
59 | }); | |
60 | } | |
61 | }; | |
62 | ||
63 | template <typename T, typename V = void> | |
64 | struct reducer_traits { | |
65 | using future_type = future<>; | |
66 | static future_type maybe_call_get(future<> f, lw_shared_ptr<T> r) { | |
67 | return f.then([r = std::move(r)] {}); | |
68 | } | |
69 | }; | |
70 | ||
71 | template <typename T> | |
20effc67 | 72 | struct reducer_traits<T, decltype(std::declval<T>().get(), void())> : public reducer_with_get_traits<T, is_future<std::invoke_result_t<decltype(&T::get),T>>::value> {}; |
f67539c2 TL |
73 | |
74 | /// \endcond | |
75 | ||
76 | /// Map a function over a range and reduce the result. | |
77 | /// | |
78 | /// \param begin an \c InputIterator designating the beginning of the range | |
79 | /// \param end an \c InputIterator designating the end of the range | |
80 | /// \param mapper is a callable which transforms values from the iterator range into a future<T> | |
81 | /// \param r is an object which can be called with T as parameter and yields a future<> | |
82 | /// It may have a get() method which returns a value of type U which holds the result of reduction. | |
83 | /// \return The reduced value wrapped in a future. | |
84 | /// If the reducer has no get() method then this function returns future<>. | |
85 | ||
86 | // TODO: specialize for non-deferring reducer | |
87 | template <typename Iterator, typename Mapper, typename Reducer> | |
88 | SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Reducer reduce) { | |
89 | *i++; | |
90 | { i != i } -> std::convertible_to<bool>; | |
91 | mapper(*i); | |
92 | reduce(futurize_invoke(mapper, *i).get0()); | |
93 | } ) | |
94 | inline | |
95 | auto | |
96 | map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r) | |
97 | -> typename reducer_traits<Reducer>::future_type | |
98 | { | |
99 | auto r_ptr = make_lw_shared(std::forward<Reducer>(r)); | |
100 | future<> ret = make_ready_future<>(); | |
101 | while (begin != end) { | |
102 | ret = futurize_invoke(mapper, *begin++).then_wrapped([ret = std::move(ret), r_ptr] (auto f) mutable { | |
103 | return ret.then_wrapped([f = std::move(f), r_ptr] (auto rf) mutable { | |
104 | if (rf.failed()) { | |
105 | f.ignore_ready_future(); | |
106 | return std::move(rf); | |
107 | } else { | |
108 | return futurize_invoke(*r_ptr, std::move(f.get0())); | |
109 | } | |
110 | }); | |
111 | }); | |
112 | } | |
113 | return reducer_traits<Reducer>::maybe_call_get(std::move(ret), r_ptr); | |
114 | } | |
115 | ||
116 | /// Asynchronous map/reduce transformation. | |
117 | /// | |
118 | /// Given a range of objects, an asynchronous unary function | |
119 | /// operating on these objects, an initial value, and a | |
120 | /// binary function for reducing, map_reduce() will | |
121 | /// transform each object in the range, then invoke | |
122 | /// the reducing function with the result. | |
123 | /// | |
124 | /// Example: | |
125 | /// | |
126 | /// Calculate the total size of several files: | |
127 | /// | |
128 | /// \code | |
129 | /// map_reduce(files.begin(), files.end(), | |
130 | /// std::mem_fn(&file::size), | |
131 | /// size_t(0), | |
132 | /// std::plus<size_t>()) | |
133 | /// \endcode | |
134 | /// | |
135 | /// Requirements: | |
136 | /// - Iterator: an InputIterator. | |
137 | /// - Mapper: unary function taking Iterator::value_type and producing a future<...>. | |
138 | /// - Initial: any value type | |
139 | /// - Reduce: a binary function taking two Initial values and returning an Initial | |
140 | /// | |
141 | /// Return type: | |
142 | /// - future<Initial> | |
143 | /// | |
144 | /// \param begin beginning of object range to operate on | |
145 | /// \param end end of object range to operate on | |
146 | /// \param mapper map function to call on each object, returning a future | |
147 | /// \param initial initial input value to reduce function | |
148 | /// \param reduce binary function for merging two result values from \c mapper | |
149 | /// | |
150 | /// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ... | |
151 | template <typename Iterator, typename Mapper, typename Initial, typename Reduce> | |
152 | SEASTAR_CONCEPT( requires requires (Iterator i, Mapper mapper, Initial initial, Reduce reduce) { | |
153 | *i++; | |
154 | { i != i} -> std::convertible_to<bool>; | |
155 | mapper(*i); | |
156 | requires is_future<decltype(mapper(*i))>::value; | |
157 | { reduce(std::move(initial), mapper(*i).get0()) } -> std::convertible_to<Initial>; | |
158 | } ) | |
159 | inline | |
160 | future<Initial> | |
161 | map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce reduce) { | |
162 | struct state { | |
163 | Initial result; | |
164 | Reduce reduce; | |
165 | }; | |
166 | auto s = make_lw_shared(state{std::move(initial), std::move(reduce)}); | |
167 | future<> ret = make_ready_future<>(); | |
168 | while (begin != end) { | |
169 | ret = futurize_invoke(mapper, *begin++).then_wrapped([s = s.get(), ret = std::move(ret)] (auto f) mutable { | |
170 | try { | |
171 | s->result = s->reduce(std::move(s->result), std::move(f.get0())); | |
172 | return std::move(ret); | |
173 | } catch (...) { | |
174 | return std::move(ret).then_wrapped([ex = std::current_exception()] (auto f) { | |
175 | f.ignore_ready_future(); | |
176 | return make_exception_future<>(ex); | |
177 | }); | |
178 | } | |
179 | }); | |
180 | } | |
181 | return ret.then([s] { | |
182 | return make_ready_future<Initial>(std::move(s->result)); | |
183 | }); | |
184 | } | |
185 | ||
186 | /// Asynchronous map/reduce transformation (range version). | |
187 | /// | |
188 | /// Given a range of objects, an asynchronous unary function | |
189 | /// operating on these objects, an initial value, and a | |
190 | /// binary function for reducing, map_reduce() will | |
191 | /// transform each object in the range, then invoke | |
192 | /// the reducing function with the result. | |
193 | /// | |
194 | /// Example: | |
195 | /// | |
196 | /// Calculate the total size of several files: | |
197 | /// | |
198 | /// \code | |
199 | /// std::vector<file> files = ...; | |
200 | /// map_reduce(files, | |
201 | /// std::mem_fn(&file::size), | |
202 | /// size_t(0), | |
203 | /// std::plus<size_t>()) | |
204 | /// \endcode | |
205 | /// | |
206 | /// Requirements: | |
207 | /// - Range: an object on which std::begin() and std::end() yield InputIterators. | |
208 | /// - Mapper: unary function taking Iterator::value_type and producing a future<...>. | |
209 | /// - Initial: any value type | |
210 | /// - Reduce: a binary function taking two Initial values and returning an Initial | |
211 | /// | |
212 | /// Return type: | |
213 | /// - future<Initial> | |
214 | /// | |
215 | /// \param range object range to operate on | |
216 | /// \param mapper map function to call on each object, returning a future | |
217 | /// \param initial initial input value to reduce function | |
218 | /// \param reduce binary function for merging two result values from \c mapper | |
219 | /// | |
220 | /// \return equivalent to \c reduce(reduce(initial, mapper(obj0)), mapper(obj1)) ... | |
221 | template <typename Range, typename Mapper, typename Initial, typename Reduce> | |
222 | SEASTAR_CONCEPT( requires requires (Range range, Mapper mapper, Initial initial, Reduce reduce) { | |
223 | std::begin(range); | |
224 | std::end(range); | |
225 | mapper(*std::begin(range)); | |
226 | requires is_future<std::remove_reference_t<decltype(mapper(*std::begin(range)))>>::value; | |
227 | { reduce(std::move(initial), mapper(*std::begin(range)).get0()) } -> std::convertible_to<Initial>; | |
228 | } ) | |
229 | inline | |
230 | future<Initial> | |
231 | map_reduce(Range&& range, Mapper&& mapper, Initial initial, Reduce reduce) { | |
232 | return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper), | |
233 | std::move(initial), std::move(reduce)); | |
234 | } | |
235 | ||
236 | /// Implements @Reducer concept. Calculates the result by | |
237 | /// adding elements to the accumulator. | |
238 | template <typename Result, typename Addend = Result> | |
239 | class adder { | |
240 | private: | |
241 | Result _result; | |
242 | public: | |
243 | future<> operator()(const Addend& value) { | |
244 | _result += value; | |
245 | return make_ready_future<>(); | |
246 | } | |
247 | Result get() && { | |
248 | return std::move(_result); | |
249 | } | |
250 | }; | |
251 | ||
252 | /// @} | |
253 | ||
254 | } // namespace seastar |