// impl/awaitable.hpp
// ~~~~~~~~~~~~~~~~~~
//
-// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <exception>
#include <new>
#include <tuple>
-#include <utility>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/cancellation_state.hpp>
#include <boost/asio/detail/thread_context.hpp>
#include <boost/asio/detail/thread_info_base.hpp>
+#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/detail/type_traits.hpp>
+#include <boost/asio/error.hpp>
#include <boost/asio/post.hpp>
#include <boost/system/system_error.hpp>
#include <boost/asio/this_coro.hpp>
namespace asio {
namespace detail {
+struct awaitable_thread_has_context_switched {};
+
// An awaitable_thread represents a thread-of-execution that is composed of one
// or more "stack frames", with each frame represented by an awaitable_frame.
// All execution occurs in the context of the awaitable_thread's executor. An
{
return boost::asio::detail::thread_info_base::allocate(
boost::asio::detail::thread_info_base::awaitable_frame_tag(),
- boost::asio::detail::thread_context::thread_call_stack::top(),
+ boost::asio::detail::thread_context::top_of_thread_call_stack(),
size);
}
{
boost::asio::detail::thread_info_base::deallocate(
boost::asio::detail::thread_info_base::awaitable_frame_tag(),
- boost::asio::detail::thread_context::thread_call_stack::top(),
+ boost::asio::detail::thread_context::top_of_thread_call_stack(),
pointer, size);
}
#endif // !defined(BOOST_ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
}
}
+ void clear_cancellation_slot()
+ {
+ this->attached_thread_->entry_point()->cancellation_state_.slot().clear();
+ }
+
template <typename T>
auto await_transform(awaitable<T, Executor> a) const
{
+ if (attached_thread_->entry_point()->throw_if_cancelled_)
+ if (!!attached_thread_->get_cancellation_state().cancelled())
+ do_throw_error(boost::asio::error::operation_aborted, "co_await");
return a;
}
return result{this};
}
+ // This await transformation obtains the associated cancellation state of the
+ // thread of execution.
+ auto await_transform(this_coro::cancellation_state_t) noexcept
+ {
+ struct result
+ {
+ awaitable_frame_base* this_;
+
+ bool await_ready() const noexcept
+ {
+ return true;
+ }
+
+ void await_suspend(coroutine_handle<void>) noexcept
+ {
+ }
+
+ auto await_resume() const noexcept
+ {
+ return this_->attached_thread_->get_cancellation_state();
+ }
+ };
+
+ return result{this};
+ }
+
+ // This await transformation resets the associated cancellation state.
+ auto await_transform(this_coro::reset_cancellation_state_0_t) noexcept
+ {
+ struct result
+ {
+ awaitable_frame_base* this_;
+
+ bool await_ready() const noexcept
+ {
+ return true;
+ }
+
+ void await_suspend(coroutine_handle<void>) noexcept
+ {
+ }
+
+ auto await_resume() const
+ {
+ return this_->attached_thread_->reset_cancellation_state();
+ }
+ };
+
+ return result{this};
+ }
+
+ // This await transformation resets the associated cancellation state.
+ template <typename Filter>
+ auto await_transform(
+ this_coro::reset_cancellation_state_1_t<Filter> reset) noexcept
+ {
+ struct result
+ {
+ awaitable_frame_base* this_;
+ Filter filter_;
+
+ bool await_ready() const noexcept
+ {
+ return true;
+ }
+
+ void await_suspend(coroutine_handle<void>) noexcept
+ {
+ }
+
+ auto await_resume()
+ {
+ return this_->attached_thread_->reset_cancellation_state(
+ BOOST_ASIO_MOVE_CAST(Filter)(filter_));
+ }
+ };
+
+ return result{this, BOOST_ASIO_MOVE_CAST(Filter)(reset.filter)};
+ }
+
+ // This await transformation resets the associated cancellation state.
+ template <typename InFilter, typename OutFilter>
+ auto await_transform(
+ this_coro::reset_cancellation_state_2_t<InFilter, OutFilter> reset)
+ noexcept
+ {
+ struct result
+ {
+ awaitable_frame_base* this_;
+ InFilter in_filter_;
+ OutFilter out_filter_;
+
+ bool await_ready() const noexcept
+ {
+ return true;
+ }
+
+ void await_suspend(coroutine_handle<void>) noexcept
+ {
+ }
+
+ auto await_resume()
+ {
+ return this_->attached_thread_->reset_cancellation_state(
+ BOOST_ASIO_MOVE_CAST(InFilter)(in_filter_),
+ BOOST_ASIO_MOVE_CAST(OutFilter)(out_filter_));
+ }
+ };
+
+ return result{this,
+ BOOST_ASIO_MOVE_CAST(InFilter)(reset.in_filter),
+ BOOST_ASIO_MOVE_CAST(OutFilter)(reset.out_filter)};
+ }
+
+ // This await transformation determines whether cancellation is propagated as
+ // an exception.
+ auto await_transform(this_coro::throw_if_cancelled_0_t)
+ noexcept
+ {
+ struct result
+ {
+ awaitable_frame_base* this_;
+
+ bool await_ready() const noexcept
+ {
+ return true;
+ }
+
+ void await_suspend(coroutine_handle<void>) noexcept
+ {
+ }
+
+ auto await_resume()
+ {
+ return this_->attached_thread_->throw_if_cancelled();
+ }
+ };
+
+ return result{this};
+ }
+
+ // This await transformation sets whether cancellation is propagated as an
+ // exception.
+ auto await_transform(this_coro::throw_if_cancelled_1_t throw_if_cancelled)
+ noexcept
+ {
+ struct result
+ {
+ awaitable_frame_base* this_;
+ bool value_;
+
+ bool await_ready() const noexcept
+ {
+ return true;
+ }
+
+ void await_suspend(coroutine_handle<void>) noexcept
+ {
+ }
+
+ auto await_resume()
+ {
+ this_->attached_thread_->throw_if_cancelled(value_);
+ }
+ };
+
+ return result{this, throw_if_cancelled.value};
+ }
+
// This await transformation is used to run an async operation's initiation
// function object after the coroutine has been suspended. This ensures that
// immediate resumption of the coroutine in another thread does not cause a
typename result_of<Function(awaitable_frame_base*)>::type,
awaitable_thread<Executor>*
>::value
- >::type* = 0)
+ >::type* = nullptr)
{
struct result
{
return result{std::move(f), this};
}
+ // Access the awaitable thread's has_context_switched_ flag.
+ auto await_transform(detail::awaitable_thread_has_context_switched) noexcept
+ {
+ struct result
+ {
+ awaitable_frame_base* this_;
+
+ bool await_ready() const noexcept
+ {
+ return true;
+ }
+
+ void await_suspend(coroutine_handle<void>) noexcept
+ {
+ }
+
+ bool& await_resume() const noexcept
+ {
+ return this_->attached_thread_->entry_point()->has_context_switched_;
+ }
+ };
+
+ return result{this};
+ }
+
void attach_thread(awaitable_thread<Executor>* handler) noexcept
{
attached_thread_ = handler;
awaitable_thread<Executor>* detach_thread() noexcept
{
+ attached_thread_->entry_point()->has_context_switched_ = true;
return std::exchange(attached_thread_, nullptr);
}
{
caller_ = caller;
attached_thread_ = caller_->attached_thread_;
- attached_thread_->top_of_stack_ = this;
+ attached_thread_->entry_point()->top_of_stack_ = this;
caller_->attached_thread_ = nullptr;
}
{
if (caller_)
caller_->attached_thread_ = attached_thread_;
- attached_thread_->top_of_stack_ = caller_;
+ attached_thread_->entry_point()->top_of_stack_ = caller_;
attached_thread_ = nullptr;
caller_ = nullptr;
}
}
};
+struct awaitable_thread_entry_point {};
+
+template <typename Executor>
+class awaitable_frame<awaitable_thread_entry_point, Executor>
+ : public awaitable_frame_base<Executor>
+{
+public:
+ awaitable_frame()
+ : top_of_stack_(0),
+ has_executor_(false),
+ has_context_switched_(false),
+ throw_if_cancelled_(true)
+ {
+ }
+
+ ~awaitable_frame()
+ {
+ if (has_executor_)
+ u_.executor_.~Executor();
+ }
+
+ awaitable<awaitable_thread_entry_point, Executor> get_return_object()
+ {
+ this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
+ return awaitable<awaitable_thread_entry_point, Executor>(this);
+ };
+
+ void return_void()
+ {
+ }
+
+ void get()
+ {
+ this->caller_ = nullptr;
+ this->rethrow_exception();
+ }
+
+private:
+ template <typename> friend class awaitable_frame_base;
+ template <typename, typename> friend class awaitable_handler_base;
+ template <typename> friend class awaitable_thread;
+
+ union u
+ {
+ u() {}
+ ~u() {}
+ char c_;
+ Executor executor_;
+ } u_;
+
+ awaitable_frame_base<Executor>* top_of_stack_;
+ boost::asio::cancellation_slot parent_cancellation_slot_;
+ boost::asio::cancellation_state cancellation_state_;
+ bool has_executor_;
+ bool has_context_switched_;
+ bool throw_if_cancelled_;
+};
+
template <typename Executor>
class awaitable_thread
{
public:
typedef Executor executor_type;
+ typedef cancellation_slot cancellation_slot_type;
// Construct from the entry point of a new thread of execution.
- awaitable_thread(awaitable<void, Executor> p, const Executor& ex)
- : bottom_of_stack_(std::move(p)),
- top_of_stack_(bottom_of_stack_.frame_),
- executor_(ex)
- {
+ awaitable_thread(awaitable<awaitable_thread_entry_point, Executor> p,
+ const Executor& ex, cancellation_slot parent_cancel_slot,
+ cancellation_state cancel_state)
+ : bottom_of_stack_(std::move(p))
+ {
+ bottom_of_stack_.frame_->top_of_stack_ = bottom_of_stack_.frame_;
+ new (&bottom_of_stack_.frame_->u_.executor_) Executor(ex);
+ bottom_of_stack_.frame_->has_executor_ = true;
+ bottom_of_stack_.frame_->parent_cancellation_slot_ = parent_cancel_slot;
+ bottom_of_stack_.frame_->cancellation_state_ = cancel_state;
}
// Transfer ownership from another awaitable_thread.
awaitable_thread(awaitable_thread&& other) noexcept
- : bottom_of_stack_(std::move(other.bottom_of_stack_)),
- top_of_stack_(std::exchange(other.top_of_stack_, nullptr)),
- executor_(std::move(other.executor_))
+ : bottom_of_stack_(std::move(other.bottom_of_stack_))
{
}
if (bottom_of_stack_.valid())
{
// Coroutine "stack unwinding" must be performed through the executor.
- (post)(executor_,
+ auto* bottom_frame = bottom_of_stack_.frame_;
+ (post)(bottom_frame->u_.executor_,
[a = std::move(bottom_of_stack_)]() mutable
{
- awaitable<void, Executor>(std::move(a));
+ (void)awaitable<awaitable_thread_entry_point, Executor>(
+ std::move(a));
});
}
}
+ awaitable_frame<awaitable_thread_entry_point, Executor>* entry_point()
+ {
+ return bottom_of_stack_.frame_;
+ }
+
executor_type get_executor() const noexcept
{
- return executor_;
+ return bottom_of_stack_.frame_->u_.executor_;
+ }
+
+ cancellation_state get_cancellation_state() const noexcept
+ {
+ return bottom_of_stack_.frame_->cancellation_state_;
+ }
+
+ void reset_cancellation_state()
+ {
+ bottom_of_stack_.frame_->cancellation_state_ =
+ cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_);
+ }
+
+ template <typename Filter>
+ void reset_cancellation_state(BOOST_ASIO_MOVE_ARG(Filter) filter)
+ {
+ bottom_of_stack_.frame_->cancellation_state_ =
+ cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_,
+ BOOST_ASIO_MOVE_CAST(Filter)(filter));
+ }
+
+ template <typename InFilter, typename OutFilter>
+ void reset_cancellation_state(BOOST_ASIO_MOVE_ARG(InFilter) in_filter,
+ BOOST_ASIO_MOVE_ARG(OutFilter) out_filter)
+ {
+ bottom_of_stack_.frame_->cancellation_state_ =
+ cancellation_state(bottom_of_stack_.frame_->parent_cancellation_slot_,
+ BOOST_ASIO_MOVE_CAST(InFilter)(in_filter),
+ BOOST_ASIO_MOVE_CAST(OutFilter)(out_filter));
+ }
+
+ bool throw_if_cancelled() const
+ {
+ return bottom_of_stack_.frame_->throw_if_cancelled_;
+ }
+
+ void throw_if_cancelled(bool value)
+ {
+ bottom_of_stack_.frame_->throw_if_cancelled_ = value;
+ }
+
+ cancellation_slot_type get_cancellation_slot() const noexcept
+ {
+ return bottom_of_stack_.frame_->cancellation_state_.slot();
}
// Launch a new thread of execution.
void launch()
{
- top_of_stack_->attach_thread(this);
+ bottom_of_stack_.frame_->top_of_stack_->attach_thread(this);
pump();
}
// has been transferred to another resumable_thread object.
void pump()
{
- do top_of_stack_->resume(); while (top_of_stack_);
- if (bottom_of_stack_.valid())
+ do
+ bottom_of_stack_.frame_->top_of_stack_->resume();
+ while (bottom_of_stack_.frame_ && bottom_of_stack_.frame_->top_of_stack_);
+
+ if (bottom_of_stack_.frame_)
{
- awaitable<void, Executor> a(std::move(bottom_of_stack_));
+ awaitable<awaitable_thread_entry_point, Executor> a(
+ std::move(bottom_of_stack_));
a.frame_->rethrow_exception();
}
}
- awaitable<void, Executor> bottom_of_stack_;
- awaitable_frame_base<Executor>* top_of_stack_;
- executor_type executor_;
+ awaitable<awaitable_thread_entry_point, Executor> bottom_of_stack_;
};
} // namespace detail