1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
6 #include <seastar/core/future.hh>
8 #include "crimson/common/errorator.h"
12 template <class... AllowedErrors
>
13 class parallel_for_each_state final
: private seastar::continuation_base
<> {
14 using future_t
= typename errorator
<AllowedErrors
...>::template future
<>;
15 std::vector
<future_t
> _incomplete
;
16 seastar::promise
<> _result
;
17 std::exception_ptr _ex
;
19 void wait_for_one() noexcept
{
20 while (!_incomplete
.empty() && _incomplete
.back().available()) {
21 if (_incomplete
.back().failed()) {
22 _ex
= _incomplete
.back().get_exception();
24 _incomplete
.pop_back();
26 if (!_incomplete
.empty()) {
27 seastar::internal::set_callback(std::move(_incomplete
.back()),
28 static_cast<continuation_base
<>*>(this));
29 _incomplete
.pop_back();
32 if (__builtin_expect(bool(_ex
), false)) {
33 _result
.set_exception(std::move(_ex
));
39 virtual void run_and_dispose() noexcept override
{
40 if (_state
.failed()) {
41 _ex
= std::move(_state
).get_exception();
46 task
* waiting_task() noexcept override
{ return _result
.waiting_task(); }
48 parallel_for_each_state(size_t n
) {
49 _incomplete
.reserve(n
);
51 void add_future(future_t
&& f
) {
52 _incomplete
.push_back(std::move(f
));
54 future_t
get_future() {
55 auto ret
= _result
.get_future();
61 template <typename Iterator
, typename Func
, typename
... AllowedErrors
>
62 static inline typename errorator
<AllowedErrors
...>::template future
<>
63 parallel_for_each(Iterator first
, Iterator last
, Func
&& func
) noexcept
{
64 parallel_for_each_state
<AllowedErrors
...>* s
= nullptr;
65 // Process all elements, giving each future the following treatment:
66 // - available, not failed: do nothing
67 // - available, failed: collect exception in ex
68 // - not available: collect in s (allocating it if needed)
69 for (;first
!= last
; ++first
) {
70 auto f
= seastar::futurize_invoke(std::forward
<Func
>(func
), *first
);
71 if (!f
.available() || f
.failed()) {
73 using itraits
= std::iterator_traits
<Iterator
>;
74 auto n
= (seastar::internal::iterator_range_estimate_vector_capacity(
75 first
, last
, typename
itraits::iterator_category()) + 1);
76 s
= new parallel_for_each_state
<AllowedErrors
...>(n
);
78 s
->add_future(std::move(f
));
81 // If any futures were not available, hand off to parallel_for_each_state::start().
82 // Otherwise we can return a result immediately.
84 // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
85 // so this isn't a leak
86 return s
->get_future();
88 return seastar::make_ready_future
<>();
91 } // namespace crimson