]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/coroutine/all.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / coroutine / all.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2021-present ScyllaDB
20 */
21
22 #pragma once
23
24 #include <concepts>
25 #include <type_traits>
26 #include <tuple>
27 #include <seastar/core/coroutine.hh>
28
29 namespace seastar::coroutine {
30
31 template <typename Future>
32 constexpr inline bool is_future_v = is_future<Future>::value;
33
34 template <typename Future>
35 concept future_type = is_future_v<Future>;
36
37 namespace internal {
38
39 // Given a bunch of futures, find the indexes of the ones that are not avoid
40 // and store them in member type `type` as an std::integer_sequence.
41 //
42 // `IndexSequence` and `current` are intermediates used for recursion.
43 template <typename IndexSequence, size_t current, typename... Futures>
44 struct index_sequence_for_non_void_futures_helper;
45
46 // Terminate recursion be returning the accumulated `IndexSequence`
47 template <typename IndexSequence, size_t current>
48 struct index_sequence_for_non_void_futures_helper<IndexSequence, current> {
49 using type = IndexSequence;
50 };
51
52 // Process a future<T> by adding it to the current IndexSequence and recursing
53 template <size_t... Existing, size_t current, typename T, typename... Futures>
54 struct index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t, Existing...>, current, future<T>, Futures...> {
55 using type = typename index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t, Existing..., current>, current + 1, Futures...>::type;
56 };
57
58 // Process a future<void> by ignoring it and recursing
59 template <size_t... Existing, size_t current, typename... Futures>
60 struct index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t, Existing...>, current, future<>, Futures...> {
61 using type = typename index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t, Existing...>, current + 1, Futures...>::type;
62 };
63
64 // Simple interface for the above.
65 template <typename... Futures>
66 using index_sequence_for_non_void_futures = typename index_sequence_for_non_void_futures_helper<std::integer_sequence<size_t>, 0, Futures...>::type;
67
68 // Given a tuple of futures, return a tuple of the value types, excluding future<void>.
69 template <typename IndexSequence, typename FutureTuple>
70 struct value_tuple_for_non_void_futures_helper;
71
72 template <size_t... Idx, typename FutureTuple>
73 struct value_tuple_for_non_void_futures_helper<std::integer_sequence<size_t, Idx...>, FutureTuple> {
74 using type = std::tuple<typename std::tuple_element_t<Idx, FutureTuple>::value_type...>;
75 };
76
77 // Simple interface for the above
78 template <typename... Futures>
79 using value_tuple_for_non_void_futures = typename value_tuple_for_non_void_futures_helper<index_sequence_for_non_void_futures<Futures...>, std::tuple<Futures...>>::type;
80
81 }
82
83 /// Wait for serveral futures to complete in a coroutine.
84 ///
85 /// `all` can be used to launch several computations concurrently
86 /// and wait for all of them to complete. Computations are provided
87 /// as callable objects (typically lambda coroutines) that are invoked
88 /// by `all`. Waiting is performend by `co_await` and returns a tuple
89 /// of values, one for each non-void future.
90 ///
91 /// If one or more of the function objects throws an exception, or if one
92 /// or more of the futures resolves to an exception, then the exception is
93 /// thrown. All of the futures are waited for, even in the case of exceptions.
94 /// If more than one exception is present, an arbitrary one is thrown.
95 ///
96 /// Example
97 ///
98 /// ```
99 /// future<int> add() {
100 /// auto [a, b] = co_await all(
101 /// [] () -> future<int> {
102 /// co_await sleep(1ms);
103 /// co_return 2;
104 /// },
105 /// [] () -> future<int> {
106 /// co_await sleep(1ms);
107 /// co_return 3;
108 /// }
109 /// );
110 /// co_return a + b;
111 /// };
112 /// ```
113 template <typename... Futures>
114 class [[nodiscard("must co_await an all() object")]] all {
115 using tuple = std::tuple<Futures...>;
116 using value_tuple = typename internal::value_tuple_for_non_void_futures<Futures...>;
117 struct awaiter;
118 template <size_t idx>
119 struct intermediate_task final : continuation_base_from_future_t<std::tuple_element_t<idx, tuple>> {
120 awaiter& container;
121 explicit intermediate_task(awaiter& container) : container(container) {}
122 virtual void run_and_dispose() noexcept {
123 using value_type = typename std::tuple_element_t<idx, tuple>::value_type;
124 if (__builtin_expect(this->_state.failed(), false)) {
125 using futurator = futurize<std::tuple_element_t<idx, tuple>>;
126 std::get<idx>(container.state._futures) = futurator::make_exception_future(std::move(this->_state).get_exception());
127 } else {
128 if constexpr (std::same_as<std::tuple_element_t<idx, tuple>, future<>>) {
129 std::get<idx>(container.state._futures) = make_ready_future<>();
130 } else {
131 std::get<idx>(container.state._futures) = make_ready_future<value_type>(std::move(this->_state).get0());
132 }
133 }
134 this->~intermediate_task();
135 container.template process<idx+1>();
136 }
137 };
138 template <typename IndexSequence>
139 struct generate_aligned_union;
140 template <size_t... idx>
141 struct generate_aligned_union<std::integer_sequence<size_t, idx...>> {
142 using type = std::aligned_union_t<1, intermediate_task<idx>...>;
143 };
144 using continuation_storage_t = typename generate_aligned_union<std::make_index_sequence<std::tuple_size_v<tuple>>>::type;
145 using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
146 private:
147 tuple _futures;
148 private:
149 struct awaiter {
150 all& state;
151 continuation_storage_t _continuation_storage;
152 coroutine_handle_t when_ready;
153 awaiter(all& state) : state(state) {}
154 bool await_ready() const {
155 return std::apply([] (const Futures&... futures) {
156 return (... && futures.available());
157 }, state._futures);
158 }
159 void await_suspend(coroutine_handle_t h) {
160 when_ready = h;
161 process<0>();
162 }
163 value_tuple await_resume() {
164 std::apply([] (Futures&... futures) {
165 std::exception_ptr e;
166 // Call get_exception for every failed future, to avoid exceptional future
167 // ignored warnings.
168 (void)(..., (futures.failed() ? (e = futures.get_exception(), 0) : 0));
169 if (e) {
170 std::rethrow_exception(std::move(e));
171 }
172 }, state._futures);
173 // This immediately-invoked lambda is used to materialize the indexes
174 // of non-void futures in the tuple.
175 return [&] <size_t... Idx> (std::integer_sequence<size_t, Idx...>) {
176 return value_tuple(std::get<Idx>(state._futures).get0()...);
177 } (internal::index_sequence_for_non_void_futures<Futures...>());
178 }
179 template <unsigned idx>
180 void process() {
181 if constexpr (idx == sizeof...(Futures)) {
182 when_ready.resume();
183 } else {
184 if (!std::get<idx>(state._futures).available()) {
185 auto task = new (&_continuation_storage) intermediate_task<idx>(*this);
186 seastar::internal::set_callback(std::get<idx>(state._futures), task);
187 } else {
188 process<idx + 1>();
189 }
190 }
191 }
192 };
193 public:
194 template <typename... Func>
195 requires (... && std::invocable<Func>) && (... && future_type<std::invoke_result_t<Func>>)
196 explicit all(Func&&... funcs)
197 : _futures(futurize_invoke(funcs)...) {
198 }
199 awaiter operator co_await() { return awaiter{*this}; }
200 };
201
202 template <typename... Func>
203 explicit all(Func&&... funcs) -> all<std::invoke_result_t<Func>...>;
204
205 }