2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
20 * Copyright (C) 2020 ScyllaDB.
29 #include <seastar/core/future.hh>
30 #include <seastar/core/task.hh>
31 #include <seastar/util/bool_class.hh>
32 #include <seastar/core/semaphore.hh>
36 /// \addtogroup future-util
39 // The AsyncAction concept represents an action which can complete later than
40 // the actual function invocation. It is represented by a function which
41 // returns a future which resolves when the action is done.
// Empty tag type used only to make stop_iteration a distinct bool_class instantiation.
43 struct stop_iteration_tag { };
// Strongly typed yes/no value; the loop helpers in this file compare it
// against stop_iteration::yes to decide whether to stop iterating.
44 using stop_iteration = bool_class<stop_iteration_tag>;
// Continuation object used by repeat() when the loop cannot be finished
// inline. It owns the action and a promise whose future is handed to the
// caller via get_future().
// NOTE(review): several lines of this class (member declarations, braces,
// try/catch) are not visible in this view; the comments below describe only
// the visible statements.
48 template <typename AsyncAction>
49 class repeater final : public continuation_base<stop_iteration> {
// Moves the caller's action into _action.
53 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
// Returns the future associated with _promise.
54 future<> get_future() { return _promise.get_future(); }
// Delegates to the promise's waiting task.
55 task* waiting_task() noexcept override { return _promise.waiting_task(); }
// Consumes the result stored in _state and continues the loop.
56 virtual void run_and_dispose() noexcept override {
// A failed iteration result is forwarded to the promise as an exception.
57 if (_state.failed()) {
58 _promise.set_exception(std::move(_state).get_exception());
// The stored iteration result requested that the loop stop.
62 if (_state.get0() == stop_iteration::yes) {
// Run the action again; f is the future produced by this invocation.
71 auto f = futurize_invoke(_action);
// Register this object as f's continuation
// (presumably only when f is not yet available -- guard line not visible).
73 internal::set_callback(f, this);
// An already-resolved result asking to stop.
76 if (f.get0() == stop_iteration::yes) {
// Keep looping inline until need_preempt() returns true.
81 } while (!need_preempt());
// The in-flight exception is reported through the promise.
83 _promise.set_exception(std::current_exception());
// Store a "continue" result in _state
// (presumably before rescheduling this task -- TODO confirm).
87 _state.set(stop_iteration::no);
92 } // namespace internal
94 // Delete these overloads so that the actual implementation can use a
95 // universal reference but still reject lvalue references.
// Overload rejecting const lvalue actions.
96 template<typename AsyncAction>
97 future<> repeat(const AsyncAction& action) noexcept = delete;
// Overload rejecting non-const lvalue actions.
98 template<typename AsyncAction>
99 future<> repeat(AsyncAction& action) noexcept = delete;
101 /// Invokes given action until it fails or the function requests iteration to stop by returning
102 /// \c stop_iteration::yes.
104 /// \param action a callable taking no arguments, returning a future<stop_iteration>. Will
105 /// be called again as soon as the future resolves, unless the
106 /// future fails, action throws, or it resolves with \c stop_iteration::yes.
107 /// If \c action is an r-value it can be moved in the middle of iteration.
108 /// \return a ready future if we stopped successfully, or a failed future if
109 /// a call to \c action failed.
110 template<typename AsyncAction>
111 SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, stop_iteration> || seastar::InvokeReturns<AsyncAction, future<stop_iteration>> )
113 future<> repeat(AsyncAction&& action) noexcept {
114 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
// Compile-time check: the futurized result of the action must be future<stop_iteration>.
115 static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
117 // Do not type-erase here in case this is a short repeat()
118 auto f = futurator::invoke(action);
// Slow path: the result is not ready yet, it failed, or preemption is
// requested. Allocate a repeater, take its future, and attach the repeater
// to f as its continuation.
120 if (!f.available() || f.failed() || need_preempt()) {
121 return [&] () noexcept {
// NOTE(review): presumably marks the allocation below as critical so that
// allocation-failure injection does not apply to it -- confirm.
122 memory::scoped_critical_alloc_section _;
123 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
124 auto ret = repeater->get_future();
125 internal::set_callback(f, repeater);
// Fast path: f resolved immediately and asked to stop.
130 if (f.get0() == stop_iteration::yes) {
131 return make_ready_future<>();
// Primary template: declared only. The visible code provides a definition
// solely for future<std::optional<T>>.
138 template <typename T>
139 struct repeat_until_value_type_helper;
141 /// Type helper for repeat_until_value()
142 template <typename T>
143 struct repeat_until_value_type_helper<future<std::optional<T>>> {
144 /// The type of the value we are computing
145 using value_type = T;
146 /// Type used by \c AsyncAction while looping
147 using optional_type = std::optional<T>;
148 /// Return type of repeat_until_value()
149 using future_type = future<value_type>;
152 /// Return value of repeat_until_value()
/// Computed by futurizing the action's invoke result and taking the
/// \c future_type of the matching repeat_until_value_type_helper.
153 template <typename AsyncAction>
154 using repeat_until_value_return_type
155 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
// Continuation object used by repeat_until_value() when the loop cannot be
// finished inline. It holds the action and a promise for the final value.
// NOTE(review): several lines of this class are not visible in this view;
// comments describe only the visible statements.
161 template <typename AsyncAction, typename T>
162 class repeat_until_value_state final : public continuation_base<std::optional<T>> {
// Moves the action into _action; the continuation state is left unset.
166 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
// Same as above, but also pre-populates the continuation state with st.
167 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
168 this->_state.set(std::move(st));
// Returns the future associated with _promise.
170 future<T> get_future() { return _promise.get_future(); }
171 task* waiting_task() noexcept override { return _promise.waiting_task(); }
// Consumes the stored state and continues the loop.
172 virtual void run_and_dispose() noexcept override {
// A failed iteration result is forwarded to the promise as an exception.
173 if (this->_state.failed()) {
174 _promise.set_exception(std::move(this->_state).get_exception());
// Extract the stored optional result.
178 auto v = std::move(this->_state).get0();
// Deliver the contained value (presumably only when v is engaged -- guard line not visible).
180 _promise.set_value(std::move(*v));
// Run the action again; f is the future produced by this invocation.
188 auto f = futurize_invoke(_action);
// Not ready yet: register this object as f's continuation.
189 if (!f.available()) {
190 internal::set_callback(f, this);
// Deliver the value obtained inline (definition of ret is not visible here).
195 _promise.set_value(std::move(*ret));
// Keep looping inline until need_preempt() returns true.
199 } while (!need_preempt());
// The in-flight exception is reported through the promise.
201 _promise.set_exception(std::current_exception());
// Store a disengaged optional in the state
// (presumably before rescheduling this task -- TODO confirm).
205 this->_state.set(std::nullopt);
210 } // namespace internal
212 /// Invokes given action until it fails or the function requests iteration to stop by returning
213 /// an engaged \c future<std::optional<T>> or std::optional<T>. The value is extracted
214 /// from the \c optional, and returned, as a future, from repeat_until_value().
216 /// \param action a callable taking no arguments, returning a future<std::optional<T>>
217 /// or std::optional<T>. Will be called again as soon as the future
218 /// resolves, unless the future fails, action throws, or it resolves with
219 /// an engaged \c optional. If \c action is an r-value it can be moved
220 /// in the middle of iteration.
221 /// \return a ready future if we stopped successfully, or a failed future if
222 /// a call to \c action failed. The \c optional's value is returned.
223 template<typename AsyncAction>
224 SEASTAR_CONCEPT( requires requires (AsyncAction aa) {
225 bool(futurize_invoke(aa).get0());
226 futurize_invoke(aa).get0().value();
228 repeat_until_value_return_type<AsyncAction>
229 repeat_until_value(AsyncAction action) noexcept {
230 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
231 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
232 // the "T" in the documentation
233 using value_type = typename type_helper::value_type;
234 using optional_type = typename type_helper::optional_type;
// Invoke the action; f is the futurized result.
236 auto f = futurator::invoke(action);
// Not ready yet: allocate a state object, take its future, and attach the
// state to f as its continuation.
238 if (!f.available()) {
239 return [&] () noexcept {
// NOTE(review): presumably marks the allocation below as critical -- confirm.
240 memory::scoped_critical_alloc_section _;
241 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
242 auto ret = state->get_future();
243 internal::set_callback(f, state);
// Propagate f's exception (presumably guarded by a failed() check -- line not visible).
249 return make_exception_future<value_type>(f.get_exception());
// Take the resolved optional out of f.
252 optional_type&& optional = std::move(f).get0();
// Return the contained value (presumably only when the optional is engaged -- guard not visible).
254 return make_ready_future<value_type>(std::move(optional.value()));
// Keep looping inline until need_preempt() returns true.
256 } while (!need_preempt());
// Continue via a state object pre-seeded with a disengaged optional.
259 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
260 auto f = state->get_future();
// The in-flight exception becomes a failed future.
264 return make_exception_future<value_type>(std::current_exception());
// Continuation object used by do_until() when the loop cannot be finished
// inline. It holds the stop condition, the action and a promise.
// NOTE(review): several lines of this class are not visible in this view;
// comments describe only the visible statements.
270 template <typename StopCondition, typename AsyncAction>
271 class do_until_state final : public continuation_base<> {
// Moves both callables into the object.
276 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
// Returns the future associated with _promise.
277 future<> get_future() { return _promise.get_future(); }
278 task* waiting_task() noexcept override { return _promise.waiting_task(); }
// Consumes the stored state, if any, and continues the loop.
279 virtual void run_and_dispose() noexcept override {
280 if (_state.available()) {
// A failed iteration result is handed to the promise.
281 if (_state.failed()) {
282 _promise.set_urgent_state(std::move(_state));
286 _state = {}; // allow next cycle to overrun state
// Resolve the promise successfully (presumably when the stop condition holds -- guard not visible).
291 _promise.set_value();
// The action's future f (its definition is not visible here) is not ready:
// register this object as its continuation.
296 if (!f.available()) {
297 internal::set_callback(f, this);
// Forward f's result to the promise (presumably when f failed -- guard not visible).
301 f.forward_to(std::move(_promise));
// Keep looping inline until need_preempt() returns true.
305 } while (!need_preempt());
// The in-flight exception is reported through the promise.
307 _promise.set_exception(std::current_exception());
315 } // namespace internal
317 /// Invokes given action until it fails or given condition evaluates to true or fails.
319 /// \param stop_cond a callable taking no arguments, returning a boolean that
320 /// evaluates to true when you don't want to call \c action
321 /// any longer. If \c stop_cond fails, the exception is propagated
322 /// in the returned future.
323 /// \param action a callable taking no arguments, returning a future<>. Will
324 /// be called again as soon as the future resolves, unless the
325 /// future fails, or \c stop_cond returns \c true or fails.
326 /// \return a ready future if we stopped successfully, or a failed future if
327 /// a call to \c action or a call to \c stop_cond failed.
328 template<typename AsyncAction, typename StopCondition>
329 SEASTAR_CONCEPT( requires seastar::InvokeReturns<StopCondition, bool> && seastar::InvokeReturns<AsyncAction, future<>> )
331 future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
332 using namespace internal;
// Finished: return a ready future (presumably when stop_cond() is true -- guard not visible).
336 return make_ready_future<>();
// The in-flight exception becomes a failed future
// (presumably thrown by stop_cond -- the try/catch lines are not visible).
339 return current_exception_as_future();
// Invoke the action; f is the futurized result.
341 auto f = futurize_invoke(action);
// Result not ready or preemption requested: allocate a do_until_state, take
// its future, and attach the state to f as its continuation.
345 if (!f.available() || need_preempt()) {
346 return [&] () noexcept {
// NOTE(review): presumably marks the allocation below as critical -- confirm.
347 memory::scoped_critical_alloc_section _;
348 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
349 auto ret = task->get_future();
350 internal::set_callback(f, task);
357 /// Invoke given action until it fails.
359 /// Calls \c action repeatedly until it returns a failed future.
361 /// \param action a callable taking no arguments, returning a \c future<>
362 /// that becomes ready when you wish it to be called again.
363 /// \return a future<> that will resolve to the first failure of \c action
364 template<typename AsyncAction>
365 SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, future<>> )
367 future<> keep_doing(AsyncAction action) noexcept {
// Implemented on top of repeat(): the action is moved into a mutable lambda
// whose continuation always yields stop_iteration::no.
368 return repeat([action = std::move(action)] () mutable {
369 return action().then([] {
370 return stop_iteration::no;
// Continuation object used by do_for_each_impl() when the loop cannot be
// finished inline. It holds the remaining iterator range, the action and a
// promise (_pr).
// NOTE(review): several lines of this class are not visible in this view;
// comments describe only the visible statements.
376 template <typename Iterator, typename AsyncAction>
377 class do_for_each_state final : public continuation_base<> {
// Stores the remaining range and the action, then registers this object as
// the continuation of the future the loop is waiting on.
384 do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<> first_unavailable)
385 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
386 internal::set_callback(first_unavailable, this);
388 virtual void run_and_dispose() noexcept override {
// Hold this object in a unique_ptr for the duration of the call
// (presumably released again when the object re-registers itself -- the
// release call is not visible here).
389 std::unique_ptr<do_for_each_state> zis(this);
// A failed iteration result is handed to the promise.
390 if (_state.failed()) {
391 _pr.set_urgent_state(std::move(_state));
// Process the remaining elements one at a time.
394 while (_begin != _end) {
// Call the action on the current element and advance the iterator.
395 auto f = futurize_invoke(_action, *_begin++);
// Forward f's result to the promise (presumably when f failed -- guard not visible).
397 f.forward_to(std::move(_pr));
// Result not ready or preemption requested: re-register this object as f's continuation.
400 if (!f.available() || need_preempt()) {
402 internal::set_callback(f, this);
409 task* waiting_task() noexcept override {
410 return _pr.waiting_task();
// Returns the future associated with _pr.
412 future<> get_future() {
413 return _pr.get_future();
// Sequentially applies action to each element of [begin, end), running inline
// while each result is ready and no preemption is requested.
// Not declared noexcept; the visible do_for_each() wrappers call it and also
// return current_exception_as_future().
417 template<typename Iterator, typename AsyncAction>
419 future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
420 while (begin != end) {
// Call the action on the current element and advance the iterator.
421 auto f = futurize_invoke(action, *begin++);
// Hand the rest of the loop to a heap-allocated do_for_each_state, whose
// constructor attaches itself to f.
425 if (!f.available() || need_preempt()) {
426 auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
427 std::move(begin), std::move(end), std::move(action), std::move(f)};
428 return s->get_future();
// Whole range processed inline.
431 return make_ready_future<>();
433 } // namespace internal
435 /// \addtogroup future-util
437 /// \brief Call a function for each item in a range, sequentially (iterator version).
439 /// For each item in a range, call a function, waiting for the previous
440 /// invocation to complete before calling the next one.
442 /// \param begin an \c InputIterator designating the beginning of the range
443 /// \param end an \c InputIterator designating the end of the range
444 /// \param action a callable, taking a reference to objects from the range
445 /// as a parameter, and returning a \c future<> that resolves
446 /// when it is acceptable to process the next item.
447 /// \return a ready future on success, or the first failed future if
448 /// \c action failed.
449 template<typename Iterator, typename AsyncAction>
450 SEASTAR_CONCEPT( requires requires (Iterator i, AsyncAction aa) {
451 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
454 future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
// Delegate to the implementation, moving all arguments in.
456 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
// The in-flight exception becomes a failed future
// (the enclosing try/catch lines are not visible here).
458 return current_exception_as_future();
462 /// \brief Call a function for each item in a range, sequentially (range version).
464 /// For each item in a range, call a function, waiting for the previous
465 /// invocation to complete before calling the next one.
467 /// \param c a \c Container object designating input range
468 /// \param action a callable, taking a reference to objects from the range
469 /// as a parameter, and returning a \c future<> that resolves
470 /// when it is acceptable to process the next item.
471 /// \return a ready future on success, or the first failed future if
472 /// \c action failed.
473 template<typename Container, typename AsyncAction>
474 SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) {
475 { futurize_invoke(aa, *c.begin()) } -> std::same_as<future<>>;
478 future<> do_for_each(Container& c, AsyncAction action) noexcept {
// Delegate to the implementation over [std::begin(c), std::end(c)).
// The container is taken by reference and is not copied here.
480 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
// The in-flight exception becomes a failed future
// (the enclosing try/catch lines are not visible here).
482 return current_exception_as_future();
// Generic overload, selected for iterator categories that do not match the
// forward_iterator_tag overload below. Its return type and return statement
// are not visible in this view.
488 template <typename Iterator, typename IteratorCategory>
491 iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) {
492 // For InputIterators we can't estimate needed capacity
// Overload for forward iterators: the estimate is the distance between
// begin and end.
496 template <typename Iterator>
499 iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_iterator_tag category) {
500 // May be linear time below random_access_iterator_tag, but still better than reallocation
501 return std::distance(begin, end);
504 } // namespace internal
// State object used by parallel_for_each(): futures are added to it with
// add_future() and the combined result is obtained with get_future().
// Only declarations are visible here; the member definitions are elsewhere.
508 class parallel_for_each_state final : private continuation_base<> {
// Futures handed over via add_future() (presumably -- definition not visible).
509 std::vector<future<>> _incomplete;
// Exception holder (presumably the failure to report -- usage not visible here).
511 std::exception_ptr _ex;
513 // Wait for one of the futures in _incomplete to complete, and then
514 // decide what to do: wait for another one, or deliver _result if all
516 void wait_for_one() noexcept;
517 virtual void run_and_dispose() noexcept override;
518 task* waiting_task() noexcept override { return _result.waiting_task(); }
// n is supplied by parallel_for_each() as a capacity estimate for the range.
520 parallel_for_each_state(size_t n);
521 void add_future(future<>&& f);
522 future<> get_future();
527 /// \brief Run tasks in parallel (iterator version).
529 /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
530 /// the range, and return a future<> that resolves when all the functions
531 /// complete. \c func should return a future<> that indicates when it is
532 /// complete. All invocations are performed in parallel. This allows the range
533 /// to refer to stack objects, but means that unlike other loops this cannot
534 /// check need_preempt and can only be used with small ranges.
536 /// \param begin an \c InputIterator designating the beginning of the range
537 /// \param end an \c InputIterator designating the end of the range
538 /// \param func Function to invoke with each element in the range (returning
540 /// \return a \c future<> that resolves when all the function invocations
541 /// complete. If one or more return an exception, the return value
542 /// contains one of the exceptions.
543 template <typename Iterator, typename Func>
544 SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
547 parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
// Allocated lazily, only once a future needs to be tracked.
548 parallel_for_each_state* s = nullptr;
549 // Process all elements, giving each future the following treatment:
550 // - available, not failed: do nothing
551 // - available, failed: collect in s (allocating it if needed)
552 // - not available: collect in s (allocating it if needed)
553 while (begin != end) {
// Call func on the current element and advance the iterator.
554 auto f = futurize_invoke(std::forward<Func>(func), *begin++);
// NOTE(review): presumably marks the allocations below as critical -- confirm.
555 memory::scoped_critical_alloc_section _;
556 if (!f.available() || f.failed()) {
558 using itraits = std::iterator_traits<Iterator>;
// Capacity estimate: the remaining elements plus the one just processed.
559 auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1);
560 s = new parallel_for_each_state(n);
562 s->add_future(std::move(f));
565 // If any futures were collected in s, hand off to parallel_for_each_state.
566 // Otherwise we can return a result immediately.
568 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
569 // so this isn't a leak
570 return s->get_future();
572 return make_ready_future<>();
575 /// \brief Run tasks in parallel (range version).
577 /// Given a \c range of objects, invoke \c func with each object
578 /// in the range, and return a future<> that resolves when all
579 /// the functions complete. \c func should return a future<> that indicates
580 /// when it is complete. All invocations are performed in parallel. This allows
581 /// the range to refer to stack objects, but means that unlike other loops this
582 /// cannot check need_preempt and can only be used with small ranges.
584 /// \param range A range of objects to run \c func on
585 /// \param func A callable, accepting reference to the range's
586 /// \c value_type, and returning a \c future<>.
587 /// \return a \c future<> that becomes ready when the entire range
588 /// was processed. If one or more of the invocations of
589 /// \c func returned an exceptional future, then the return
590 /// value will contain one of those exceptions.
// Range adapter: forwards to the iterator version of parallel_for_each()
// over [std::begin(range), std::end(range)). Not declared noexcept.
594 template <typename Range, typename Func>
597 parallel_for_each_impl(Range&& range, Func&& func) {
598 return parallel_for_each(std::begin(range), std::end(range),
599 std::forward<Func>(func));
602 } // namespace internal
604 template <typename Range, typename Func>
605 SEASTAR_CONCEPT( requires requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
608 parallel_for_each(Range&& range, Func&& func) noexcept {
// Call the non-noexcept implementation through futurize_invoke
// (presumably so an exception thrown while obtaining the iterators becomes
// a failed future instead of escaping this noexcept function -- confirm).
609 auto impl = internal::parallel_for_each_impl<Range, Func>;
610 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
613 /// Run a maximum of \c max_concurrent tasks in parallel (iterator version).
615 /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
616 /// the range, and return a future<> that resolves when all the functions
617 /// complete. \c func should return a future<> that indicates when it is
618 /// complete. Up to \c max_concurrent invocations are performed in parallel.
619 /// This does not allow the range to refer to stack objects. The caller
620 /// must ensure that the range outlives the call to max_concurrent_for_each
621 /// so it can be iterated in the background.
623 /// \param begin an \c InputIterator designating the beginning of the range
624 /// \param end an \c InputIterator designating the end of the range
625 /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
626 /// \param func Function to invoke with each element in the range (returning
628 /// \return a \c future<> that resolves when all the function invocations
629 /// complete. If one or more return an exception, the return value
630 /// contains one of the exceptions.
631 template <typename Iterator, typename Func>
632 SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
635 max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Func&& func) noexcept {
// Members and constructor of the local `state` struct (its opening line and
// some member declarations are not visible in this view).
640 size_t max_concurrent;
// Exception to report once the loop is over.
642 std::exception_ptr err;
644 state(Iterator begin_, Iterator end_, size_t max_concurrent_, Func func_)
645 : begin(std::move(begin_))
646 , end(std::move(end_))
647 , func(std::move(func_))
648 , max_concurrent(max_concurrent_)
// The semaphore starts with max_concurrent units, one per allowed concurrent invocation.
649 , sem(max_concurrent_)
// Precondition documented on the function: max_concurrent must be greater than zero.
654 assert(max_concurrent > 0);
// Keep the state alive via do_with while the loop runs.
657 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
// Iterate until the range is exhausted; each step first waits on the semaphore.
658 return do_until([&s] { return s.begin == s.end; }, [&s] {
659 return s.sem.wait().then([&s] () mutable noexcept {
660 // Possibly run in background and signal s.sem when the task is done.
661 // The background tasks are waited on using s.sem.
662 (void)futurize_invoke(s.func, *s.begin++).then_wrapped([&s] (future<> fut) {
// Extract the task's exception and record it in s.err
// (the surrounding condition lines are not visible here).
664 auto e = fut.get_exception();;
666 s.err = std::move(e);
673 // Wait for any background task to finish
674 // and signal the semaphore
675 return s.sem.wait(s.max_concurrent);
// Success (presumably when s.err is empty -- guard not visible).
678 return make_ready_future<>();
// Otherwise report the recorded exception.
680 return seastar::make_exception_future<>(std::move(s.err));
// The in-flight exception becomes a failed future
// (the enclosing try/catch lines are not visible here).
684 return current_exception_as_future();
688 /// Run a maximum of \c max_concurrent tasks in parallel (range version).
690 /// Given a range of objects, run \c func on each \c *i in
691 /// the range, and return a future<> that resolves when all the functions
692 /// complete. \c func should return a future<> that indicates when it is
693 /// complete. Up to \c max_concurrent invocations are performed in parallel.
694 /// This does not allow the range to refer to stack objects. The caller
695 /// must ensure that the range outlives the call to max_concurrent_for_each
696 /// so it can be iterated in the background.
698 /// \param range a \c Range to be processed
699 /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
700 /// \param func Function to invoke with each element in the range (returning
702 /// \return a \c future<> that resolves when all the function invocations
703 /// complete. If one or more return an exception, the return value
704 /// contains one of the exceptions.
705 template <typename Range, typename Func>
706 SEASTAR_CONCEPT( requires std::ranges::range<Range> && requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
709 max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
// Forward to the iterator version over [std::begin(range), std::end(range)).
711 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
// The in-flight exception becomes a failed future
// (the enclosing try/catch lines are not visible here).
713 return current_exception_as_future();
719 } // namespace seastar