]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2020 ScyllaDB. | |
21 | */ | |
22 | ||
23 | #pragma once | |
24 | ||
1e59de90 | 25 | #include <cstddef> |
f67539c2 TL |
26 | #include <iterator> |
27 | #include <memory> | |
1e59de90 | 28 | #include <type_traits> |
f67539c2 TL |
29 | #include <vector> |
30 | ||
31 | #include <seastar/core/future.hh> | |
32 | #include <seastar/core/task.hh> | |
33 | #include <seastar/util/bool_class.hh> | |
34 | #include <seastar/core/semaphore.hh> | |
35 | ||
36 | namespace seastar { | |
37 | ||
38 | /// \addtogroup future-util | |
39 | /// @{ | |
40 | ||
41 | // The AsyncAction concept represents an action which can complete later than | |
42 | // the actual function invocation. It is represented by a function which | |
43 | // returns a future which resolves when the action is done. | |
44 | ||
45 | struct stop_iteration_tag { }; | |
46 | using stop_iteration = bool_class<stop_iteration_tag>; | |
47 | ||
48 | namespace internal { | |
49 | ||
50 | template <typename AsyncAction> | |
51 | class repeater final : public continuation_base<stop_iteration> { | |
52 | promise<> _promise; | |
53 | AsyncAction _action; | |
54 | public: | |
55 | explicit repeater(AsyncAction&& action) : _action(std::move(action)) {} | |
56 | future<> get_future() { return _promise.get_future(); } | |
57 | task* waiting_task() noexcept override { return _promise.waiting_task(); } | |
58 | virtual void run_and_dispose() noexcept override { | |
59 | if (_state.failed()) { | |
60 | _promise.set_exception(std::move(_state).get_exception()); | |
61 | delete this; | |
62 | return; | |
63 | } else { | |
64 | if (_state.get0() == stop_iteration::yes) { | |
65 | _promise.set_value(); | |
66 | delete this; | |
67 | return; | |
68 | } | |
69 | _state = {}; | |
70 | } | |
71 | try { | |
72 | do { | |
73 | auto f = futurize_invoke(_action); | |
74 | if (!f.available()) { | |
1e59de90 | 75 | internal::set_callback(std::move(f), this); |
f67539c2 TL |
76 | return; |
77 | } | |
78 | if (f.get0() == stop_iteration::yes) { | |
79 | _promise.set_value(); | |
80 | delete this; | |
81 | return; | |
82 | } | |
83 | } while (!need_preempt()); | |
84 | } catch (...) { | |
85 | _promise.set_exception(std::current_exception()); | |
86 | delete this; | |
87 | return; | |
88 | } | |
89 | _state.set(stop_iteration::no); | |
90 | schedule(this); | |
91 | } | |
92 | }; | |
93 | ||
94 | } // namespace internal | |
95 | ||
96 | // Delete these overloads so that the actual implementation can use a | |
97 | // universal reference but still reject lvalue references. | |
98 | template<typename AsyncAction> | |
99 | future<> repeat(const AsyncAction& action) noexcept = delete; | |
100 | template<typename AsyncAction> | |
101 | future<> repeat(AsyncAction& action) noexcept = delete; | |
102 | ||
103 | /// Invokes given action until it fails or the function requests iteration to stop by returning | |
104 | /// \c stop_iteration::yes. | |
105 | /// | |
106 | /// \param action a callable taking no arguments, returning a future<stop_iteration>. Will | |
107 | /// be called again as soon as the future resolves, unless the | |
108 | /// future fails, action throws, or it resolves with \c stop_iteration::yes. | |
109 | /// If \c action is an r-value it can be moved in the middle of iteration. | |
110 | /// \return a ready future if we stopped successfully, or a failed future if | |
111 | /// a call to to \c action failed. | |
112 | template<typename AsyncAction> | |
113 | SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, stop_iteration> || seastar::InvokeReturns<AsyncAction, future<stop_iteration>> ) | |
114 | inline | |
115 | future<> repeat(AsyncAction&& action) noexcept { | |
20effc67 | 116 | using futurator = futurize<std::invoke_result_t<AsyncAction>>; |
f67539c2 TL |
117 | static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature"); |
118 | for (;;) { | |
119 | // Do not type-erase here in case this is a short repeat() | |
120 | auto f = futurator::invoke(action); | |
121 | ||
122 | if (!f.available() || f.failed() || need_preempt()) { | |
123 | return [&] () noexcept { | |
124 | memory::scoped_critical_alloc_section _; | |
125 | auto repeater = new internal::repeater<AsyncAction>(std::move(action)); | |
126 | auto ret = repeater->get_future(); | |
1e59de90 | 127 | internal::set_callback(std::move(f), repeater); |
f67539c2 TL |
128 | return ret; |
129 | }(); | |
130 | } | |
131 | ||
132 | if (f.get0() == stop_iteration::yes) { | |
133 | return make_ready_future<>(); | |
134 | } | |
135 | } | |
136 | } | |
137 | ||
138 | /// \cond internal | |
139 | ||
140 | template <typename T> | |
141 | struct repeat_until_value_type_helper; | |
142 | ||
143 | /// Type helper for repeat_until_value() | |
144 | template <typename T> | |
145 | struct repeat_until_value_type_helper<future<std::optional<T>>> { | |
146 | /// The type of the value we are computing | |
147 | using value_type = T; | |
148 | /// Type used by \c AsyncAction while looping | |
149 | using optional_type = std::optional<T>; | |
150 | /// Return type of repeat_until_value() | |
151 | using future_type = future<value_type>; | |
152 | }; | |
153 | ||
154 | /// Return value of repeat_until_value() | |
155 | template <typename AsyncAction> | |
156 | using repeat_until_value_return_type | |
20effc67 | 157 | = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type; |
f67539c2 TL |
158 | |
159 | /// \endcond | |
160 | ||
161 | namespace internal { | |
162 | ||
163 | template <typename AsyncAction, typename T> | |
164 | class repeat_until_value_state final : public continuation_base<std::optional<T>> { | |
165 | promise<T> _promise; | |
166 | AsyncAction _action; | |
167 | public: | |
168 | explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {} | |
169 | repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) { | |
170 | this->_state.set(std::move(st)); | |
171 | } | |
172 | future<T> get_future() { return _promise.get_future(); } | |
173 | task* waiting_task() noexcept override { return _promise.waiting_task(); } | |
174 | virtual void run_and_dispose() noexcept override { | |
175 | if (this->_state.failed()) { | |
176 | _promise.set_exception(std::move(this->_state).get_exception()); | |
177 | delete this; | |
178 | return; | |
179 | } else { | |
180 | auto v = std::move(this->_state).get0(); | |
181 | if (v) { | |
182 | _promise.set_value(std::move(*v)); | |
183 | delete this; | |
184 | return; | |
185 | } | |
186 | this->_state = {}; | |
187 | } | |
188 | try { | |
189 | do { | |
190 | auto f = futurize_invoke(_action); | |
191 | if (!f.available()) { | |
1e59de90 | 192 | internal::set_callback(std::move(f), this); |
f67539c2 TL |
193 | return; |
194 | } | |
195 | auto ret = f.get0(); | |
196 | if (ret) { | |
197 | _promise.set_value(std::move(*ret)); | |
198 | delete this; | |
199 | return; | |
200 | } | |
201 | } while (!need_preempt()); | |
202 | } catch (...) { | |
203 | _promise.set_exception(std::current_exception()); | |
204 | delete this; | |
205 | return; | |
206 | } | |
207 | this->_state.set(std::nullopt); | |
208 | schedule(this); | |
209 | } | |
210 | }; | |
211 | ||
212 | } // namespace internal | |
213 | ||
214 | /// Invokes given action until it fails or the function requests iteration to stop by returning | |
215 | /// an engaged \c future<std::optional<T>> or std::optional<T>. The value is extracted | |
216 | /// from the \c optional, and returned, as a future, from repeat_until_value(). | |
217 | /// | |
218 | /// \param action a callable taking no arguments, returning a future<std::optional<T>> | |
219 | /// or std::optional<T>. Will be called again as soon as the future | |
220 | /// resolves, unless the future fails, action throws, or it resolves with | |
221 | /// an engaged \c optional. If \c action is an r-value it can be moved | |
222 | /// in the middle of iteration. | |
223 | /// \return a ready future if we stopped successfully, or a failed future if | |
224 | /// a call to to \c action failed. The \c optional's value is returned. | |
225 | template<typename AsyncAction> | |
226 | SEASTAR_CONCEPT( requires requires (AsyncAction aa) { | |
227 | bool(futurize_invoke(aa).get0()); | |
228 | futurize_invoke(aa).get0().value(); | |
229 | } ) | |
230 | repeat_until_value_return_type<AsyncAction> | |
231 | repeat_until_value(AsyncAction action) noexcept { | |
20effc67 | 232 | using futurator = futurize<std::invoke_result_t<AsyncAction>>; |
f67539c2 TL |
233 | using type_helper = repeat_until_value_type_helper<typename futurator::type>; |
234 | // the "T" in the documentation | |
235 | using value_type = typename type_helper::value_type; | |
236 | using optional_type = typename type_helper::optional_type; | |
237 | do { | |
238 | auto f = futurator::invoke(action); | |
239 | ||
240 | if (!f.available()) { | |
241 | return [&] () noexcept { | |
242 | memory::scoped_critical_alloc_section _; | |
243 | auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action)); | |
244 | auto ret = state->get_future(); | |
1e59de90 | 245 | internal::set_callback(std::move(f), state); |
f67539c2 TL |
246 | return ret; |
247 | }(); | |
248 | } | |
249 | ||
250 | if (f.failed()) { | |
251 | return make_exception_future<value_type>(f.get_exception()); | |
252 | } | |
253 | ||
254 | optional_type&& optional = std::move(f).get0(); | |
255 | if (optional) { | |
256 | return make_ready_future<value_type>(std::move(optional.value())); | |
257 | } | |
258 | } while (!need_preempt()); | |
259 | ||
260 | try { | |
261 | auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action)); | |
262 | auto f = state->get_future(); | |
263 | schedule(state); | |
264 | return f; | |
265 | } catch (...) { | |
266 | return make_exception_future<value_type>(std::current_exception()); | |
267 | } | |
268 | } | |
269 | ||
270 | namespace internal { | |
271 | ||
272 | template <typename StopCondition, typename AsyncAction> | |
273 | class do_until_state final : public continuation_base<> { | |
274 | promise<> _promise; | |
275 | StopCondition _stop; | |
276 | AsyncAction _action; | |
277 | public: | |
278 | explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {} | |
279 | future<> get_future() { return _promise.get_future(); } | |
280 | task* waiting_task() noexcept override { return _promise.waiting_task(); } | |
281 | virtual void run_and_dispose() noexcept override { | |
282 | if (_state.available()) { | |
283 | if (_state.failed()) { | |
284 | _promise.set_urgent_state(std::move(_state)); | |
285 | delete this; | |
286 | return; | |
287 | } | |
288 | _state = {}; // allow next cycle to overrun state | |
289 | } | |
290 | try { | |
291 | do { | |
292 | if (_stop()) { | |
293 | _promise.set_value(); | |
294 | delete this; | |
295 | return; | |
296 | } | |
297 | auto f = _action(); | |
298 | if (!f.available()) { | |
1e59de90 | 299 | internal::set_callback(std::move(f), this); |
f67539c2 TL |
300 | return; |
301 | } | |
302 | if (f.failed()) { | |
303 | f.forward_to(std::move(_promise)); | |
304 | delete this; | |
305 | return; | |
306 | } | |
307 | } while (!need_preempt()); | |
308 | } catch (...) { | |
309 | _promise.set_exception(std::current_exception()); | |
310 | delete this; | |
311 | return; | |
312 | } | |
313 | schedule(this); | |
314 | } | |
315 | }; | |
316 | ||
317 | } // namespace internal | |
318 | ||
20effc67 | 319 | /// Invokes given action until it fails or given condition evaluates to true or fails. |
f67539c2 TL |
320 | /// |
321 | /// \param stop_cond a callable taking no arguments, returning a boolean that | |
322 | /// evalutes to true when you don't want to call \c action | |
20effc67 TL |
323 | /// any longer. If \c stop_cond fails, the exception is propagated |
324 | // in the returned future. | |
f67539c2 TL |
325 | /// \param action a callable taking no arguments, returning a future<>. Will |
326 | /// be called again as soon as the future resolves, unless the | |
20effc67 | 327 | /// future fails, or \c stop_cond returns \c true or fails. |
f67539c2 | 328 | /// \return a ready future if we stopped successfully, or a failed future if |
20effc67 | 329 | /// a call to to \c action or a call to \c stop_cond failed. |
f67539c2 TL |
330 | template<typename AsyncAction, typename StopCondition> |
331 | SEASTAR_CONCEPT( requires seastar::InvokeReturns<StopCondition, bool> && seastar::InvokeReturns<AsyncAction, future<>> ) | |
332 | inline | |
333 | future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept { | |
334 | using namespace internal; | |
335 | for (;;) { | |
20effc67 | 336 | try { |
f67539c2 TL |
337 | if (stop_cond()) { |
338 | return make_ready_future<>(); | |
339 | } | |
20effc67 TL |
340 | } catch (...) { |
341 | return current_exception_as_future(); | |
342 | } | |
f67539c2 TL |
343 | auto f = futurize_invoke(action); |
344 | if (f.failed()) { | |
345 | return f; | |
346 | } | |
347 | if (!f.available() || need_preempt()) { | |
348 | return [&] () noexcept { | |
349 | memory::scoped_critical_alloc_section _; | |
350 | auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action)); | |
351 | auto ret = task->get_future(); | |
1e59de90 | 352 | internal::set_callback(std::move(f), task); |
f67539c2 TL |
353 | return ret; |
354 | }(); | |
355 | } | |
356 | } | |
357 | } | |
358 | ||
359 | /// Invoke given action until it fails. | |
360 | /// | |
361 | /// Calls \c action repeatedly until it returns a failed future. | |
362 | /// | |
363 | /// \param action a callable taking no arguments, returning a \c future<> | |
364 | /// that becomes ready when you wish it to be called again. | |
365 | /// \return a future<> that will resolve to the first failure of \c action | |
366 | template<typename AsyncAction> | |
367 | SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, future<>> ) | |
368 | inline | |
369 | future<> keep_doing(AsyncAction action) noexcept { | |
370 | return repeat([action = std::move(action)] () mutable { | |
371 | return action().then([] { | |
372 | return stop_iteration::no; | |
373 | }); | |
374 | }); | |
375 | } | |
376 | ||
377 | namespace internal { | |
378 | template <typename Iterator, typename AsyncAction> | |
379 | class do_for_each_state final : public continuation_base<> { | |
380 | Iterator _begin; | |
381 | Iterator _end; | |
382 | AsyncAction _action; | |
383 | promise<> _pr; | |
384 | ||
385 | public: | |
1e59de90 | 386 | do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable) |
f67539c2 | 387 | : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) { |
1e59de90 | 388 | internal::set_callback(std::move(first_unavailable), this); |
f67539c2 TL |
389 | } |
390 | virtual void run_and_dispose() noexcept override { | |
391 | std::unique_ptr<do_for_each_state> zis(this); | |
392 | if (_state.failed()) { | |
393 | _pr.set_urgent_state(std::move(_state)); | |
394 | return; | |
395 | } | |
396 | while (_begin != _end) { | |
397 | auto f = futurize_invoke(_action, *_begin++); | |
398 | if (f.failed()) { | |
399 | f.forward_to(std::move(_pr)); | |
400 | return; | |
401 | } | |
402 | if (!f.available() || need_preempt()) { | |
403 | _state = {}; | |
1e59de90 | 404 | internal::set_callback(std::move(f), this); |
f67539c2 TL |
405 | zis.release(); |
406 | return; | |
407 | } | |
408 | } | |
409 | _pr.set_value(); | |
410 | } | |
411 | task* waiting_task() noexcept override { | |
412 | return _pr.waiting_task(); | |
413 | } | |
414 | future<> get_future() { | |
415 | return _pr.get_future(); | |
416 | } | |
417 | }; | |
418 | ||
419 | template<typename Iterator, typename AsyncAction> | |
420 | inline | |
421 | future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) { | |
422 | while (begin != end) { | |
423 | auto f = futurize_invoke(action, *begin++); | |
424 | if (f.failed()) { | |
425 | return f; | |
426 | } | |
427 | if (!f.available() || need_preempt()) { | |
428 | auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{ | |
429 | std::move(begin), std::move(end), std::move(action), std::move(f)}; | |
430 | return s->get_future(); | |
431 | } | |
432 | } | |
433 | return make_ready_future<>(); | |
434 | } | |
435 | } // namespace internal | |
436 | ||
437 | /// \addtogroup future-util | |
438 | ||
439 | /// \brief Call a function for each item in a range, sequentially (iterator version). | |
440 | /// | |
441 | /// For each item in a range, call a function, waiting for the previous | |
442 | /// invocation to complete before calling the next one. | |
443 | /// | |
444 | /// \param begin an \c InputIterator designating the beginning of the range | |
445 | /// \param end an \c InputIterator designating the endof the range | |
446 | /// \param action a callable, taking a reference to objects from the range | |
447 | /// as a parameter, and returning a \c future<> that resolves | |
448 | /// when it is acceptable to process the next item. | |
449 | /// \return a ready future on success, or the first failed future if | |
450 | /// \c action failed. | |
451 | template<typename Iterator, typename AsyncAction> | |
452 | SEASTAR_CONCEPT( requires requires (Iterator i, AsyncAction aa) { | |
453 | { futurize_invoke(aa, *i) } -> std::same_as<future<>>; | |
454 | } ) | |
455 | inline | |
456 | future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept { | |
457 | try { | |
458 | return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action)); | |
459 | } catch (...) { | |
460 | return current_exception_as_future(); | |
461 | } | |
462 | } | |
463 | ||
464 | /// \brief Call a function for each item in a range, sequentially (range version). | |
465 | /// | |
466 | /// For each item in a range, call a function, waiting for the previous | |
467 | /// invocation to complete before calling the next one. | |
468 | /// | |
469 | /// \param c an \c Container object designating input range | |
470 | /// \param action a callable, taking a reference to objects from the range | |
471 | /// as a parameter, and returning a \c future<> that resolves | |
472 | /// when it is acceptable to process the next item. | |
473 | /// \return a ready future on success, or the first failed future if | |
474 | /// \c action failed. | |
475 | template<typename Container, typename AsyncAction> | |
476 | SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) { | |
1e59de90 TL |
477 | { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>; |
478 | std::end(c); | |
f67539c2 TL |
479 | } ) |
480 | inline | |
481 | future<> do_for_each(Container& c, AsyncAction action) noexcept { | |
482 | try { | |
483 | return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action)); | |
484 | } catch (...) { | |
485 | return current_exception_as_future(); | |
486 | } | |
487 | } | |
488 | ||
489 | namespace internal { | |
490 | ||
1e59de90 TL |
491 | template <typename T, typename = void> |
492 | struct has_iterator_category : std::false_type {}; | |
493 | ||
494 | template <typename T> | |
495 | struct has_iterator_category<T, std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {}; | |
496 | ||
497 | template <typename Iterator, typename Sentinel, typename IteratorCategory> | |
f67539c2 TL |
498 | inline |
499 | size_t | |
1e59de90 | 500 | iterator_range_estimate_vector_capacity(Iterator const&, Sentinel const&, IteratorCategory) { |
f67539c2 TL |
501 | // For InputIterators we can't estimate needed capacity |
502 | return 0; | |
503 | } | |
504 | ||
1e59de90 | 505 | template <typename Iterator, typename Sentinel> |
f67539c2 TL |
506 | inline |
507 | size_t | |
1e59de90 | 508 | iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end, std::forward_iterator_tag) { |
f67539c2 TL |
509 | // May be linear time below random_access_iterator_tag, but still better than reallocation |
510 | return std::distance(begin, end); | |
511 | } | |
512 | ||
513 | } // namespace internal | |
514 | ||
515 | /// \cond internal | |
516 | ||
517 | class parallel_for_each_state final : private continuation_base<> { | |
518 | std::vector<future<>> _incomplete; | |
519 | promise<> _result; | |
520 | std::exception_ptr _ex; | |
521 | private: | |
522 | // Wait for one of the futures in _incomplete to complete, and then | |
523 | // decide what to do: wait for another one, or deliver _result if all | |
524 | // are complete. | |
525 | void wait_for_one() noexcept; | |
526 | virtual void run_and_dispose() noexcept override; | |
527 | task* waiting_task() noexcept override { return _result.waiting_task(); } | |
528 | public: | |
529 | parallel_for_each_state(size_t n); | |
530 | void add_future(future<>&& f); | |
531 | future<> get_future(); | |
532 | }; | |
533 | ||
534 | /// \endcond | |
535 | ||
536 | /// \brief Run tasks in parallel (iterator version). | |
537 | /// | |
538 | /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in | |
539 | /// the range, and return a future<> that resolves when all the functions | |
540 | /// complete. \c func should return a future<> that indicates when it is | |
541 | /// complete. All invocations are performed in parallel. This allows the range | |
542 | /// to refer to stack objects, but means that unlike other loops this cannot | |
543 | /// check need_preempt and can only be used with small ranges. | |
544 | /// | |
545 | /// \param begin an \c InputIterator designating the beginning of the range | |
546 | /// \param end an \c InputIterator designating the end of the range | |
547 | /// \param func Function to invoke with each element in the range (returning | |
548 | /// a \c future<>) | |
549 | /// \return a \c future<> that resolves when all the function invocations | |
550 | /// complete. If one or more return an exception, the return value | |
551 | /// contains one of the exceptions. | |
1e59de90 TL |
552 | /// \note parallel_for_each() schedules all invocations of \c func on the |
553 | /// current shard. If you want to run a function on all shards in parallel, | |
554 | /// have a look at \ref smp::invoke_on_all() instead. | |
555 | template <typename Iterator, typename Sentinel, typename Func> | |
556 | SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>))) | |
557 | // We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular, | |
558 | // which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could | |
559 | // break legacy code, for which it holds that Sentinel equals Iterator. | |
f67539c2 TL |
560 | inline |
561 | future<> | |
1e59de90 | 562 | parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept { |
f67539c2 TL |
563 | parallel_for_each_state* s = nullptr; |
564 | // Process all elements, giving each future the following treatment: | |
565 | // - available, not failed: do nothing | |
566 | // - available, failed: collect exception in ex | |
567 | // - not available: collect in s (allocating it if needed) | |
568 | while (begin != end) { | |
1e59de90 TL |
569 | auto f = futurize_invoke(std::forward<Func>(func), *begin); |
570 | ++begin; | |
20effc67 | 571 | memory::scoped_critical_alloc_section _; |
f67539c2 TL |
572 | if (!f.available() || f.failed()) { |
573 | if (!s) { | |
f67539c2 | 574 | using itraits = std::iterator_traits<Iterator>; |
1e59de90 TL |
575 | size_t n{0U}; |
576 | if constexpr (internal::has_iterator_category<Iterator>::value) { | |
577 | // We need if-constexpr here because there exist iterators for which std::iterator_traits | |
578 | // does not have 'iterator_category' as member type | |
579 | n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category{}) + 1); | |
580 | } | |
f67539c2 TL |
581 | s = new parallel_for_each_state(n); |
582 | } | |
583 | s->add_future(std::move(f)); | |
584 | } | |
585 | } | |
586 | // If any futures were not available, hand off to parallel_for_each_state::start(). | |
587 | // Otherwise we can return a result immediately. | |
588 | if (s) { | |
589 | // s->get_future() takes ownership of s (and chains it to one of the futures it contains) | |
590 | // so this isn't a leak | |
591 | return s->get_future(); | |
592 | } | |
593 | return make_ready_future<>(); | |
594 | } | |
595 | ||
596 | /// \brief Run tasks in parallel (range version). | |
597 | /// | |
598 | /// Given a \c range of objects, invoke \c func with each object | |
599 | /// in the range, and return a future<> that resolves when all | |
600 | /// the functions complete. \c func should return a future<> that indicates | |
601 | /// when it is complete. All invocations are performed in parallel. This allows | |
602 | /// the range to refer to stack objects, but means that unlike other loops this | |
603 | /// cannot check need_preempt and can only be used with small ranges. | |
604 | /// | |
605 | /// \param range A range of objects to iterate run \c func on | |
606 | /// \param func A callable, accepting reference to the range's | |
607 | /// \c value_type, and returning a \c future<>. | |
608 | /// \return a \c future<> that becomes ready when the entire range | |
609 | /// was processed. If one or more of the invocations of | |
610 | /// \c func returned an exceptional future, then the return | |
611 | /// value will contain one of those exceptions. | |
1e59de90 TL |
612 | /// \note parallel_for_each() schedules all invocations of \c func on the |
613 | /// current shard. If you want to run a function on all shards in parallel, | |
614 | /// have a look at \ref smp::invoke_on_all() instead. | |
f67539c2 TL |
615 | |
616 | namespace internal { | |
617 | ||
618 | template <typename Range, typename Func> | |
619 | inline | |
620 | future<> | |
621 | parallel_for_each_impl(Range&& range, Func&& func) { | |
622 | return parallel_for_each(std::begin(range), std::end(range), | |
623 | std::forward<Func>(func)); | |
624 | } | |
625 | ||
626 | } // namespace internal | |
627 | ||
628 | template <typename Range, typename Func> | |
1e59de90 TL |
629 | SEASTAR_CONCEPT( requires requires (Func f, Range r) { |
630 | { f(*std::begin(r)) } -> std::same_as<future<>>; | |
631 | std::end(r); | |
632 | } ) | |
f67539c2 TL |
633 | inline |
634 | future<> | |
635 | parallel_for_each(Range&& range, Func&& func) noexcept { | |
636 | auto impl = internal::parallel_for_each_impl<Range, Func>; | |
637 | return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func)); | |
638 | } | |
639 | ||
640 | /// Run a maximum of \c max_concurrent tasks in parallel (iterator version). | |
641 | /// | |
642 | /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in | |
643 | /// the range, and return a future<> that resolves when all the functions | |
644 | /// complete. \c func should return a future<> that indicates when it is | |
645 | /// complete. Up to \c max_concurrent invocations are performed in parallel. | |
646 | /// This does not allow the range to refer to stack objects. The caller | |
647 | /// must ensure that the range outlives the call to max_concurrent_for_each | |
648 | /// so it can be iterated in the background. | |
649 | /// | |
650 | /// \param begin an \c InputIterator designating the beginning of the range | |
651 | /// \param end an \c InputIterator designating the end of the range | |
652 | /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero. | |
653 | /// \param func Function to invoke with each element in the range (returning | |
654 | /// a \c future<>) | |
655 | /// \return a \c future<> that resolves when all the function invocations | |
656 | /// complete. If one or more return an exception, the return value | |
657 | /// contains one of the exceptions. | |
1e59de90 TL |
658 | /// \note max_concurrent_for_each() schedules all invocations of \c func on the |
659 | /// current shard. If you want to run a function on all shards in parallel, | |
660 | /// have a look at \ref smp::invoke_on_all() instead. | |
661 | template <typename Iterator, typename Sentinel, typename Func> | |
662 | SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) ) ) | |
663 | // We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular, | |
664 | // which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could | |
665 | // break legacy code, for which it holds that Sentinel equals Iterator. | |
f67539c2 TL |
666 | inline |
667 | future<> | |
1e59de90 | 668 | max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept { |
f67539c2 TL |
669 | struct state { |
670 | Iterator begin; | |
1e59de90 | 671 | Sentinel end; |
f67539c2 TL |
672 | Func func; |
673 | size_t max_concurrent; | |
674 | semaphore sem; | |
675 | std::exception_ptr err; | |
676 | ||
1e59de90 | 677 | state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_) |
f67539c2 TL |
678 | : begin(std::move(begin_)) |
679 | , end(std::move(end_)) | |
680 | , func(std::move(func_)) | |
681 | , max_concurrent(max_concurrent_) | |
682 | , sem(max_concurrent_) | |
683 | , err() | |
684 | { } | |
685 | }; | |
686 | ||
687 | assert(max_concurrent > 0); | |
688 | ||
689 | try { | |
690 | return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) { | |
691 | return do_until([&s] { return s.begin == s.end; }, [&s] { | |
692 | return s.sem.wait().then([&s] () mutable noexcept { | |
693 | // Possibly run in background and signal _sem when the task is done. | |
694 | // The background tasks are waited on using _sem. | |
1e59de90 | 695 | (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) { |
f67539c2 TL |
696 | if (fut.failed()) { |
697 | auto e = fut.get_exception();; | |
698 | if (!s.err) { | |
699 | s.err = std::move(e); | |
700 | } | |
701 | } | |
702 | s.sem.signal(); | |
703 | }); | |
1e59de90 | 704 | ++s.begin; |
f67539c2 TL |
705 | }); |
706 | }).then([&s] { | |
707 | // Wait for any background task to finish | |
708 | // and signal and semaphore | |
709 | return s.sem.wait(s.max_concurrent); | |
710 | }).then([&s] { | |
711 | if (!s.err) { | |
712 | return make_ready_future<>(); | |
713 | } | |
714 | return seastar::make_exception_future<>(std::move(s.err)); | |
715 | }); | |
716 | }); | |
717 | } catch (...) { | |
718 | return current_exception_as_future(); | |
719 | } | |
720 | } | |
721 | ||
722 | /// Run a maximum of \c max_concurrent tasks in parallel (range version). | |
723 | /// | |
20effc67 | 724 | /// Given a range of objects, run \c func on each \c *i in |
f67539c2 TL |
725 | /// the range, and return a future<> that resolves when all the functions |
726 | /// complete. \c func should return a future<> that indicates when it is | |
727 | /// complete. Up to \c max_concurrent invocations are performed in parallel. | |
728 | /// This does not allow the range to refer to stack objects. The caller | |
729 | /// must ensure that the range outlives the call to max_concurrent_for_each | |
730 | /// so it can be iterated in the background. | |
731 | /// | |
20effc67 | 732 | /// \param range a \c Range to be processed |
f67539c2 TL |
733 | /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero. |
734 | /// \param func Function to invoke with each element in the range (returning | |
735 | /// a \c future<>) | |
736 | /// \return a \c future<> that resolves when all the function invocations | |
737 | /// complete. If one or more return an exception, the return value | |
738 | /// contains one of the exceptions. | |
1e59de90 TL |
739 | /// \note max_concurrent_for_each() schedules all invocations of \c func on the |
740 | /// current shard. If you want to run a function on all shards in parallel, | |
741 | /// have a look at \ref smp::invoke_on_all() instead. | |
f67539c2 | 742 | template <typename Range, typename Func> |
1e59de90 TL |
743 | SEASTAR_CONCEPT( requires requires (Func f, Range r) { |
744 | { f(*std::begin(r)) } -> std::same_as<future<>>; | |
745 | std::end(r); | |
746 | } ) | |
f67539c2 TL |
747 | inline |
748 | future<> | |
749 | max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept { | |
750 | try { | |
751 | return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func)); | |
752 | } catch (...) { | |
753 | return current_exception_as_future(); | |
754 | } | |
755 | } | |
756 | ||
757 | /// @} | |
758 | ||
759 | } // namespace seastar |