]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2020 ScyllaDB. | |
21 | */ | |
22 | ||
23 | #pragma once | |
24 | ||
25 | #include <iterator> | |
26 | #include <memory> | |
27 | #include <vector> | |
28 | ||
29 | #include <seastar/core/future.hh> | |
30 | #include <seastar/core/task.hh> | |
31 | #include <seastar/util/bool_class.hh> | |
32 | #include <seastar/core/semaphore.hh> | |
33 | ||
34 | namespace seastar { | |
35 | ||
36 | /// \addtogroup future-util | |
37 | /// @{ | |
38 | ||
39 | // The AsyncAction concept represents an action which can complete later than | |
40 | // the actual function invocation. It is represented by a function which | |
41 | // returns a future which resolves when the action is done. | |
42 | ||
43 | struct stop_iteration_tag { }; | |
44 | using stop_iteration = bool_class<stop_iteration_tag>; | |
45 | ||
46 | namespace internal { | |
47 | ||
48 | template <typename AsyncAction> | |
49 | class repeater final : public continuation_base<stop_iteration> { | |
50 | promise<> _promise; | |
51 | AsyncAction _action; | |
52 | public: | |
53 | explicit repeater(AsyncAction&& action) : _action(std::move(action)) {} | |
54 | future<> get_future() { return _promise.get_future(); } | |
55 | task* waiting_task() noexcept override { return _promise.waiting_task(); } | |
56 | virtual void run_and_dispose() noexcept override { | |
57 | if (_state.failed()) { | |
58 | _promise.set_exception(std::move(_state).get_exception()); | |
59 | delete this; | |
60 | return; | |
61 | } else { | |
62 | if (_state.get0() == stop_iteration::yes) { | |
63 | _promise.set_value(); | |
64 | delete this; | |
65 | return; | |
66 | } | |
67 | _state = {}; | |
68 | } | |
69 | try { | |
70 | do { | |
71 | auto f = futurize_invoke(_action); | |
72 | if (!f.available()) { | |
73 | internal::set_callback(f, this); | |
74 | return; | |
75 | } | |
76 | if (f.get0() == stop_iteration::yes) { | |
77 | _promise.set_value(); | |
78 | delete this; | |
79 | return; | |
80 | } | |
81 | } while (!need_preempt()); | |
82 | } catch (...) { | |
83 | _promise.set_exception(std::current_exception()); | |
84 | delete this; | |
85 | return; | |
86 | } | |
87 | _state.set(stop_iteration::no); | |
88 | schedule(this); | |
89 | } | |
90 | }; | |
91 | ||
92 | } // namespace internal | |
93 | ||
94 | // Delete these overloads so that the actual implementation can use a | |
95 | // universal reference but still reject lvalue references. | |
96 | template<typename AsyncAction> | |
97 | future<> repeat(const AsyncAction& action) noexcept = delete; | |
98 | template<typename AsyncAction> | |
99 | future<> repeat(AsyncAction& action) noexcept = delete; | |
100 | ||
101 | /// Invokes given action until it fails or the function requests iteration to stop by returning | |
102 | /// \c stop_iteration::yes. | |
103 | /// | |
104 | /// \param action a callable taking no arguments, returning a future<stop_iteration>. Will | |
105 | /// be called again as soon as the future resolves, unless the | |
106 | /// future fails, action throws, or it resolves with \c stop_iteration::yes. | |
107 | /// If \c action is an r-value it can be moved in the middle of iteration. | |
108 | /// \return a ready future if we stopped successfully, or a failed future if | |
109 | /// a call to to \c action failed. | |
110 | template<typename AsyncAction> | |
111 | SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, stop_iteration> || seastar::InvokeReturns<AsyncAction, future<stop_iteration>> ) | |
112 | inline | |
113 | future<> repeat(AsyncAction&& action) noexcept { | |
20effc67 | 114 | using futurator = futurize<std::invoke_result_t<AsyncAction>>; |
f67539c2 TL |
115 | static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature"); |
116 | for (;;) { | |
117 | // Do not type-erase here in case this is a short repeat() | |
118 | auto f = futurator::invoke(action); | |
119 | ||
120 | if (!f.available() || f.failed() || need_preempt()) { | |
121 | return [&] () noexcept { | |
122 | memory::scoped_critical_alloc_section _; | |
123 | auto repeater = new internal::repeater<AsyncAction>(std::move(action)); | |
124 | auto ret = repeater->get_future(); | |
125 | internal::set_callback(f, repeater); | |
126 | return ret; | |
127 | }(); | |
128 | } | |
129 | ||
130 | if (f.get0() == stop_iteration::yes) { | |
131 | return make_ready_future<>(); | |
132 | } | |
133 | } | |
134 | } | |
135 | ||
136 | /// \cond internal | |
137 | ||
138 | template <typename T> | |
139 | struct repeat_until_value_type_helper; | |
140 | ||
141 | /// Type helper for repeat_until_value() | |
142 | template <typename T> | |
143 | struct repeat_until_value_type_helper<future<std::optional<T>>> { | |
144 | /// The type of the value we are computing | |
145 | using value_type = T; | |
146 | /// Type used by \c AsyncAction while looping | |
147 | using optional_type = std::optional<T>; | |
148 | /// Return type of repeat_until_value() | |
149 | using future_type = future<value_type>; | |
150 | }; | |
151 | ||
152 | /// Return value of repeat_until_value() | |
153 | template <typename AsyncAction> | |
154 | using repeat_until_value_return_type | |
20effc67 | 155 | = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type; |
f67539c2 TL |
156 | |
157 | /// \endcond | |
158 | ||
159 | namespace internal { | |
160 | ||
161 | template <typename AsyncAction, typename T> | |
162 | class repeat_until_value_state final : public continuation_base<std::optional<T>> { | |
163 | promise<T> _promise; | |
164 | AsyncAction _action; | |
165 | public: | |
166 | explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {} | |
167 | repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) { | |
168 | this->_state.set(std::move(st)); | |
169 | } | |
170 | future<T> get_future() { return _promise.get_future(); } | |
171 | task* waiting_task() noexcept override { return _promise.waiting_task(); } | |
172 | virtual void run_and_dispose() noexcept override { | |
173 | if (this->_state.failed()) { | |
174 | _promise.set_exception(std::move(this->_state).get_exception()); | |
175 | delete this; | |
176 | return; | |
177 | } else { | |
178 | auto v = std::move(this->_state).get0(); | |
179 | if (v) { | |
180 | _promise.set_value(std::move(*v)); | |
181 | delete this; | |
182 | return; | |
183 | } | |
184 | this->_state = {}; | |
185 | } | |
186 | try { | |
187 | do { | |
188 | auto f = futurize_invoke(_action); | |
189 | if (!f.available()) { | |
190 | internal::set_callback(f, this); | |
191 | return; | |
192 | } | |
193 | auto ret = f.get0(); | |
194 | if (ret) { | |
195 | _promise.set_value(std::move(*ret)); | |
196 | delete this; | |
197 | return; | |
198 | } | |
199 | } while (!need_preempt()); | |
200 | } catch (...) { | |
201 | _promise.set_exception(std::current_exception()); | |
202 | delete this; | |
203 | return; | |
204 | } | |
205 | this->_state.set(std::nullopt); | |
206 | schedule(this); | |
207 | } | |
208 | }; | |
209 | ||
210 | } // namespace internal | |
211 | ||
212 | /// Invokes given action until it fails or the function requests iteration to stop by returning | |
213 | /// an engaged \c future<std::optional<T>> or std::optional<T>. The value is extracted | |
214 | /// from the \c optional, and returned, as a future, from repeat_until_value(). | |
215 | /// | |
216 | /// \param action a callable taking no arguments, returning a future<std::optional<T>> | |
217 | /// or std::optional<T>. Will be called again as soon as the future | |
218 | /// resolves, unless the future fails, action throws, or it resolves with | |
219 | /// an engaged \c optional. If \c action is an r-value it can be moved | |
220 | /// in the middle of iteration. | |
221 | /// \return a ready future if we stopped successfully, or a failed future if | |
222 | /// a call to to \c action failed. The \c optional's value is returned. | |
223 | template<typename AsyncAction> | |
224 | SEASTAR_CONCEPT( requires requires (AsyncAction aa) { | |
225 | bool(futurize_invoke(aa).get0()); | |
226 | futurize_invoke(aa).get0().value(); | |
227 | } ) | |
228 | repeat_until_value_return_type<AsyncAction> | |
229 | repeat_until_value(AsyncAction action) noexcept { | |
20effc67 | 230 | using futurator = futurize<std::invoke_result_t<AsyncAction>>; |
f67539c2 TL |
231 | using type_helper = repeat_until_value_type_helper<typename futurator::type>; |
232 | // the "T" in the documentation | |
233 | using value_type = typename type_helper::value_type; | |
234 | using optional_type = typename type_helper::optional_type; | |
235 | do { | |
236 | auto f = futurator::invoke(action); | |
237 | ||
238 | if (!f.available()) { | |
239 | return [&] () noexcept { | |
240 | memory::scoped_critical_alloc_section _; | |
241 | auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action)); | |
242 | auto ret = state->get_future(); | |
243 | internal::set_callback(f, state); | |
244 | return ret; | |
245 | }(); | |
246 | } | |
247 | ||
248 | if (f.failed()) { | |
249 | return make_exception_future<value_type>(f.get_exception()); | |
250 | } | |
251 | ||
252 | optional_type&& optional = std::move(f).get0(); | |
253 | if (optional) { | |
254 | return make_ready_future<value_type>(std::move(optional.value())); | |
255 | } | |
256 | } while (!need_preempt()); | |
257 | ||
258 | try { | |
259 | auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action)); | |
260 | auto f = state->get_future(); | |
261 | schedule(state); | |
262 | return f; | |
263 | } catch (...) { | |
264 | return make_exception_future<value_type>(std::current_exception()); | |
265 | } | |
266 | } | |
267 | ||
268 | namespace internal { | |
269 | ||
270 | template <typename StopCondition, typename AsyncAction> | |
271 | class do_until_state final : public continuation_base<> { | |
272 | promise<> _promise; | |
273 | StopCondition _stop; | |
274 | AsyncAction _action; | |
275 | public: | |
276 | explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {} | |
277 | future<> get_future() { return _promise.get_future(); } | |
278 | task* waiting_task() noexcept override { return _promise.waiting_task(); } | |
279 | virtual void run_and_dispose() noexcept override { | |
280 | if (_state.available()) { | |
281 | if (_state.failed()) { | |
282 | _promise.set_urgent_state(std::move(_state)); | |
283 | delete this; | |
284 | return; | |
285 | } | |
286 | _state = {}; // allow next cycle to overrun state | |
287 | } | |
288 | try { | |
289 | do { | |
290 | if (_stop()) { | |
291 | _promise.set_value(); | |
292 | delete this; | |
293 | return; | |
294 | } | |
295 | auto f = _action(); | |
296 | if (!f.available()) { | |
297 | internal::set_callback(f, this); | |
298 | return; | |
299 | } | |
300 | if (f.failed()) { | |
301 | f.forward_to(std::move(_promise)); | |
302 | delete this; | |
303 | return; | |
304 | } | |
305 | } while (!need_preempt()); | |
306 | } catch (...) { | |
307 | _promise.set_exception(std::current_exception()); | |
308 | delete this; | |
309 | return; | |
310 | } | |
311 | schedule(this); | |
312 | } | |
313 | }; | |
314 | ||
315 | } // namespace internal | |
316 | ||
20effc67 | 317 | /// Invokes given action until it fails or given condition evaluates to true or fails. |
f67539c2 TL |
318 | /// |
319 | /// \param stop_cond a callable taking no arguments, returning a boolean that | |
320 | /// evalutes to true when you don't want to call \c action | |
20effc67 TL |
321 | /// any longer. If \c stop_cond fails, the exception is propagated |
322 | // in the returned future. | |
f67539c2 TL |
323 | /// \param action a callable taking no arguments, returning a future<>. Will |
324 | /// be called again as soon as the future resolves, unless the | |
20effc67 | 325 | /// future fails, or \c stop_cond returns \c true or fails. |
f67539c2 | 326 | /// \return a ready future if we stopped successfully, or a failed future if |
20effc67 | 327 | /// a call to to \c action or a call to \c stop_cond failed. |
f67539c2 TL |
328 | template<typename AsyncAction, typename StopCondition> |
329 | SEASTAR_CONCEPT( requires seastar::InvokeReturns<StopCondition, bool> && seastar::InvokeReturns<AsyncAction, future<>> ) | |
330 | inline | |
331 | future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept { | |
332 | using namespace internal; | |
333 | for (;;) { | |
20effc67 | 334 | try { |
f67539c2 TL |
335 | if (stop_cond()) { |
336 | return make_ready_future<>(); | |
337 | } | |
20effc67 TL |
338 | } catch (...) { |
339 | return current_exception_as_future(); | |
340 | } | |
f67539c2 TL |
341 | auto f = futurize_invoke(action); |
342 | if (f.failed()) { | |
343 | return f; | |
344 | } | |
345 | if (!f.available() || need_preempt()) { | |
346 | return [&] () noexcept { | |
347 | memory::scoped_critical_alloc_section _; | |
348 | auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action)); | |
349 | auto ret = task->get_future(); | |
350 | internal::set_callback(f, task); | |
351 | return ret; | |
352 | }(); | |
353 | } | |
354 | } | |
355 | } | |
356 | ||
357 | /// Invoke given action until it fails. | |
358 | /// | |
359 | /// Calls \c action repeatedly until it returns a failed future. | |
360 | /// | |
361 | /// \param action a callable taking no arguments, returning a \c future<> | |
362 | /// that becomes ready when you wish it to be called again. | |
363 | /// \return a future<> that will resolve to the first failure of \c action | |
364 | template<typename AsyncAction> | |
365 | SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, future<>> ) | |
366 | inline | |
367 | future<> keep_doing(AsyncAction action) noexcept { | |
368 | return repeat([action = std::move(action)] () mutable { | |
369 | return action().then([] { | |
370 | return stop_iteration::no; | |
371 | }); | |
372 | }); | |
373 | } | |
374 | ||
375 | namespace internal { | |
376 | template <typename Iterator, typename AsyncAction> | |
377 | class do_for_each_state final : public continuation_base<> { | |
378 | Iterator _begin; | |
379 | Iterator _end; | |
380 | AsyncAction _action; | |
381 | promise<> _pr; | |
382 | ||
383 | public: | |
384 | do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<> first_unavailable) | |
385 | : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) { | |
386 | internal::set_callback(first_unavailable, this); | |
387 | } | |
388 | virtual void run_and_dispose() noexcept override { | |
389 | std::unique_ptr<do_for_each_state> zis(this); | |
390 | if (_state.failed()) { | |
391 | _pr.set_urgent_state(std::move(_state)); | |
392 | return; | |
393 | } | |
394 | while (_begin != _end) { | |
395 | auto f = futurize_invoke(_action, *_begin++); | |
396 | if (f.failed()) { | |
397 | f.forward_to(std::move(_pr)); | |
398 | return; | |
399 | } | |
400 | if (!f.available() || need_preempt()) { | |
401 | _state = {}; | |
402 | internal::set_callback(f, this); | |
403 | zis.release(); | |
404 | return; | |
405 | } | |
406 | } | |
407 | _pr.set_value(); | |
408 | } | |
409 | task* waiting_task() noexcept override { | |
410 | return _pr.waiting_task(); | |
411 | } | |
412 | future<> get_future() { | |
413 | return _pr.get_future(); | |
414 | } | |
415 | }; | |
416 | ||
417 | template<typename Iterator, typename AsyncAction> | |
418 | inline | |
419 | future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) { | |
420 | while (begin != end) { | |
421 | auto f = futurize_invoke(action, *begin++); | |
422 | if (f.failed()) { | |
423 | return f; | |
424 | } | |
425 | if (!f.available() || need_preempt()) { | |
426 | auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{ | |
427 | std::move(begin), std::move(end), std::move(action), std::move(f)}; | |
428 | return s->get_future(); | |
429 | } | |
430 | } | |
431 | return make_ready_future<>(); | |
432 | } | |
433 | } // namespace internal | |
434 | ||
435 | /// \addtogroup future-util | |
436 | ||
437 | /// \brief Call a function for each item in a range, sequentially (iterator version). | |
438 | /// | |
439 | /// For each item in a range, call a function, waiting for the previous | |
440 | /// invocation to complete before calling the next one. | |
441 | /// | |
442 | /// \param begin an \c InputIterator designating the beginning of the range | |
443 | /// \param end an \c InputIterator designating the endof the range | |
444 | /// \param action a callable, taking a reference to objects from the range | |
445 | /// as a parameter, and returning a \c future<> that resolves | |
446 | /// when it is acceptable to process the next item. | |
447 | /// \return a ready future on success, or the first failed future if | |
448 | /// \c action failed. | |
449 | template<typename Iterator, typename AsyncAction> | |
450 | SEASTAR_CONCEPT( requires requires (Iterator i, AsyncAction aa) { | |
451 | { futurize_invoke(aa, *i) } -> std::same_as<future<>>; | |
452 | } ) | |
453 | inline | |
454 | future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept { | |
455 | try { | |
456 | return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action)); | |
457 | } catch (...) { | |
458 | return current_exception_as_future(); | |
459 | } | |
460 | } | |
461 | ||
462 | /// \brief Call a function for each item in a range, sequentially (range version). | |
463 | /// | |
464 | /// For each item in a range, call a function, waiting for the previous | |
465 | /// invocation to complete before calling the next one. | |
466 | /// | |
467 | /// \param c an \c Container object designating input range | |
468 | /// \param action a callable, taking a reference to objects from the range | |
469 | /// as a parameter, and returning a \c future<> that resolves | |
470 | /// when it is acceptable to process the next item. | |
471 | /// \return a ready future on success, or the first failed future if | |
472 | /// \c action failed. | |
473 | template<typename Container, typename AsyncAction> | |
474 | SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) { | |
475 | { futurize_invoke(aa, *c.begin()) } -> std::same_as<future<>>; | |
476 | } ) | |
477 | inline | |
478 | future<> do_for_each(Container& c, AsyncAction action) noexcept { | |
479 | try { | |
480 | return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action)); | |
481 | } catch (...) { | |
482 | return current_exception_as_future(); | |
483 | } | |
484 | } | |
485 | ||
486 | namespace internal { | |
487 | ||
488 | template <typename Iterator, typename IteratorCategory> | |
489 | inline | |
490 | size_t | |
491 | iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) { | |
492 | // For InputIterators we can't estimate needed capacity | |
493 | return 0; | |
494 | } | |
495 | ||
496 | template <typename Iterator> | |
497 | inline | |
498 | size_t | |
499 | iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_iterator_tag category) { | |
500 | // May be linear time below random_access_iterator_tag, but still better than reallocation | |
501 | return std::distance(begin, end); | |
502 | } | |
503 | ||
504 | } // namespace internal | |
505 | ||
506 | /// \cond internal | |
507 | ||
508 | class parallel_for_each_state final : private continuation_base<> { | |
509 | std::vector<future<>> _incomplete; | |
510 | promise<> _result; | |
511 | std::exception_ptr _ex; | |
512 | private: | |
513 | // Wait for one of the futures in _incomplete to complete, and then | |
514 | // decide what to do: wait for another one, or deliver _result if all | |
515 | // are complete. | |
516 | void wait_for_one() noexcept; | |
517 | virtual void run_and_dispose() noexcept override; | |
518 | task* waiting_task() noexcept override { return _result.waiting_task(); } | |
519 | public: | |
520 | parallel_for_each_state(size_t n); | |
521 | void add_future(future<>&& f); | |
522 | future<> get_future(); | |
523 | }; | |
524 | ||
525 | /// \endcond | |
526 | ||
527 | /// \brief Run tasks in parallel (iterator version). | |
528 | /// | |
529 | /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in | |
530 | /// the range, and return a future<> that resolves when all the functions | |
531 | /// complete. \c func should return a future<> that indicates when it is | |
532 | /// complete. All invocations are performed in parallel. This allows the range | |
533 | /// to refer to stack objects, but means that unlike other loops this cannot | |
534 | /// check need_preempt and can only be used with small ranges. | |
535 | /// | |
536 | /// \param begin an \c InputIterator designating the beginning of the range | |
537 | /// \param end an \c InputIterator designating the end of the range | |
538 | /// \param func Function to invoke with each element in the range (returning | |
539 | /// a \c future<>) | |
540 | /// \return a \c future<> that resolves when all the function invocations | |
541 | /// complete. If one or more return an exception, the return value | |
542 | /// contains one of the exceptions. | |
543 | template <typename Iterator, typename Func> | |
544 | SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } ) | |
545 | inline | |
546 | future<> | |
547 | parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept { | |
548 | parallel_for_each_state* s = nullptr; | |
549 | // Process all elements, giving each future the following treatment: | |
550 | // - available, not failed: do nothing | |
551 | // - available, failed: collect exception in ex | |
552 | // - not available: collect in s (allocating it if needed) | |
553 | while (begin != end) { | |
554 | auto f = futurize_invoke(std::forward<Func>(func), *begin++); | |
20effc67 | 555 | memory::scoped_critical_alloc_section _; |
f67539c2 TL |
556 | if (!f.available() || f.failed()) { |
557 | if (!s) { | |
f67539c2 TL |
558 | using itraits = std::iterator_traits<Iterator>; |
559 | auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1); | |
560 | s = new parallel_for_each_state(n); | |
561 | } | |
562 | s->add_future(std::move(f)); | |
563 | } | |
564 | } | |
565 | // If any futures were not available, hand off to parallel_for_each_state::start(). | |
566 | // Otherwise we can return a result immediately. | |
567 | if (s) { | |
568 | // s->get_future() takes ownership of s (and chains it to one of the futures it contains) | |
569 | // so this isn't a leak | |
570 | return s->get_future(); | |
571 | } | |
572 | return make_ready_future<>(); | |
573 | } | |
574 | ||
575 | /// \brief Run tasks in parallel (range version). | |
576 | /// | |
577 | /// Given a \c range of objects, invoke \c func with each object | |
578 | /// in the range, and return a future<> that resolves when all | |
579 | /// the functions complete. \c func should return a future<> that indicates | |
580 | /// when it is complete. All invocations are performed in parallel. This allows | |
581 | /// the range to refer to stack objects, but means that unlike other loops this | |
582 | /// cannot check need_preempt and can only be used with small ranges. | |
583 | /// | |
584 | /// \param range A range of objects to iterate run \c func on | |
585 | /// \param func A callable, accepting reference to the range's | |
586 | /// \c value_type, and returning a \c future<>. | |
587 | /// \return a \c future<> that becomes ready when the entire range | |
588 | /// was processed. If one or more of the invocations of | |
589 | /// \c func returned an exceptional future, then the return | |
590 | /// value will contain one of those exceptions. | |
591 | ||
592 | namespace internal { | |
593 | ||
594 | template <typename Range, typename Func> | |
595 | inline | |
596 | future<> | |
597 | parallel_for_each_impl(Range&& range, Func&& func) { | |
598 | return parallel_for_each(std::begin(range), std::end(range), | |
599 | std::forward<Func>(func)); | |
600 | } | |
601 | ||
602 | } // namespace internal | |
603 | ||
604 | template <typename Range, typename Func> | |
605 | SEASTAR_CONCEPT( requires requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } ) | |
606 | inline | |
607 | future<> | |
608 | parallel_for_each(Range&& range, Func&& func) noexcept { | |
609 | auto impl = internal::parallel_for_each_impl<Range, Func>; | |
610 | return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func)); | |
611 | } | |
612 | ||
613 | /// Run a maximum of \c max_concurrent tasks in parallel (iterator version). | |
614 | /// | |
615 | /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in | |
616 | /// the range, and return a future<> that resolves when all the functions | |
617 | /// complete. \c func should return a future<> that indicates when it is | |
618 | /// complete. Up to \c max_concurrent invocations are performed in parallel. | |
619 | /// This does not allow the range to refer to stack objects. The caller | |
620 | /// must ensure that the range outlives the call to max_concurrent_for_each | |
621 | /// so it can be iterated in the background. | |
622 | /// | |
623 | /// \param begin an \c InputIterator designating the beginning of the range | |
624 | /// \param end an \c InputIterator designating the end of the range | |
625 | /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero. | |
626 | /// \param func Function to invoke with each element in the range (returning | |
627 | /// a \c future<>) | |
628 | /// \return a \c future<> that resolves when all the function invocations | |
629 | /// complete. If one or more return an exception, the return value | |
630 | /// contains one of the exceptions. | |
631 | template <typename Iterator, typename Func> | |
632 | SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } ) | |
633 | inline | |
634 | future<> | |
635 | max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Func&& func) noexcept { | |
636 | struct state { | |
637 | Iterator begin; | |
638 | Iterator end; | |
639 | Func func; | |
640 | size_t max_concurrent; | |
641 | semaphore sem; | |
642 | std::exception_ptr err; | |
643 | ||
644 | state(Iterator begin_, Iterator end_, size_t max_concurrent_, Func func_) | |
645 | : begin(std::move(begin_)) | |
646 | , end(std::move(end_)) | |
647 | , func(std::move(func_)) | |
648 | , max_concurrent(max_concurrent_) | |
649 | , sem(max_concurrent_) | |
650 | , err() | |
651 | { } | |
652 | }; | |
653 | ||
654 | assert(max_concurrent > 0); | |
655 | ||
656 | try { | |
657 | return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) { | |
658 | return do_until([&s] { return s.begin == s.end; }, [&s] { | |
659 | return s.sem.wait().then([&s] () mutable noexcept { | |
660 | // Possibly run in background and signal _sem when the task is done. | |
661 | // The background tasks are waited on using _sem. | |
662 | (void)futurize_invoke(s.func, *s.begin++).then_wrapped([&s] (future<> fut) { | |
663 | if (fut.failed()) { | |
664 | auto e = fut.get_exception();; | |
665 | if (!s.err) { | |
666 | s.err = std::move(e); | |
667 | } | |
668 | } | |
669 | s.sem.signal(); | |
670 | }); | |
671 | }); | |
672 | }).then([&s] { | |
673 | // Wait for any background task to finish | |
674 | // and signal and semaphore | |
675 | return s.sem.wait(s.max_concurrent); | |
676 | }).then([&s] { | |
677 | if (!s.err) { | |
678 | return make_ready_future<>(); | |
679 | } | |
680 | return seastar::make_exception_future<>(std::move(s.err)); | |
681 | }); | |
682 | }); | |
683 | } catch (...) { | |
684 | return current_exception_as_future(); | |
685 | } | |
686 | } | |
687 | ||
688 | /// Run a maximum of \c max_concurrent tasks in parallel (range version). | |
689 | /// | |
20effc67 | 690 | /// Given a range of objects, run \c func on each \c *i in |
f67539c2 TL |
691 | /// the range, and return a future<> that resolves when all the functions |
692 | /// complete. \c func should return a future<> that indicates when it is | |
693 | /// complete. Up to \c max_concurrent invocations are performed in parallel. | |
694 | /// This does not allow the range to refer to stack objects. The caller | |
695 | /// must ensure that the range outlives the call to max_concurrent_for_each | |
696 | /// so it can be iterated in the background. | |
697 | /// | |
20effc67 | 698 | /// \param range a \c Range to be processed |
f67539c2 TL |
699 | /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero. |
700 | /// \param func Function to invoke with each element in the range (returning | |
701 | /// a \c future<>) | |
702 | /// \return a \c future<> that resolves when all the function invocations | |
703 | /// complete. If one or more return an exception, the return value | |
704 | /// contains one of the exceptions. | |
705 | template <typename Range, typename Func> | |
706 | SEASTAR_CONCEPT( requires std::ranges::range<Range> && requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } ) | |
707 | inline | |
708 | future<> | |
709 | max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept { | |
710 | try { | |
711 | return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func)); | |
712 | } catch (...) { | |
713 | return current_exception_as_future(); | |
714 | } | |
715 | } | |
716 | ||
717 | /// @} | |
718 | ||
719 | } // namespace seastar |