]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * author: Niek J Bouman | |
1e59de90 | 21 | * reviewers: Avi Kivity, Benny Halevy |
20effc67 TL |
22 | * November 2021 |
23 | */ | |
24 | ||
25 | #pragma once | |
26 | ||
27 | #include <iterator> | |
28 | #include <cstddef> | |
29 | #include <type_traits> | |
30 | #include <vector> | |
1e59de90 TL |
31 | #include <tuple> |
32 | #include <utility> | |
20effc67 TL |
33 | #include <seastar/core/future.hh> |
34 | #include <seastar/core/shared_ptr.hh> | |
35 | ||
36 | namespace seastar { | |
37 | ||
/// Result type produced by when_any().
///
/// Bundles all the futures that were handed to when_any() with the
/// position of one of them that has resolved.
///
/// \tparam Sequence container holding the futures; within this file it is
///         a std::vector (iterator overload) or a std::tuple (variadic overload).
template <class Sequence>
struct when_any_result {
    /// Position, within \c futures, of a future that has resolved.
    /// Defaults to 0 so that a default-constructed result never carries an
    /// indeterminate value around when it is moved or copied.
    std::size_t index = 0;
    /// The futures that were passed to when_any().
    Sequence futures;
};
43 | ||
44 | namespace internal { | |
45 | class waiter { | |
46 | bool _done = false; | |
47 | promise<std::size_t> _promise; | |
48 | ||
49 | public: | |
50 | void done(std::size_t index) { | |
51 | if (!_done) { | |
52 | _done = true; | |
53 | _promise.set_value(index); | |
54 | } | |
55 | } | |
56 | auto get_future() { return _promise.get_future(); } | |
57 | }; | |
58 | ||
59 | } // namespace internal | |
60 | ||
61 | /// Wait for the first of multiple futures to complete (iterator version). | |
62 | /// | |
63 | /// Given a range of futures as input, wait for the first of them | |
64 | /// to resolve (either successfully or with an exception), and return | |
65 | /// all of them in a \c when_any_result (following the concurrency TS from | |
66 | /// the standard library), containing a std::vector to all futures | |
67 | /// and the index (into the vector) of the future that resolved. | |
68 | /// | |
69 | /// \param begin an \c InputIterator designating the beginning of the range of futures | |
70 | /// \param end an \c InputIterator designating the end of the range of futures | |
71 | /// \return a \c when_any_result of all the futures in the input; when | |
72 | /// ready, at least one of the contained futures (the one indicated by index) will be ready. | |
73 | template <class FutureIterator> | |
74 | SEASTAR_CONCEPT( requires requires (FutureIterator i) { { *i++ }; requires is_future<std::remove_reference_t<decltype(*i)>>::value; } ) | |
75 | auto when_any(FutureIterator begin, FutureIterator end) noexcept | |
76 | -> future<when_any_result<std::vector<std::decay_t<typename std::iterator_traits<FutureIterator>::value_type>>>> | |
77 | { | |
78 | using ReturnType = when_any_result<std::vector<typename std::iterator_traits<FutureIterator>::value_type>>; | |
79 | if (begin == end) { | |
80 | return make_ready_future<ReturnType>(); | |
81 | } | |
82 | ReturnType result; | |
83 | result.futures.reserve(std::distance(begin, end)); | |
84 | auto waiter_obj = make_lw_shared<internal::waiter>(); | |
85 | std::size_t index{0}; | |
86 | for (auto it = begin; it != end; ++it) { | |
87 | if (it->available()) { | |
88 | result.futures.push_back(std::move(*it)); | |
89 | waiter_obj->done(index); | |
90 | } else { | |
91 | result.futures.push_back(it->finally([waiter_obj, index] { | |
92 | waiter_obj->done(index); | |
93 | })); | |
94 | } | |
95 | index++; | |
96 | } | |
97 | return waiter_obj->get_future().then( | |
98 | [result = std::move(result)](std::size_t index) mutable { | |
99 | result.index = index; | |
100 | return std::move(result); | |
101 | } | |
102 | ); | |
103 | } | |
1e59de90 TL |
104 | |
105 | namespace internal { | |
106 | ||
107 | template <class... Futures, std::size_t... I> | |
108 | future<when_any_result<std::tuple<Futures...>>> | |
109 | when_any_impl(std::index_sequence<I...>, Futures&&... futs) noexcept | |
110 | { | |
111 | auto waiter_obj = make_lw_shared<waiter>(); | |
112 | auto attach_notifier = [&](auto&& fut, size_t index) { | |
113 | if (fut.available()) { | |
114 | waiter_obj->done(index); | |
115 | return std::move(fut); | |
116 | } | |
117 | else { | |
118 | return fut.finally([waiter_obj, index] { waiter_obj->done(index); }); | |
119 | } | |
120 | }; | |
121 | ||
122 | auto result = | |
123 | when_any_result<std::tuple<Futures...>>{0, std::make_tuple(attach_notifier(std::forward<Futures>(futs), I)...)}; | |
124 | return waiter_obj->get_future().then([result = std::move(result)](std::size_t index) mutable { | |
125 | result.index = index; | |
126 | return std::move(result); | |
127 | }); | |
128 | } | |
129 | ||
130 | } // namespace internal | |
131 | ||
132 | /// Wait for the first of multiple futures to complete (variadic version). | |
133 | /// | |
134 | /// Each future can be passed directly, or a function that returns a | |
135 | /// future can be given instead. | |
136 | /// | |
137 | /// Returns a \c when_any_result (following the concurrency TS from | |
138 | /// the standard library), containing a std::tuple to all futures | |
139 | /// and the index (into the vector) of the future that resolved. | |
140 | /// | |
141 | /// \param fut_or_funcs futures or functions that return futures | |
142 | /// \return a \c when_any_result containing a tuple of all futures | |
143 | /// and and index; when ready, at least one of the contained futures | |
144 | /// (the one indicated by index) will be ready. | |
145 | template <class... FutOrFuncs> | |
146 | auto when_any(FutOrFuncs&&... fut_or_funcs) noexcept | |
147 | { | |
148 | return internal::when_any_impl(std::make_index_sequence<sizeof...(FutOrFuncs)>{}, | |
149 | futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...); | |
150 | } | |
151 | ||
20effc67 | 152 | } // namespace seastar |