2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
20 * Copyright (C) 2020 ScyllaDB.
28 #include <type_traits>
31 #include <seastar/core/future.hh>
32 #include <seastar/core/task.hh>
33 #include <seastar/util/bool_class.hh>
34 #include <seastar/core/semaphore.hh>
38 /// \addtogroup future-util
41 // The AsyncAction concept represents an action which can complete later than
42 // the actual function invocation. It is represented by a function which
43 // returns a future which resolves when the action is done.
45 struct stop_iteration_tag { };
46 using stop_iteration = bool_class<stop_iteration_tag>;
50 template <typename AsyncAction>
51 class repeater final : public continuation_base<stop_iteration> {
55 explicit repeater(AsyncAction&& action) : _action(std::move(action)) {}
56 future<> get_future() { return _promise.get_future(); }
57 task* waiting_task() noexcept override { return _promise.waiting_task(); }
58 virtual void run_and_dispose() noexcept override {
59 if (_state.failed()) {
60 _promise.set_exception(std::move(_state).get_exception());
64 if (_state.get0() == stop_iteration::yes) {
73 auto f = futurize_invoke(_action);
75 internal::set_callback(std::move(f), this);
78 if (f.get0() == stop_iteration::yes) {
83 } while (!need_preempt());
85 _promise.set_exception(std::current_exception());
89 _state.set(stop_iteration::no);
94 } // namespace internal
96 // Delete these overloads so that the actual implementation can use a
97 // universal reference but still reject lvalue references.
98 template<typename AsyncAction>
99 future<> repeat(const AsyncAction& action) noexcept = delete;
100 template<typename AsyncAction>
101 future<> repeat(AsyncAction& action) noexcept = delete;
103 /// Invokes given action until it fails or the function requests iteration to stop by returning
104 /// \c stop_iteration::yes.
106 /// \param action a callable taking no arguments, returning a future<stop_iteration>. Will
107 /// be called again as soon as the future resolves, unless the
108 /// future fails, action throws, or it resolves with \c stop_iteration::yes.
109 /// If \c action is an r-value it can be moved in the middle of iteration.
110 /// \return a ready future if we stopped successfully, or a failed future if
111 /// a call to to \c action failed.
112 template<typename AsyncAction>
113 SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, stop_iteration> || seastar::InvokeReturns<AsyncAction, future<stop_iteration>> )
115 future<> repeat(AsyncAction&& action) noexcept {
116 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
117 static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
119 // Do not type-erase here in case this is a short repeat()
120 auto f = futurator::invoke(action);
122 if (!f.available() || f.failed() || need_preempt()) {
123 return [&] () noexcept {
124 memory::scoped_critical_alloc_section _;
125 auto repeater = new internal::repeater<AsyncAction>(std::move(action));
126 auto ret = repeater->get_future();
127 internal::set_callback(std::move(f), repeater);
132 if (f.get0() == stop_iteration::yes) {
133 return make_ready_future<>();
140 template <typename T>
141 struct repeat_until_value_type_helper;
143 /// Type helper for repeat_until_value()
144 template <typename T>
145 struct repeat_until_value_type_helper<future<std::optional<T>>> {
146 /// The type of the value we are computing
147 using value_type = T;
148 /// Type used by \c AsyncAction while looping
149 using optional_type = std::optional<T>;
150 /// Return type of repeat_until_value()
151 using future_type = future<value_type>;
154 /// Return value of repeat_until_value()
155 template <typename AsyncAction>
156 using repeat_until_value_return_type
157 = typename repeat_until_value_type_helper<typename futurize<std::invoke_result_t<AsyncAction>>::type>::future_type;
163 template <typename AsyncAction, typename T>
164 class repeat_until_value_state final : public continuation_base<std::optional<T>> {
168 explicit repeat_until_value_state(AsyncAction action) : _action(std::move(action)) {}
169 repeat_until_value_state(std::optional<T> st, AsyncAction action) : repeat_until_value_state(std::move(action)) {
170 this->_state.set(std::move(st));
172 future<T> get_future() { return _promise.get_future(); }
173 task* waiting_task() noexcept override { return _promise.waiting_task(); }
174 virtual void run_and_dispose() noexcept override {
175 if (this->_state.failed()) {
176 _promise.set_exception(std::move(this->_state).get_exception());
180 auto v = std::move(this->_state).get0();
182 _promise.set_value(std::move(*v));
190 auto f = futurize_invoke(_action);
191 if (!f.available()) {
192 internal::set_callback(std::move(f), this);
197 _promise.set_value(std::move(*ret));
201 } while (!need_preempt());
203 _promise.set_exception(std::current_exception());
207 this->_state.set(std::nullopt);
212 } // namespace internal
214 /// Invokes given action until it fails or the function requests iteration to stop by returning
215 /// an engaged \c future<std::optional<T>> or std::optional<T>. The value is extracted
216 /// from the \c optional, and returned, as a future, from repeat_until_value().
218 /// \param action a callable taking no arguments, returning a future<std::optional<T>>
219 /// or std::optional<T>. Will be called again as soon as the future
220 /// resolves, unless the future fails, action throws, or it resolves with
221 /// an engaged \c optional. If \c action is an r-value it can be moved
222 /// in the middle of iteration.
223 /// \return a ready future if we stopped successfully, or a failed future if
224 /// a call to to \c action failed. The \c optional's value is returned.
225 template<typename AsyncAction>
226 SEASTAR_CONCEPT( requires requires (AsyncAction aa) {
227 bool(futurize_invoke(aa).get0());
228 futurize_invoke(aa).get0().value();
230 repeat_until_value_return_type<AsyncAction>
231 repeat_until_value(AsyncAction action) noexcept {
232 using futurator = futurize<std::invoke_result_t<AsyncAction>>;
233 using type_helper = repeat_until_value_type_helper<typename futurator::type>;
234 // the "T" in the documentation
235 using value_type = typename type_helper::value_type;
236 using optional_type = typename type_helper::optional_type;
238 auto f = futurator::invoke(action);
240 if (!f.available()) {
241 return [&] () noexcept {
242 memory::scoped_critical_alloc_section _;
243 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
244 auto ret = state->get_future();
245 internal::set_callback(std::move(f), state);
251 return make_exception_future<value_type>(f.get_exception());
254 optional_type&& optional = std::move(f).get0();
256 return make_ready_future<value_type>(std::move(optional.value()));
258 } while (!need_preempt());
261 auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::nullopt, std::move(action));
262 auto f = state->get_future();
266 return make_exception_future<value_type>(std::current_exception());
272 template <typename StopCondition, typename AsyncAction>
273 class do_until_state final : public continuation_base<> {
278 explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
279 future<> get_future() { return _promise.get_future(); }
280 task* waiting_task() noexcept override { return _promise.waiting_task(); }
281 virtual void run_and_dispose() noexcept override {
282 if (_state.available()) {
283 if (_state.failed()) {
284 _promise.set_urgent_state(std::move(_state));
288 _state = {}; // allow next cycle to overrun state
293 _promise.set_value();
298 if (!f.available()) {
299 internal::set_callback(std::move(f), this);
303 f.forward_to(std::move(_promise));
307 } while (!need_preempt());
309 _promise.set_exception(std::current_exception());
317 } // namespace internal
319 /// Invokes given action until it fails or given condition evaluates to true or fails.
321 /// \param stop_cond a callable taking no arguments, returning a boolean that
322 /// evalutes to true when you don't want to call \c action
323 /// any longer. If \c stop_cond fails, the exception is propagated
324 // in the returned future.
325 /// \param action a callable taking no arguments, returning a future<>. Will
326 /// be called again as soon as the future resolves, unless the
327 /// future fails, or \c stop_cond returns \c true or fails.
328 /// \return a ready future if we stopped successfully, or a failed future if
329 /// a call to to \c action or a call to \c stop_cond failed.
330 template<typename AsyncAction, typename StopCondition>
331 SEASTAR_CONCEPT( requires seastar::InvokeReturns<StopCondition, bool> && seastar::InvokeReturns<AsyncAction, future<>> )
333 future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
334 using namespace internal;
338 return make_ready_future<>();
341 return current_exception_as_future();
343 auto f = futurize_invoke(action);
347 if (!f.available() || need_preempt()) {
348 return [&] () noexcept {
349 memory::scoped_critical_alloc_section _;
350 auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
351 auto ret = task->get_future();
352 internal::set_callback(std::move(f), task);
359 /// Invoke given action until it fails.
361 /// Calls \c action repeatedly until it returns a failed future.
363 /// \param action a callable taking no arguments, returning a \c future<>
364 /// that becomes ready when you wish it to be called again.
365 /// \return a future<> that will resolve to the first failure of \c action
366 template<typename AsyncAction>
367 SEASTAR_CONCEPT( requires seastar::InvokeReturns<AsyncAction, future<>> )
369 future<> keep_doing(AsyncAction action) noexcept {
370 return repeat([action = std::move(action)] () mutable {
371 return action().then([] {
372 return stop_iteration::no;
378 template <typename Iterator, typename AsyncAction>
379 class do_for_each_state final : public continuation_base<> {
386 do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
387 : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
388 internal::set_callback(std::move(first_unavailable), this);
390 virtual void run_and_dispose() noexcept override {
391 std::unique_ptr<do_for_each_state> zis(this);
392 if (_state.failed()) {
393 _pr.set_urgent_state(std::move(_state));
396 while (_begin != _end) {
397 auto f = futurize_invoke(_action, *_begin++);
399 f.forward_to(std::move(_pr));
402 if (!f.available() || need_preempt()) {
404 internal::set_callback(std::move(f), this);
411 task* waiting_task() noexcept override {
412 return _pr.waiting_task();
414 future<> get_future() {
415 return _pr.get_future();
419 template<typename Iterator, typename AsyncAction>
421 future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
422 while (begin != end) {
423 auto f = futurize_invoke(action, *begin++);
427 if (!f.available() || need_preempt()) {
428 auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
429 std::move(begin), std::move(end), std::move(action), std::move(f)};
430 return s->get_future();
433 return make_ready_future<>();
435 } // namespace internal
437 /// \addtogroup future-util
439 /// \brief Call a function for each item in a range, sequentially (iterator version).
441 /// For each item in a range, call a function, waiting for the previous
442 /// invocation to complete before calling the next one.
444 /// \param begin an \c InputIterator designating the beginning of the range
445 /// \param end an \c InputIterator designating the endof the range
446 /// \param action a callable, taking a reference to objects from the range
447 /// as a parameter, and returning a \c future<> that resolves
448 /// when it is acceptable to process the next item.
449 /// \return a ready future on success, or the first failed future if
450 /// \c action failed.
451 template<typename Iterator, typename AsyncAction>
452 SEASTAR_CONCEPT( requires requires (Iterator i, AsyncAction aa) {
453 { futurize_invoke(aa, *i) } -> std::same_as<future<>>;
456 future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
458 return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
460 return current_exception_as_future();
464 /// \brief Call a function for each item in a range, sequentially (range version).
466 /// For each item in a range, call a function, waiting for the previous
467 /// invocation to complete before calling the next one.
469 /// \param c an \c Container object designating input range
470 /// \param action a callable, taking a reference to objects from the range
471 /// as a parameter, and returning a \c future<> that resolves
472 /// when it is acceptable to process the next item.
473 /// \return a ready future on success, or the first failed future if
474 /// \c action failed.
475 template<typename Container, typename AsyncAction>
476 SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) {
477 { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
481 future<> do_for_each(Container& c, AsyncAction action) noexcept {
483 return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
485 return current_exception_as_future();
491 template <typename T, typename = void>
492 struct has_iterator_category : std::false_type {};
494 template <typename T>
495 struct has_iterator_category<T, std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};
497 template <typename Iterator, typename Sentinel, typename IteratorCategory>
500 iterator_range_estimate_vector_capacity(Iterator const&, Sentinel const&, IteratorCategory) {
501 // For InputIterators we can't estimate needed capacity
505 template <typename Iterator, typename Sentinel>
508 iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end, std::forward_iterator_tag) {
509 // May be linear time below random_access_iterator_tag, but still better than reallocation
510 return std::distance(begin, end);
513 } // namespace internal
517 class parallel_for_each_state final : private continuation_base<> {
518 std::vector<future<>> _incomplete;
520 std::exception_ptr _ex;
522 // Wait for one of the futures in _incomplete to complete, and then
523 // decide what to do: wait for another one, or deliver _result if all
525 void wait_for_one() noexcept;
526 virtual void run_and_dispose() noexcept override;
527 task* waiting_task() noexcept override { return _result.waiting_task(); }
529 parallel_for_each_state(size_t n);
530 void add_future(future<>&& f);
531 future<> get_future();
536 /// \brief Run tasks in parallel (iterator version).
538 /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
539 /// the range, and return a future<> that resolves when all the functions
540 /// complete. \c func should return a future<> that indicates when it is
541 /// complete. All invocations are performed in parallel. This allows the range
542 /// to refer to stack objects, but means that unlike other loops this cannot
543 /// check need_preempt and can only be used with small ranges.
545 /// \param begin an \c InputIterator designating the beginning of the range
546 /// \param end an \c InputIterator designating the end of the range
547 /// \param func Function to invoke with each element in the range (returning
549 /// \return a \c future<> that resolves when all the function invocations
550 /// complete. If one or more return an exception, the return value
551 /// contains one of the exceptions.
552 /// \note parallel_for_each() schedules all invocations of \c func on the
553 /// current shard. If you want to run a function on all shards in parallel,
554 /// have a look at \ref smp::invoke_on_all() instead.
555 template <typename Iterator, typename Sentinel, typename Func>
556 SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)))
557 // We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
558 // which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
559 // break legacy code, for which it holds that Sentinel equals Iterator.
562 parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept {
563 parallel_for_each_state* s = nullptr;
564 // Process all elements, giving each future the following treatment:
565 // - available, not failed: do nothing
566 // - available, failed: collect exception in ex
567 // - not available: collect in s (allocating it if needed)
568 while (begin != end) {
569 auto f = futurize_invoke(std::forward<Func>(func), *begin);
571 memory::scoped_critical_alloc_section _;
572 if (!f.available() || f.failed()) {
574 using itraits = std::iterator_traits<Iterator>;
576 if constexpr (internal::has_iterator_category<Iterator>::value) {
577 // We need if-constexpr here because there exist iterators for which std::iterator_traits
578 // does not have 'iterator_category' as member type
579 n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category{}) + 1);
581 s = new parallel_for_each_state(n);
583 s->add_future(std::move(f));
586 // If any futures were not available, hand off to parallel_for_each_state::start().
587 // Otherwise we can return a result immediately.
589 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
590 // so this isn't a leak
591 return s->get_future();
593 return make_ready_future<>();
596 /// \brief Run tasks in parallel (range version).
598 /// Given a \c range of objects, invoke \c func with each object
599 /// in the range, and return a future<> that resolves when all
600 /// the functions complete. \c func should return a future<> that indicates
601 /// when it is complete. All invocations are performed in parallel. This allows
602 /// the range to refer to stack objects, but means that unlike other loops this
603 /// cannot check need_preempt and can only be used with small ranges.
605 /// \param range A range of objects to iterate run \c func on
606 /// \param func A callable, accepting reference to the range's
607 /// \c value_type, and returning a \c future<>.
608 /// \return a \c future<> that becomes ready when the entire range
609 /// was processed. If one or more of the invocations of
610 /// \c func returned an exceptional future, then the return
611 /// value will contain one of those exceptions.
612 /// \note parallel_for_each() schedules all invocations of \c func on the
613 /// current shard. If you want to run a function on all shards in parallel,
614 /// have a look at \ref smp::invoke_on_all() instead.
618 template <typename Range, typename Func>
621 parallel_for_each_impl(Range&& range, Func&& func) {
622 return parallel_for_each(std::begin(range), std::end(range),
623 std::forward<Func>(func));
626 } // namespace internal
628 template <typename Range, typename Func>
629 SEASTAR_CONCEPT( requires requires (Func f, Range r) {
630 { f(*std::begin(r)) } -> std::same_as<future<>>;
635 parallel_for_each(Range&& range, Func&& func) noexcept {
636 auto impl = internal::parallel_for_each_impl<Range, Func>;
637 return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
640 /// Run a maximum of \c max_concurrent tasks in parallel (iterator version).
642 /// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
643 /// the range, and return a future<> that resolves when all the functions
644 /// complete. \c func should return a future<> that indicates when it is
645 /// complete. Up to \c max_concurrent invocations are performed in parallel.
646 /// This does not allow the range to refer to stack objects. The caller
647 /// must ensure that the range outlives the call to max_concurrent_for_each
648 /// so it can be iterated in the background.
650 /// \param begin an \c InputIterator designating the beginning of the range
651 /// \param end an \c InputIterator designating the end of the range
652 /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
653 /// \param func Function to invoke with each element in the range (returning
655 /// \return a \c future<> that resolves when all the function invocations
656 /// complete. If one or more return an exception, the return value
657 /// contains one of the exceptions.
658 /// \note max_concurrent_for_each() schedules all invocations of \c func on the
659 /// current shard. If you want to run a function on all shards in parallel,
660 /// have a look at \ref smp::invoke_on_all() instead.
661 template <typename Iterator, typename Sentinel, typename Func>
662 SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) ) )
663 // We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
664 // which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
665 // break legacy code, for which it holds that Sentinel equals Iterator.
668 max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept {
673 size_t max_concurrent;
675 std::exception_ptr err;
677 state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_)
678 : begin(std::move(begin_))
679 , end(std::move(end_))
680 , func(std::move(func_))
681 , max_concurrent(max_concurrent_)
682 , sem(max_concurrent_)
687 assert(max_concurrent > 0);
690 return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
691 return do_until([&s] { return s.begin == s.end; }, [&s] {
692 return s.sem.wait().then([&s] () mutable noexcept {
693 // Possibly run in background and signal _sem when the task is done.
694 // The background tasks are waited on using _sem.
695 (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) {
697 auto e = fut.get_exception();;
699 s.err = std::move(e);
707 // Wait for any background task to finish
708 // and signal and semaphore
709 return s.sem.wait(s.max_concurrent);
712 return make_ready_future<>();
714 return seastar::make_exception_future<>(std::move(s.err));
718 return current_exception_as_future();
722 /// Run a maximum of \c max_concurrent tasks in parallel (range version).
724 /// Given a range of objects, run \c func on each \c *i in
725 /// the range, and return a future<> that resolves when all the functions
726 /// complete. \c func should return a future<> that indicates when it is
727 /// complete. Up to \c max_concurrent invocations are performed in parallel.
728 /// This does not allow the range to refer to stack objects. The caller
729 /// must ensure that the range outlives the call to max_concurrent_for_each
730 /// so it can be iterated in the background.
732 /// \param range a \c Range to be processed
733 /// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
734 /// \param func Function to invoke with each element in the range (returning
736 /// \return a \c future<> that resolves when all the function invocations
737 /// complete. If one or more return an exception, the return value
738 /// contains one of the exceptions.
739 /// \note max_concurrent_for_each() schedules all invocations of \c func on the
740 /// current shard. If you want to run a function on all shards in parallel,
741 /// have a look at \ref smp::invoke_on_all() instead.
742 template <typename Range, typename Func>
743 SEASTAR_CONCEPT( requires requires (Func f, Range r) {
744 { f(*std::begin(r)) } -> std::same_as<future<>>;
749 max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
751 return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
753 return current_exception_as_future();
759 } // namespace seastar