]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/boost/asio/experimental/impl/parallel_group.hpp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / boost / asio / experimental / impl / parallel_group.hpp
diff --git a/ceph/src/boost/boost/asio/experimental/impl/parallel_group.hpp b/ceph/src/boost/boost/asio/experimental/impl/parallel_group.hpp
new file mode 100644 (file)
index 0000000..668a00e
--- /dev/null
@@ -0,0 +1,435 @@
+//
+// experimental/impl/parallel_group.hpp
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+//
+// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
+#define BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+# pragma once
+#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
+
+#include <boost/asio/detail/config.hpp>
+#include <atomic>
+#include <memory>
+#include <new>
+#include <tuple>
+#include <boost/asio/associated_cancellation_slot.hpp>
+#include <boost/asio/detail/recycling_allocator.hpp>
+#include <boost/asio/detail/type_traits.hpp>
+#include <boost/asio/dispatch.hpp>
+
+#include <boost/asio/detail/push_options.hpp>
+
+namespace boost {
+namespace asio {
+namespace experimental {
+namespace detail {
+
+// Stores the result from an individual asynchronous operation.
+template <typename T, typename = void>
+struct parallel_group_op_result
+{
+public:
+  parallel_group_op_result()
+    : has_value_(false)
+  {
+  }
+
+  parallel_group_op_result(parallel_group_op_result&& other)
+    : has_value_(other.has_value_)
+  {
+    if (has_value_)
+      new (&u_.value_) T(std::move(other.get()));
+  }
+
+  ~parallel_group_op_result()
+  {
+    if (has_value_)
+      u_.value_.~T();
+  }
+
+  T& get() noexcept
+  {
+    return u_.value_;
+  }
+
+  template <typename... Args>
+  void emplace(Args&&... args)
+  {
+    new (&u_.value_) T(std::forward<Args>(args)...);
+    has_value_ = true;
+  }
+
+private:
+  union u
+  {
+    u() {}
+    ~u() {}
+    char c_;
+    T value_;
+  } u_;
+  bool has_value_;
+};
+
+// Proxy completion handler for the group of parallel operatations. Unpacks and
+// concatenates the individual operations' results, and invokes the user's
+// completion handler.
+template <typename Handler, typename... Ops>
+struct parallel_group_completion_handler
+{
+  typedef typename decay<
+      typename prefer_result<
+        typename associated_executor<Handler>::type,
+        execution::outstanding_work_t::tracked_t
+      >::type
+    >::type executor_type;
+
+  parallel_group_completion_handler(Handler&& h)
+    : handler_(std::move(h)),
+      executor_(
+          boost::asio::prefer(
+            boost::asio::get_associated_executor(handler_),
+            execution::outstanding_work.tracked))
+  {
+  }
+
+  executor_type get_executor() const noexcept
+  {
+    return executor_;
+  }
+
+  void operator()()
+  {
+    this->invoke(std::make_index_sequence<sizeof...(Ops)>());
+  }
+
+  template <std::size_t... I>
+  void invoke(std::index_sequence<I...>)
+  {
+    this->invoke(std::tuple_cat(std::move(std::get<I>(args_).get())...));
+  }
+
+  template <typename... Args>
+  void invoke(std::tuple<Args...>&& args)
+  {
+    this->invoke(std::move(args), std::make_index_sequence<sizeof...(Args)>());
+  }
+
+  template <typename... Args, std::size_t... I>
+  void invoke(std::tuple<Args...>&& args, std::index_sequence<I...>)
+  {
+    std::move(handler_)(completion_order_, std::move(std::get<I>(args))...);
+  }
+
+  Handler handler_;
+  executor_type executor_;
+  std::array<std::size_t, sizeof...(Ops)> completion_order_{};
+  std::tuple<
+      parallel_group_op_result<
+        typename parallel_op_signature_as_tuple<
+          typename parallel_op_signature<Ops>::type
+        >::type
+      >...
+    > args_{};
+};
+
+// Shared state for the parallel group.
+template <typename Condition, typename Handler, typename... Ops>
+struct parallel_group_state
+{
+  parallel_group_state(Condition&& c, Handler&& h)
+    : cancellation_condition_(std::move(c)),
+      handler_(std::move(h))
+  {
+  }
+
+  // The number of operations that have completed so far. Used to determine the
+  // order of completion.
+  std::atomic<unsigned int> completed_{0};
+
+  // The non-none cancellation type that resulted from a cancellation condition.
+  // Stored here for use by the group's initiating function.
+  std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
+
+  // The number of cancellations that have been requested, either on completion
+  // of the operations within the group, or via the cancellation slot for the
+  // group operation. Initially set to the number of operations to prevent
+  // cancellation signals from being emitted until after all of the group's
+  // operations' initiating functions have completed.
+  std::atomic<unsigned int> cancellations_requested_{sizeof...(Ops)};
+
+  // The number of operations that are yet to complete. Used to determine when
+  // it is safe to invoke the user's completion handler.
+  std::atomic<unsigned int> outstanding_{sizeof...(Ops)};
+
+  // The cancellation signals for each operation in the group.
+  boost::asio::cancellation_signal cancellation_signals_[sizeof...(Ops)];
+
+  // The cancellation condition is used to determine whether the results from an
+  // individual operation warrant a cancellation request for the whole group.
+  Condition cancellation_condition_;
+
+  // The proxy handler to be invoked once all operations in the group complete.
+  parallel_group_completion_handler<Handler, Ops...> handler_;
+};
+
+// Handler for an individual operation within the parallel group.
+template <std::size_t I, typename Condition, typename Handler, typename... Ops>
+struct parallel_group_op_handler
+{
+  typedef boost::asio::cancellation_slot cancellation_slot_type;
+
+  parallel_group_op_handler(
+    std::shared_ptr<parallel_group_state<Condition, Handler, Ops...> > state)
+    : state_(std::move(state))
+  {
+  }
+
+  cancellation_slot_type get_cancellation_slot() const noexcept
+  {
+    return state_->cancellation_signals_[I].slot();
+  }
+
+  template <typename... Args>
+  void operator()(Args... args)
+  {
+    // Capture this operation into the completion order.
+    state_->handler_.completion_order_[state_->completed_++] = I;
+
+    // Determine whether the results of this operation require cancellation of
+    // the whole group.
+    cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
+
+    // Capture the result of the operation into the proxy completion handler.
+    std::get<I>(state_->handler_.args_).emplace(std::move(args)...);
+
+    if (cancel_type != cancellation_type::none)
+    {
+      // Save the type for potential use by the group's initiating function.
+      state_->cancel_type_ = cancel_type;
+
+      // If we are the first operation to request cancellation, emit a signal
+      // for each operation in the group.
+      if (state_->cancellations_requested_++ == 0)
+        for (std::size_t i = 0; i < sizeof...(Ops); ++i)
+          if (i != I)
+            state_->cancellation_signals_[i].emit(cancel_type);
+    }
+
+    // If this is the last outstanding operation, invoke the user's handler.
+    if (--state_->outstanding_ == 0)
+      boost::asio::dispatch(std::move(state_->handler_));
+  }
+
+  std::shared_ptr<parallel_group_state<Condition, Handler, Ops...> > state_;
+};
+
+// Handler for an individual operation within the parallel group that has an
+// explicitly specified executor.
+template <typename Executor, std::size_t I,
+    typename Condition, typename Handler, typename... Ops>
+struct parallel_group_op_handler_with_executor :
+  parallel_group_op_handler<I, Condition, Handler, Ops...>
+{
+  typedef parallel_group_op_handler<I, Condition, Handler, Ops...> base_type;
+  typedef boost::asio::cancellation_slot cancellation_slot_type;
+  typedef Executor executor_type;
+
+  parallel_group_op_handler_with_executor(
+      std::shared_ptr<parallel_group_state<Condition, Handler, Ops...> > state,
+      executor_type ex)
+    : parallel_group_op_handler<I, Condition, Handler, Ops...>(std::move(state))
+  {
+    cancel_proxy_ =
+      &this->state_->cancellation_signals_[I].slot().template
+        emplace<cancel_proxy>(this->state_, std::move(ex));
+  }
+
+  cancellation_slot_type get_cancellation_slot() const noexcept
+  {
+    return cancel_proxy_->signal_.slot();
+  }
+
+  executor_type get_executor() const noexcept
+  {
+    return cancel_proxy_->executor_;
+  }
+
+  // Proxy handler that forwards the emitted signal to the correct executor.
+  struct cancel_proxy
+  {
+    cancel_proxy(
+        std::shared_ptr<parallel_group_state<
+          Condition, Handler, Ops...> > state,
+        executor_type ex)
+      : state_(std::move(state)),
+        executor_(std::move(ex))
+    {
+    }
+
+    void operator()(cancellation_type_t type)
+    {
+      if (auto state = state_.lock())
+      {
+        boost::asio::cancellation_signal* sig = &signal_;
+        boost::asio::dispatch(executor_,
+            [state, sig, type]{ sig->emit(type); });
+      }
+    }
+
+    std::weak_ptr<parallel_group_state<Condition, Handler, Ops...> > state_;
+    boost::asio::cancellation_signal signal_;
+    executor_type executor_;
+  };
+
+  cancel_proxy* cancel_proxy_;
+};
+
+// Helper to launch an operation using the correct executor, if any.
+template <std::size_t I, typename Op, typename = void>
+struct parallel_group_op_launcher
+{
+  template <typename Condition, typename Handler, typename... Ops>
+  static void launch(Op& op,
+    const std::shared_ptr<parallel_group_state<
+      Condition, Handler, Ops...> >& state)
+  {
+    typedef typename associated_executor<Op>::type ex_type;
+    ex_type ex = boost::asio::get_associated_executor(op);
+    std::move(op)(
+        parallel_group_op_handler_with_executor<ex_type, I,
+          Condition, Handler, Ops...>(state, std::move(ex)));
+  }
+};
+
+// Specialised launcher for operations that specify no executor.
+template <std::size_t I, typename Op>
+struct parallel_group_op_launcher<I, Op,
+    typename enable_if<
+      is_same<
+        typename associated_executor<
+          Op>::asio_associated_executor_is_unspecialised,
+        void
+      >::value
+    >::type>
+{
+  template <typename Condition, typename Handler, typename... Ops>
+  static void launch(Op& op,
+    const std::shared_ptr<parallel_group_state<
+      Condition, Handler, Ops...> >& state)
+  {
+    std::move(op)(
+        parallel_group_op_handler<I, Condition, Handler, Ops...>(state));
+  }
+};
+
+template <typename Condition, typename Handler, typename... Ops>
+struct parallel_group_cancellation_handler
+{
+  parallel_group_cancellation_handler(
+    std::shared_ptr<parallel_group_state<Condition, Handler, Ops...> > state)
+    : state_(std::move(state))
+  {
+  }
+
+  void operator()(cancellation_type_t cancel_type)
+  {
+    // If we are the first place to request cancellation, i.e. no operation has
+    // yet completed and requested cancellation, emit a signal for each
+    // operation in the group.
+    if (cancel_type != cancellation_type::none)
+      if (auto state = state_.lock())
+        if (state->cancellations_requested_++ == 0)
+          for (std::size_t i = 0; i < sizeof...(Ops); ++i)
+            state->cancellation_signals_[i].emit(cancel_type);
+  }
+
+  std::weak_ptr<parallel_group_state<Condition, Handler, Ops...> > state_;
+};
+
+template <typename Condition, typename Handler,
+    typename... Ops, std::size_t... I>
+void parallel_group_launch(Condition cancellation_condition, Handler handler,
+    std::tuple<Ops...>& ops, std::index_sequence<I...>)
+{
+  // Get the user's completion handler's cancellation slot, so that we can allow
+  // cancellation of the entire group.
+  typename associated_cancellation_slot<Handler>::type slot
+    = boost::asio::get_associated_cancellation_slot(handler);
+
+  // Create the shared state for the operation.
+  typedef parallel_group_state<Condition, Handler, Ops...> state_type;
+  std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
+      boost::asio::detail::recycling_allocator<state_type,
+        boost::asio::detail::thread_info_base::parallel_group_tag>(),
+      std::move(cancellation_condition), std::move(handler));
+
+  // Initiate each individual operation in the group.
+  int fold[] = { 0,
+    ( parallel_group_op_launcher<I, Ops>::launch(std::get<I>(ops), state),
+      0 )...
+  };
+  (void)fold;
+
+  // Check if any of the operations has already requested cancellation, and if
+  // so, emit a signal for each operation in the group.
+  if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0)
+    for (auto& signal : state->cancellation_signals_)
+      signal.emit(state->cancel_type_);
+
+  // Register a handler with the user's completion handler's cancellation slot.
+  if (slot.is_connected())
+    slot.template emplace<
+      parallel_group_cancellation_handler<
+        Condition, Handler, Ops...> >(state);
+}
+
+} // namespace detail
+} // namespace experimental
+
+template <typename R, typename... Args>
+class async_result<
+    experimental::detail::parallel_op_signature_probe,
+    R(Args...)>
+{
+public:
+  typedef experimental::detail::parallel_op_signature_probe_result<
+    void(Args...)> return_type;
+
+  template <typename Initiation, typename... InitArgs>
+  static return_type initiate(Initiation&&,
+      experimental::detail::parallel_op_signature_probe, InitArgs&&...)
+  {
+    return return_type{};
+  }
+};
+
+template <template <typename, typename> class Associator,
+    typename Handler, typename... Ops, typename DefaultCandidate>
+struct associator<Associator,
+    experimental::detail::parallel_group_completion_handler<Handler, Ops...>,
+    DefaultCandidate>
+  : Associator<Handler, DefaultCandidate>
+{
+  static typename Associator<Handler, DefaultCandidate>::type get(
+      const experimental::detail::parallel_group_completion_handler<
+        Handler, Ops...>& h,
+      const DefaultCandidate& c = DefaultCandidate()) BOOST_ASIO_NOEXCEPT
+  {
+    return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
+  }
+};
+
+} // namespace asio
+} // namespace boost
+
+#include <boost/asio/detail/pop_options.hpp>
+
+#endif // BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP