X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fseastar%2Finclude%2Fseastar%2Fcoroutine%2Fall.hh;fp=ceph%2Fsrc%2Fseastar%2Finclude%2Fseastar%2Fcoroutine%2Fall.hh;h=26b33345b331b688d9bcf352589653fc31d50f5b;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=0000000000000000000000000000000000000000;hpb=a71831dadd1e1f3e0fa70405511f65cc33db0498;p=ceph.git diff --git a/ceph/src/seastar/include/seastar/coroutine/all.hh b/ceph/src/seastar/include/seastar/coroutine/all.hh new file mode 100644 index 000000000..26b33345b --- /dev/null +++ b/ceph/src/seastar/include/seastar/coroutine/all.hh @@ -0,0 +1,205 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2021-present ScyllaDB + */ + +#pragma once + +#include +#include +#include +#include + +namespace seastar::coroutine { + +template +constexpr inline bool is_future_v = is_future::value; + +template +concept future_type = is_future_v; + +namespace internal { + +// Given a bunch of futures, find the indexes of the ones that are not avoid +// and store them in member type `type` as an std::integer_sequence. +// +// `IndexSequence` and `current` are intermediates used for recursion. +template +struct index_sequence_for_non_void_futures_helper; + +// Terminate recursion be returning the accumulated `IndexSequence` +template +struct index_sequence_for_non_void_futures_helper { + using type = IndexSequence; +}; + +// Process a future by adding it to the current IndexSequence and recursing +template +struct index_sequence_for_non_void_futures_helper, current, future, Futures...> { + using type = typename index_sequence_for_non_void_futures_helper, current + 1, Futures...>::type; +}; + +// Process a future by ignoring it and recursing +template +struct index_sequence_for_non_void_futures_helper, current, future<>, Futures...> { + using type = typename index_sequence_for_non_void_futures_helper, current + 1, Futures...>::type; +}; + +// Simple interface for the above. +template +using index_sequence_for_non_void_futures = typename index_sequence_for_non_void_futures_helper, 0, Futures...>::type; + +// Given a tuple of futures, return a tuple of the value types, excluding future. +template +struct value_tuple_for_non_void_futures_helper; + +template +struct value_tuple_for_non_void_futures_helper, FutureTuple> { + using type = std::tuple::value_type...>; +}; + +// Simple interface for the above +template +using value_tuple_for_non_void_futures = typename value_tuple_for_non_void_futures_helper, std::tuple>::type; + +} + +/// Wait for serveral futures to complete in a coroutine. +/// +/// `all` can be used to launch several computations concurrently +/// and wait for all of them to complete. Computations are provided +/// as callable objects (typically lambda coroutines) that are invoked +/// by `all`. Waiting is performend by `co_await` and returns a tuple +/// of values, one for each non-void future. +/// +/// If one or more of the function objects throws an exception, or if one +/// or more of the futures resolves to an exception, then the exception is +/// thrown. All of the futures are waited for, even in the case of exceptions. +/// If more than one exception is present, an arbitrary one is thrown. +/// +/// Example +/// +/// ``` +/// future add() { +/// auto [a, b] = co_await all( +/// [] () -> future { +/// co_await sleep(1ms); +/// co_return 2; +/// }, +/// [] () -> future { +/// co_await sleep(1ms); +/// co_return 3; +/// } +/// ); +/// co_return a + b; +/// }; +/// ``` +template +class [[nodiscard("must co_await an all() object")]] all { + using tuple = std::tuple; + using value_tuple = typename internal::value_tuple_for_non_void_futures; + struct awaiter; + template + struct intermediate_task final : continuation_base_from_future_t> { + awaiter& container; + explicit intermediate_task(awaiter& container) : container(container) {} + virtual void run_and_dispose() noexcept { + using value_type = typename std::tuple_element_t::value_type; + if (__builtin_expect(this->_state.failed(), false)) { + using futurator = futurize>; + std::get(container.state._futures) = futurator::make_exception_future(std::move(this->_state).get_exception()); + } else { + if constexpr (std::same_as, future<>>) { + std::get(container.state._futures) = make_ready_future<>(); + } else { + std::get(container.state._futures) = make_ready_future(std::move(this->_state).get0()); + } + } + this->~intermediate_task(); + container.template process(); + } + }; + template + struct generate_aligned_union; + template + struct generate_aligned_union> { + using type = std::aligned_union_t<1, intermediate_task...>; + }; + using continuation_storage_t = typename generate_aligned_union>>::type; + using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle; +private: + tuple _futures; +private: + struct awaiter { + all& state; + continuation_storage_t _continuation_storage; + coroutine_handle_t when_ready; + awaiter(all& state) : state(state) {} + bool await_ready() const { + return std::apply([] (const Futures&... futures) { + return (... && futures.available()); + }, state._futures); + } + void await_suspend(coroutine_handle_t h) { + when_ready = h; + process<0>(); + } + value_tuple await_resume() { + std::apply([] (Futures&... futures) { + std::exception_ptr e; + // Call get_exception for every failed future, to avoid exceptional future + // ignored warnings. + (void)(..., (futures.failed() ? (e = futures.get_exception(), 0) : 0)); + if (e) { + std::rethrow_exception(std::move(e)); + } + }, state._futures); + // This immediately-invoked lambda is used to materialize the indexes + // of non-void futures in the tuple. + return [&] (std::integer_sequence) { + return value_tuple(std::get(state._futures).get0()...); + } (internal::index_sequence_for_non_void_futures()); + } + template + void process() { + if constexpr (idx == sizeof...(Futures)) { + when_ready.resume(); + } else { + if (!std::get(state._futures).available()) { + auto task = new (&_continuation_storage) intermediate_task(*this); + seastar::internal::set_callback(std::get(state._futures), task); + } else { + process(); + } + } + } + }; +public: + template + requires (... && std::invocable) && (... && future_type>) + explicit all(Func&&... funcs) + : _futures(futurize_invoke(funcs)...) { + } + awaiter operator co_await() { return awaiter{*this}; } +}; + +template +explicit all(Func&&... funcs) -> all...>; + +} \ No newline at end of file