]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/when_any.hh
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / seastar / include / seastar / core / when_any.hh
CommitLineData
20effc67
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19/*
20 * author: Niek J Bouman
1e59de90 21 * reviewers: Avi Kivity, Benny Halevy
20effc67
TL
22 * November 2021
23 */
24
25#pragma once
26
27#include <iterator>
28#include <cstddef>
29#include <type_traits>
30#include <vector>
1e59de90
TL
31#include <tuple>
32#include <utility>
20effc67
TL
33#include <seastar/core/future.hh>
34#include <seastar/core/shared_ptr.hh>
35
36namespace seastar {
37
38template <class Sequence>
39struct when_any_result {
40 std::size_t index;
41 Sequence futures;
42};
43
44namespace internal {
45class waiter {
46 bool _done = false;
47 promise<std::size_t> _promise;
48
49public:
50 void done(std::size_t index) {
51 if (!_done) {
52 _done = true;
53 _promise.set_value(index);
54 }
55 }
56 auto get_future() { return _promise.get_future(); }
57};
58
59} // namespace internal
60
61/// Wait for the first of multiple futures to complete (iterator version).
62///
63/// Given a range of futures as input, wait for the first of them
64/// to resolve (either successfully or with an exception), and return
65/// all of them in a \c when_any_result (following the concurrency TS from
66/// the standard library), containing a std::vector to all futures
67/// and the index (into the vector) of the future that resolved.
68///
69/// \param begin an \c InputIterator designating the beginning of the range of futures
70/// \param end an \c InputIterator designating the end of the range of futures
71/// \return a \c when_any_result of all the futures in the input; when
72/// ready, at least one of the contained futures (the one indicated by index) will be ready.
73template <class FutureIterator>
74SEASTAR_CONCEPT( requires requires (FutureIterator i) { { *i++ }; requires is_future<std::remove_reference_t<decltype(*i)>>::value; } )
75auto when_any(FutureIterator begin, FutureIterator end) noexcept
76 -> future<when_any_result<std::vector<std::decay_t<typename std::iterator_traits<FutureIterator>::value_type>>>>
77{
78 using ReturnType = when_any_result<std::vector<typename std::iterator_traits<FutureIterator>::value_type>>;
79 if (begin == end) {
80 return make_ready_future<ReturnType>();
81 }
82 ReturnType result;
83 result.futures.reserve(std::distance(begin, end));
84 auto waiter_obj = make_lw_shared<internal::waiter>();
85 std::size_t index{0};
86 for (auto it = begin; it != end; ++it) {
87 if (it->available()) {
88 result.futures.push_back(std::move(*it));
89 waiter_obj->done(index);
90 } else {
91 result.futures.push_back(it->finally([waiter_obj, index] {
92 waiter_obj->done(index);
93 }));
94 }
95 index++;
96 }
97 return waiter_obj->get_future().then(
98 [result = std::move(result)](std::size_t index) mutable {
99 result.index = index;
100 return std::move(result);
101 }
102 );
103}
1e59de90
TL
104
105namespace internal {
106
107template <class... Futures, std::size_t... I>
108future<when_any_result<std::tuple<Futures...>>>
109when_any_impl(std::index_sequence<I...>, Futures&&... futs) noexcept
110{
111 auto waiter_obj = make_lw_shared<waiter>();
112 auto attach_notifier = [&](auto&& fut, size_t index) {
113 if (fut.available()) {
114 waiter_obj->done(index);
115 return std::move(fut);
116 }
117 else {
118 return fut.finally([waiter_obj, index] { waiter_obj->done(index); });
119 }
120 };
121
122 auto result =
123 when_any_result<std::tuple<Futures...>>{0, std::make_tuple(attach_notifier(std::forward<Futures>(futs), I)...)};
124 return waiter_obj->get_future().then([result = std::move(result)](std::size_t index) mutable {
125 result.index = index;
126 return std::move(result);
127 });
128}
129
130} // namespace internal
131
132/// Wait for the first of multiple futures to complete (variadic version).
133///
134/// Each future can be passed directly, or a function that returns a
135/// future can be given instead.
136///
137/// Returns a \c when_any_result (following the concurrency TS from
138/// the standard library), containing a std::tuple to all futures
139/// and the index (into the vector) of the future that resolved.
140///
141/// \param fut_or_funcs futures or functions that return futures
142/// \return a \c when_any_result containing a tuple of all futures
143/// and and index; when ready, at least one of the contained futures
144/// (the one indicated by index) will be ready.
145template <class... FutOrFuncs>
146auto when_any(FutOrFuncs&&... fut_or_funcs) noexcept
147{
148 return internal::when_any_impl(std::make_index_sequence<sizeof...(FutOrFuncs)>{},
149 futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
150}
151
20effc67 152} // namespace seastar