#endif
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>
+#include <boost/fiber/waker.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
using value_type = typename std::remove_reference<T>::type;
private:
- using wait_queue_type = context::wait_queue_t;
-
struct slot {
value_type value;
- context * ctx;
+ waker w;
- slot( value_type const& value_, context * ctx_) :
+ slot( value_type const& value_, waker && w) :
value{ value_ },
- ctx{ ctx_ } {
+ w{ std::move(w) } {
}
- slot( value_type && value_, context * ctx_) :
+ slot( value_type && value_, waker && w) :
value{ std::move( value_) },
- ctx{ ctx_ } {
+ w{ std::move(w) } {
}
};
// shared cacheline
std::atomic_bool closed_{ false };
mutable detail::spinlock splk_producers_{};
- wait_queue_type waiting_producers_{};
+ wait_queue waiting_producers_{};
mutable detail::spinlock splk_consumers_{};
- wait_queue_type waiting_consumers_{};
+ wait_queue waiting_consumers_{};
char pad_[cacheline_length];
bool is_empty_() {
}
void close() noexcept {
- context * active_ctx = context::active();
// set flag
if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
// notify current waiting
slot * s = slot_.load( std::memory_order_acquire);
if ( nullptr != s) {
// notify context
- active_ctx->schedule( s->ctx);
+ s->w.wake();
}
- // notify all waiting producers
detail::spinlock_lock lk1{ splk_producers_ };
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( producer_ctx);
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- }
- }
- // notify all waiting consumers
+ waiting_producers_.notify_all();
+
detail::spinlock_lock lk2{ splk_consumers_ };
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- }
- }
+ waiting_consumers_.notify_all();
}
}
channel_op_status push( value_type const& value) {
context * active_ctx = context::active();
- slot s{ value, active_ctx };
+ slot s{ value, active_ctx->create_waker() };
for (;;) {
if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- }
+ waiting_consumers_.notify_one();
// suspend till value has been consumed
active_ctx->suspend( lk);
// resumed
- if ( nullptr == s.ctx) {
- // value has been consumed
- return channel_op_status::success;
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
+ // channel was closed before value was consumed
+ return channel_op_status::closed;
}
- // channel was closed before value was consumed
- return channel_op_status::closed;
+ // value has been consumed
+ return channel_op_status::success;
}
detail::spinlock_lock lk{ splk_producers_ };
if ( BOOST_UNLIKELY( is_closed() ) ) {
if ( is_empty_() ) {
continue;
}
- active_ctx->wait_link( waiting_producers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this producer
- active_ctx->suspend( lk);
+
+ waiting_producers_.suspend_and_wait( lk, active_ctx);
// resumed, slot mabye free
}
}
channel_op_status push( value_type && value) {
context * active_ctx = context::active();
- slot s{ std::move( value), active_ctx };
+ slot s{ std::move( value), active_ctx->create_waker() };
for (;;) {
if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- } if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- }
+ waiting_consumers_.notify_one();
// suspend till value has been consumed
active_ctx->suspend( lk);
// resumed
- if ( nullptr == s.ctx) {
- // value has been consumed
- return channel_op_status::success;
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
+ // channel was closed before value was consumed
+ return channel_op_status::closed;
}
- // channel was closed before value was consumed
- return channel_op_status::closed;
+ // value has been consumed
+ return channel_op_status::success;
}
detail::spinlock_lock lk{ splk_producers_ };
if ( BOOST_UNLIKELY( is_closed() ) ) {
if ( is_empty_() ) {
continue;
}
- active_ctx->wait_link( waiting_producers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this producer
- active_ctx->suspend( lk);
+ waiting_producers_.suspend_and_wait( lk, active_ctx);
// resumed, slot mabye free
}
}
channel_op_status push_wait_until( value_type const& value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
context * active_ctx = context::active();
- slot s{ value, active_ctx };
+ slot s{ value, active_ctx->create_waker() };
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
if ( BOOST_UNLIKELY( is_closed() ) ) {
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- }
+ waiting_consumers_.notify_one();
// suspend this producer
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
// clear slot
slot * nil_slot = nullptr, * own_slot = & s;
slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
return channel_op_status::timeout;
}
// resumed
- if ( nullptr == s.ctx) {
- // value has been consumed
- return channel_op_status::success;
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
+ // channel was closed before value was consumed
+ return channel_op_status::closed;
}
- // channel was closed before value was consumed
- return channel_op_status::closed;
+ // value has been consumed
+ return channel_op_status::success;
}
detail::spinlock_lock lk{ splk_producers_ };
if ( BOOST_UNLIKELY( is_closed() ) ) {
if ( is_empty_() ) {
continue;
}
- active_ctx->wait_link( waiting_producers_);
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- // suspend this producer
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- waiting_producers_.remove( * active_ctx);
+
+ if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
+ {
return channel_op_status::timeout;
}
// resumed, slot maybe free
channel_op_status push_wait_until( value_type && value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
context * active_ctx = context::active();
- slot s{ std::move( value), active_ctx };
+ slot s{ std::move( value), active_ctx->create_waker() };
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
if ( BOOST_UNLIKELY( is_closed() ) ) {
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- } if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
- }
+ waiting_consumers_.notify_one();
// suspend this producer
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
// clear slot
slot * nil_slot = nullptr, * own_slot = & s;
slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
return channel_op_status::timeout;
}
// resumed
- if ( nullptr == s.ctx) {
- // value has been consumed
- return channel_op_status::success;
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
+ // channel was closed before value was consumed
+ return channel_op_status::closed;
}
- // channel was closed before value was consumed
- return channel_op_status::closed;
+ // value has been consumed
+ return channel_op_status::success;
}
detail::spinlock_lock lk{ splk_producers_ };
if ( BOOST_UNLIKELY( is_closed() ) ) {
if ( is_empty_() ) {
continue;
}
- active_ctx->wait_link( waiting_producers_);
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- // suspend this producer
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- waiting_producers_.remove( * active_ctx);
+ if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
+ {
return channel_op_status::timeout;
}
// resumed, slot maybe free
if ( nullptr != ( s = try_pop_() ) ) {
{
detail::spinlock_lock lk{ splk_producers_ };
- // notify one waiting producer
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- } if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- }
- }
+ waiting_producers_.notify_one();
}
value = std::move( s->value);
// notify context
-#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
- active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
-#else
- active_ctx->schedule( std::exchange( s->ctx, nullptr) );
-#endif
+ s->w.wake();
return channel_op_status::success;
}
detail::spinlock_lock lk{ splk_consumers_ };
if ( ! is_empty_() ) {
continue;
}
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this consumer
- active_ctx->suspend( lk);
+ waiting_consumers_.suspend_and_wait( lk, active_ctx);
// resumed, slot mabye set
}
}
if ( nullptr != ( s = try_pop_() ) ) {
{
detail::spinlock_lock lk{ splk_producers_ };
- // notify one waiting producer
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- } if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- }
- }
+ waiting_producers_.notify_one();
}
// consume value
value_type value = std::move( s->value);
// notify context
-#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
- active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
-#else
- active_ctx->schedule( std::exchange( s->ctx, nullptr) );
-#endif
+ s->w.wake();
return std::move( value);
}
detail::spinlock_lock lk{ splk_consumers_ };
if ( ! is_empty_() ) {
continue;
}
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this consumer
- active_ctx->suspend( lk);
+ waiting_consumers_.suspend_and_wait( lk, active_ctx);
// resumed, slot mabye set
}
}
if ( nullptr != ( s = try_pop_() ) ) {
{
detail::spinlock_lock lk{ splk_producers_ };
- // notify one waiting producer
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- auto expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- }
- if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- }
- }
+ waiting_producers_.notify_one();
}
// consume value
value = std::move( s->value);
// notify context
-#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
- active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
-#else
- active_ctx->schedule( std::exchange( s->ctx, nullptr) );
-#endif
+ s->w.wake();
return channel_op_status::success;
}
detail::spinlock_lock lk{ splk_consumers_ };
if ( ! is_empty_() ) {
continue;
}
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- // suspend this consumer
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- waiting_consumers_.remove( * active_ctx);
+ if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
return channel_op_status::timeout;
}
}
unbuffered_channel * chan_{ nullptr };
storage_type storage_;
- void increment_() {
+ void increment_( bool initial = false) {
BOOST_ASSERT( nullptr != chan_);
try {
+ if ( ! initial) {
+ reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
+ }
::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
} catch ( fiber_error const&) {
chan_ = nullptr;
explicit iterator( unbuffered_channel< T > * chan) noexcept :
chan_{ chan } {
- increment_();
+ increment_( true);
}
iterator( iterator const& other) noexcept :