X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fboost%2Fboost%2Fasio%2Fexperimental%2Fimpl%2Fparallel_group.hpp;fp=ceph%2Fsrc%2Fboost%2Fboost%2Fasio%2Fexperimental%2Fimpl%2Fparallel_group.hpp;h=668a00e4ca7f6b35ade9b535209dfa3408e5935c;hb=1e59de90020f1d8d374046ef9cca56ccd4e806e2;hp=0000000000000000000000000000000000000000;hpb=bd41e436e25044e8e83156060a37c23cb661c364;p=ceph.git diff --git a/ceph/src/boost/boost/asio/experimental/impl/parallel_group.hpp b/ceph/src/boost/boost/asio/experimental/impl/parallel_group.hpp new file mode 100644 index 000000000..668a00e4c --- /dev/null +++ b/ceph/src/boost/boost/asio/experimental/impl/parallel_group.hpp @@ -0,0 +1,435 @@ +// +// experimental/impl/parallel_group.hpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP +#define BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +# pragma once +#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace boost { +namespace asio { +namespace experimental { +namespace detail { + +// Stores the result from an individual asynchronous operation. +template +struct parallel_group_op_result +{ +public: + parallel_group_op_result() + : has_value_(false) + { + } + + parallel_group_op_result(parallel_group_op_result&& other) + : has_value_(other.has_value_) + { + if (has_value_) + new (&u_.value_) T(std::move(other.get())); + } + + ~parallel_group_op_result() + { + if (has_value_) + u_.value_.~T(); + } + + T& get() noexcept + { + return u_.value_; + } + + template + void emplace(Args&&... args) + { + new (&u_.value_) T(std::forward(args)...); + has_value_ = true; + } + +private: + union u + { + u() {} + ~u() {} + char c_; + T value_; + } u_; + bool has_value_; +}; + +// Proxy completion handler for the group of parallel operatations. Unpacks and +// concatenates the individual operations' results, and invokes the user's +// completion handler. +template +struct parallel_group_completion_handler +{ + typedef typename decay< + typename prefer_result< + typename associated_executor::type, + execution::outstanding_work_t::tracked_t + >::type + >::type executor_type; + + parallel_group_completion_handler(Handler&& h) + : handler_(std::move(h)), + executor_( + boost::asio::prefer( + boost::asio::get_associated_executor(handler_), + execution::outstanding_work.tracked)) + { + } + + executor_type get_executor() const noexcept + { + return executor_; + } + + void operator()() + { + this->invoke(std::make_index_sequence()); + } + + template + void invoke(std::index_sequence) + { + this->invoke(std::tuple_cat(std::move(std::get(args_).get())...)); + } + + template + void invoke(std::tuple&& args) + { + this->invoke(std::move(args), std::make_index_sequence()); + } + + template + void invoke(std::tuple&& args, std::index_sequence) + { + std::move(handler_)(completion_order_, std::move(std::get(args))...); + } + + Handler handler_; + executor_type executor_; + std::array completion_order_{}; + std::tuple< + parallel_group_op_result< + typename parallel_op_signature_as_tuple< + typename parallel_op_signature::type + >::type + >... + > args_{}; +}; + +// Shared state for the parallel group. +template +struct parallel_group_state +{ + parallel_group_state(Condition&& c, Handler&& h) + : cancellation_condition_(std::move(c)), + handler_(std::move(h)) + { + } + + // The number of operations that have completed so far. Used to determine the + // order of completion. + std::atomic completed_{0}; + + // The non-none cancellation type that resulted from a cancellation condition. + // Stored here for use by the group's initiating function. + std::atomic cancel_type_{cancellation_type::none}; + + // The number of cancellations that have been requested, either on completion + // of the operations within the group, or via the cancellation slot for the + // group operation. Initially set to the number of operations to prevent + // cancellation signals from being emitted until after all of the group's + // operations' initiating functions have completed. + std::atomic cancellations_requested_{sizeof...(Ops)}; + + // The number of operations that are yet to complete. Used to determine when + // it is safe to invoke the user's completion handler. + std::atomic outstanding_{sizeof...(Ops)}; + + // The cancellation signals for each operation in the group. + boost::asio::cancellation_signal cancellation_signals_[sizeof...(Ops)]; + + // The cancellation condition is used to determine whether the results from an + // individual operation warrant a cancellation request for the whole group. + Condition cancellation_condition_; + + // The proxy handler to be invoked once all operations in the group complete. + parallel_group_completion_handler handler_; +}; + +// Handler for an individual operation within the parallel group. +template +struct parallel_group_op_handler +{ + typedef boost::asio::cancellation_slot cancellation_slot_type; + + parallel_group_op_handler( + std::shared_ptr > state) + : state_(std::move(state)) + { + } + + cancellation_slot_type get_cancellation_slot() const noexcept + { + return state_->cancellation_signals_[I].slot(); + } + + template + void operator()(Args... args) + { + // Capture this operation into the completion order. + state_->handler_.completion_order_[state_->completed_++] = I; + + // Determine whether the results of this operation require cancellation of + // the whole group. + cancellation_type_t cancel_type = state_->cancellation_condition_(args...); + + // Capture the result of the operation into the proxy completion handler. + std::get(state_->handler_.args_).emplace(std::move(args)...); + + if (cancel_type != cancellation_type::none) + { + // Save the type for potential use by the group's initiating function. + state_->cancel_type_ = cancel_type; + + // If we are the first operation to request cancellation, emit a signal + // for each operation in the group. + if (state_->cancellations_requested_++ == 0) + for (std::size_t i = 0; i < sizeof...(Ops); ++i) + if (i != I) + state_->cancellation_signals_[i].emit(cancel_type); + } + + // If this is the last outstanding operation, invoke the user's handler. + if (--state_->outstanding_ == 0) + boost::asio::dispatch(std::move(state_->handler_)); + } + + std::shared_ptr > state_; +}; + +// Handler for an individual operation within the parallel group that has an +// explicitly specified executor. +template +struct parallel_group_op_handler_with_executor : + parallel_group_op_handler +{ + typedef parallel_group_op_handler base_type; + typedef boost::asio::cancellation_slot cancellation_slot_type; + typedef Executor executor_type; + + parallel_group_op_handler_with_executor( + std::shared_ptr > state, + executor_type ex) + : parallel_group_op_handler(std::move(state)) + { + cancel_proxy_ = + &this->state_->cancellation_signals_[I].slot().template + emplace(this->state_, std::move(ex)); + } + + cancellation_slot_type get_cancellation_slot() const noexcept + { + return cancel_proxy_->signal_.slot(); + } + + executor_type get_executor() const noexcept + { + return cancel_proxy_->executor_; + } + + // Proxy handler that forwards the emitted signal to the correct executor. + struct cancel_proxy + { + cancel_proxy( + std::shared_ptr > state, + executor_type ex) + : state_(std::move(state)), + executor_(std::move(ex)) + { + } + + void operator()(cancellation_type_t type) + { + if (auto state = state_.lock()) + { + boost::asio::cancellation_signal* sig = &signal_; + boost::asio::dispatch(executor_, + [state, sig, type]{ sig->emit(type); }); + } + } + + std::weak_ptr > state_; + boost::asio::cancellation_signal signal_; + executor_type executor_; + }; + + cancel_proxy* cancel_proxy_; +}; + +// Helper to launch an operation using the correct executor, if any. +template +struct parallel_group_op_launcher +{ + template + static void launch(Op& op, + const std::shared_ptr >& state) + { + typedef typename associated_executor::type ex_type; + ex_type ex = boost::asio::get_associated_executor(op); + std::move(op)( + parallel_group_op_handler_with_executor(state, std::move(ex))); + } +}; + +// Specialised launcher for operations that specify no executor. +template +struct parallel_group_op_launcher::asio_associated_executor_is_unspecialised, + void + >::value + >::type> +{ + template + static void launch(Op& op, + const std::shared_ptr >& state) + { + std::move(op)( + parallel_group_op_handler(state)); + } +}; + +template +struct parallel_group_cancellation_handler +{ + parallel_group_cancellation_handler( + std::shared_ptr > state) + : state_(std::move(state)) + { + } + + void operator()(cancellation_type_t cancel_type) + { + // If we are the first place to request cancellation, i.e. no operation has + // yet completed and requested cancellation, emit a signal for each + // operation in the group. + if (cancel_type != cancellation_type::none) + if (auto state = state_.lock()) + if (state->cancellations_requested_++ == 0) + for (std::size_t i = 0; i < sizeof...(Ops); ++i) + state->cancellation_signals_[i].emit(cancel_type); + } + + std::weak_ptr > state_; +}; + +template +void parallel_group_launch(Condition cancellation_condition, Handler handler, + std::tuple& ops, std::index_sequence) +{ + // Get the user's completion handler's cancellation slot, so that we can allow + // cancellation of the entire group. + typename associated_cancellation_slot::type slot + = boost::asio::get_associated_cancellation_slot(handler); + + // Create the shared state for the operation. + typedef parallel_group_state state_type; + std::shared_ptr state = std::allocate_shared( + boost::asio::detail::recycling_allocator(), + std::move(cancellation_condition), std::move(handler)); + + // Initiate each individual operation in the group. + int fold[] = { 0, + ( parallel_group_op_launcher::launch(std::get(ops), state), + 0 )... + }; + (void)fold; + + // Check if any of the operations has already requested cancellation, and if + // so, emit a signal for each operation in the group. + if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0) + for (auto& signal : state->cancellation_signals_) + signal.emit(state->cancel_type_); + + // Register a handler with the user's completion handler's cancellation slot. + if (slot.is_connected()) + slot.template emplace< + parallel_group_cancellation_handler< + Condition, Handler, Ops...> >(state); +} + +} // namespace detail +} // namespace experimental + +template +class async_result< + experimental::detail::parallel_op_signature_probe, + R(Args...)> +{ +public: + typedef experimental::detail::parallel_op_signature_probe_result< + void(Args...)> return_type; + + template + static return_type initiate(Initiation&&, + experimental::detail::parallel_op_signature_probe, InitArgs&&...) + { + return return_type{}; + } +}; + +template