#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
+#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
+#include <boost/fiber/detail/exchange.hpp>
+#endif
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>
void close() noexcept {
context * active_ctx = context::active();
- // notify all waiting producers
- closed_.store( true, std::memory_order_release);
- detail::spinlock_lock lk1{ splk_producers_ };
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( producer_ctx);
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
+ // set flag
+ if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
+ // notify current waiting
+ slot * s = slot_.load( std::memory_order_acquire);
+ if ( nullptr != s) {
// notify context
- active_ctx->schedule( producer_ctx);
+ active_ctx->schedule( s->ctx);
}
- }
- // notify all waiting consumers
- detail::spinlock_lock lk2{ splk_consumers_ };
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify context
- active_ctx->schedule( consumer_ctx);
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
+ // notify all waiting producers
+ detail::spinlock_lock lk1{ splk_producers_ };
+ while ( ! waiting_producers_.empty() ) {
+ context * producer_ctx = & waiting_producers_.front();
+ waiting_producers_.pop_front();
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ }
+ }
+ // notify all waiting consumers
+ detail::spinlock_lock lk2{ splk_consumers_ };
+ while ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ }
}
}
}
}
// suspend till value has been consumed
active_ctx->suspend( lk);
- // resumed, value has been consumed
- return channel_op_status::success;
+ // resumed
+ if ( nullptr == s.ctx) {
+ // value has been consumed
+ return channel_op_status::success;
+ } else {
+ // channel was closed before value was consumed
+ return channel_op_status::closed;
+ }
} else {
detail::spinlock_lock lk{ splk_producers_ };
if ( BOOST_UNLIKELY( is_closed() ) ) {
}
// suspend till value has been consumed
active_ctx->suspend( lk);
- // resumed, value has been consumed
- return channel_op_status::success;
+ // resumed
+ if ( nullptr == s.ctx) {
+ // value has been consumed
+ return channel_op_status::success;
+ } else {
+ // channel was closed before value was consumed
+ return channel_op_status::closed;
+ }
} else {
detail::spinlock_lock lk{ splk_producers_ };
if ( BOOST_UNLIKELY( is_closed() ) ) {
// resumed, value has not been consumed
return channel_op_status::timeout;
}
- // resumed, value has been consumed
- return channel_op_status::success;
+ // resumed
+ if ( nullptr == s.ctx) {
+ // value has been consumed
+ return channel_op_status::success;
+ } else {
+ // channel was closed before value was consumed
+ return channel_op_status::closed;
+ }
} else {
detail::spinlock_lock lk{ splk_producers_ };
if ( BOOST_UNLIKELY( is_closed() ) ) {
// resumed, value has not been consumed
return channel_op_status::timeout;
}
- // resumed, value has been consumed
- return channel_op_status::success;
+ // resumed
+ if ( nullptr == s.ctx) {
+ // value has been consumed
+ return channel_op_status::success;
+ } else {
+ // channel was closed before value was consumed
+ return channel_op_status::closed;
+ }
} else {
detail::spinlock_lock lk{ splk_producers_ };
if ( BOOST_UNLIKELY( is_closed() ) ) {
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ lk.unlock();
// notify context
active_ctx->schedule( producer_ctx);
break;
} else if ( static_cast< std::intptr_t >( 0) == expected) {
+ lk.unlock();
// no timed-wait op.
// notify context
active_ctx->schedule( producer_ctx);
}
value = std::move( s->value);
// notify context
- active_ctx->schedule( s->ctx);
+#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
+ active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
+#else
+ active_ctx->schedule( std::exchange( s->ctx, nullptr) );
+#endif
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_consumers_ };
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ lk.unlock();
// notify context
active_ctx->schedule( producer_ctx);
break;
} else if ( static_cast< std::intptr_t >( 0) == expected) {
+ lk.unlock();
// no timed-wait op.
// notify context
active_ctx->schedule( producer_ctx);
// consume value
value_type value = std::move( s->value);
// notify context
- active_ctx->schedule( s->ctx);
+#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
+ active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
+#else
+ active_ctx->schedule( std::exchange( s->ctx, nullptr) );
+#endif
return std::move( value);
} else {
detail::spinlock_lock lk{ splk_consumers_ };
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ lk.unlock();
// notify context
active_ctx->schedule( producer_ctx);
break;
} else if ( static_cast< std::intptr_t >( 0) == expected) {
+ lk.unlock();
// no timed-wait op.
// notify context
active_ctx->schedule( producer_ctx);
// consume value
value = std::move( s->value);
// notify context
- active_ctx->schedule( s->ctx);
+#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
+ active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
+#else
+ active_ctx->schedule( std::exchange( s->ctx, nullptr) );
+#endif
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_consumers_ };
}
iterator & operator++() {
+ reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
increment_();
return * this;
}