#pragma once
+#include <cstddef>
#include <iterator>
#include <memory>
+#include <type_traits>
#include <vector>
#include <seastar/core/future.hh>
do {
auto f = futurize_invoke(_action);
if (!f.available()) {
- internal::set_callback(f, this);
+ internal::set_callback(std::move(f), this);
return;
}
if (f.get0() == stop_iteration::yes) {
memory::scoped_critical_alloc_section _;
auto repeater = new internal::repeater<AsyncAction>(std::move(action));
auto ret = repeater->get_future();
- internal::set_callback(f, repeater);
+ internal::set_callback(std::move(f), repeater);
return ret;
}();
}
do {
auto f = futurize_invoke(_action);
if (!f.available()) {
- internal::set_callback(f, this);
+ internal::set_callback(std::move(f), this);
return;
}
auto ret = f.get0();
memory::scoped_critical_alloc_section _;
auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
auto ret = state->get_future();
- internal::set_callback(f, state);
+ internal::set_callback(std::move(f), state);
return ret;
}();
}
}
auto f = _action();
if (!f.available()) {
- internal::set_callback(f, this);
+ internal::set_callback(std::move(f), this);
return;
}
if (f.failed()) {
memory::scoped_critical_alloc_section _;
auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
auto ret = task->get_future();
- internal::set_callback(f, task);
+ internal::set_callback(std::move(f), task);
return ret;
}();
}
promise<> _pr;
public:
- do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<> first_unavailable)
+ do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
: _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
- internal::set_callback(first_unavailable, this);
+ internal::set_callback(std::move(first_unavailable), this);
}
virtual void run_and_dispose() noexcept override {
std::unique_ptr<do_for_each_state> zis(this);
}
if (!f.available() || need_preempt()) {
_state = {};
- internal::set_callback(f, this);
+ internal::set_callback(std::move(f), this);
zis.release();
return;
}
/// \c action failed.
template<typename Container, typename AsyncAction>
SEASTAR_CONCEPT( requires requires (Container c, AsyncAction aa) {
- { futurize_invoke(aa, *c.begin()) } -> std::same_as<future<>>;
+ { futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
+ std::end(c);
} )
inline
future<> do_for_each(Container& c, AsyncAction action) noexcept {
namespace internal {
-template <typename Iterator, typename IteratorCategory>
+template <typename T, typename = void>
+struct has_iterator_category : std::false_type {};
+
+template <typename T>
+struct has_iterator_category<T, std::void_t<typename std::iterator_traits<T>::iterator_category >> : std::true_type {};
+
+template <typename Iterator, typename Sentinel, typename IteratorCategory>
inline
size_t
-iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, IteratorCategory category) {
+iterator_range_estimate_vector_capacity(Iterator const&, Sentinel const&, IteratorCategory) {
// For InputIterators we can't estimate needed capacity
return 0;
}
-template <typename Iterator>
+template <typename Iterator, typename Sentinel>
inline
size_t
-iterator_range_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_iterator_tag category) {
+iterator_range_estimate_vector_capacity(Iterator begin, Sentinel end, std::forward_iterator_tag) {
// May be linear time below random_access_iterator_tag, but still better than reallocation
return std::distance(begin, end);
}
/// \return a \c future<> that resolves when all the function invocations
/// complete. If one or more return an exception, the return value
/// contains one of the exceptions.
-template <typename Iterator, typename Func>
-SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
+/// \note parallel_for_each() schedules all invocations of \c func on the
+/// current shard. If you want to run a function on all shards in parallel,
+/// have a look at \ref smp::invoke_on_all() instead.
+template <typename Iterator, typename Sentinel, typename Func>
+SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { i++ }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)))
+// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
+// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
+// break legacy code, for which it holds that Sentinel equals Iterator.
inline
future<>
-parallel_for_each(Iterator begin, Iterator end, Func&& func) noexcept {
+parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept {
parallel_for_each_state* s = nullptr;
// Process all elements, giving each future the following treatment:
// - available, not failed: do nothing
// - available, failed: collect exception in ex
// - not available: collect in s (allocating it if needed)
while (begin != end) {
- auto f = futurize_invoke(std::forward<Func>(func), *begin++);
+ auto f = futurize_invoke(std::forward<Func>(func), *begin);
+ ++begin;
memory::scoped_critical_alloc_section _;
if (!f.available() || f.failed()) {
if (!s) {
using itraits = std::iterator_traits<Iterator>;
- auto n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category()) + 1);
+ size_t n{0U};
+ if constexpr (internal::has_iterator_category<Iterator>::value) {
+ // We need if-constexpr here because there exist iterators for which std::iterator_traits
+ // does not have 'iterator_category' as member type
+ n = (internal::iterator_range_estimate_vector_capacity(begin, end, typename itraits::iterator_category{}) + 1);
+ }
s = new parallel_for_each_state(n);
}
s->add_future(std::move(f));
/// was processed. If one or more of the invocations of
/// \c func returned an exceptional future, then the return
/// value will contain one of those exceptions.
+/// \note parallel_for_each() schedules all invocations of \c func on the
+/// current shard. If you want to run a function on all shards in parallel,
+/// have a look at \ref smp::invoke_on_all() instead.
namespace internal {
} // namespace internal
template <typename Range, typename Func>
-SEASTAR_CONCEPT( requires requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
+SEASTAR_CONCEPT( requires requires (Func f, Range r) {
+ { f(*std::begin(r)) } -> std::same_as<future<>>;
+ std::end(r);
+} )
inline
future<>
parallel_for_each(Range&& range, Func&& func) noexcept {
/// \return a \c future<> that resolves when all the function invocations
/// complete. If one or more return an exception, the return value
/// contains one of the exceptions.
-template <typename Iterator, typename Func>
-SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
+/// \note max_concurrent_for_each() schedules all invocations of \c func on the
+/// current shard. If you want to run a function on all shards in parallel,
+/// have a look at \ref smp::invoke_on_all() instead.
+template <typename Iterator, typename Sentinel, typename Func>
+SEASTAR_CONCEPT( requires (requires (Func f, Iterator i) { { f(*i) } -> std::same_as<future<>>; { ++i }; } && (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>) ) )
+// We use a conjunction with std::same_as<Sentinel, Iterator> because std::sentinel_for requires Sentinel to be semiregular,
+// which implies that it requires Sentinel to be default-constructible, which is unnecessarily strict in below's context and could
+// break legacy code, for which it holds that Sentinel equals Iterator.
inline
future<>
-max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Func&& func) noexcept {
+max_concurrent_for_each(Iterator begin, Sentinel end, size_t max_concurrent, Func&& func) noexcept {
struct state {
Iterator begin;
- Iterator end;
+ Sentinel end;
Func func;
size_t max_concurrent;
semaphore sem;
std::exception_ptr err;
- state(Iterator begin_, Iterator end_, size_t max_concurrent_, Func func_)
+ state(Iterator begin_, Sentinel end_, size_t max_concurrent_, Func func_)
: begin(std::move(begin_))
, end(std::move(end_))
, func(std::move(func_))
return s.sem.wait().then([&s] () mutable noexcept {
// Possibly run in background and signal _sem when the task is done.
// The background tasks are waited on using _sem.
- (void)futurize_invoke(s.func, *s.begin++).then_wrapped([&s] (future<> fut) {
+ (void)futurize_invoke(s.func, *s.begin).then_wrapped([&s] (future<> fut) {
if (fut.failed()) {
auto e = fut.get_exception();;
if (!s.err) {
}
s.sem.signal();
});
+ ++s.begin;
});
}).then([&s] {
// Wait for any background task to finish
/// \return a \c future<> that resolves when all the function invocations
/// complete. If one or more return an exception, the return value
/// contains one of the exceptions.
+/// \note max_concurrent_for_each() schedules all invocations of \c func on the
+/// current shard. If you want to run a function on all shards in parallel,
+/// have a look at \ref smp::invoke_on_all() instead.
template <typename Range, typename Func>
-SEASTAR_CONCEPT( requires std::ranges::range<Range> && requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
+SEASTAR_CONCEPT( requires requires (Func f, Range r) {
+ { f(*std::begin(r)) } -> std::same_as<future<>>;
+ std::end(r);
+} )
inline
future<>
max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {