template< typename T >
class buffered_channel {
public:
- typedef typename std::remove_reference< T >::type value_type;
+ using value_type = typename std::remove_reference<T>::type;
private:
- typedef context::wait_queue_t wait_queue_type;
- typedef value_type slot_type;
+ using wait_queue_type = context::wait_queue_t;
+ using slot_type = value_type;
mutable detail::spinlock splk_{};
wait_queue_type waiting_producers_{};
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
// notify context
active_ctx->schedule( producer_ctx);
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
// notify context
active_ctx->schedule( consumer_ctx);
detail::spinlock_lock lk{ splk_ };
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
- } else if ( is_full_() ) {
+ }
+ if ( is_full_() ) {
return channel_op_status::full;
- } else {
- slots_[pidx_] = value;
- pidx_ = (pidx_ + 1) % capacity_;
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
+ }
+ slots_[pidx_] = value;
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ while ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ auto expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ lk.unlock();
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
+ lk.unlock();
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
}
- return channel_op_status::success;
}
+ return channel_op_status::success;
}
channel_op_status try_push( value_type && value) {
detail::spinlock_lock lk{ splk_ };
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
- } else if ( is_full_() ) {
+ }
+ if ( is_full_() ) {
return channel_op_status::full;
- } else {
- slots_[pidx_] = std::move( value);
- pidx_ = (pidx_ + 1) % capacity_;
- // notify one waiting consumer
- while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx = & waiting_consumers_.front();
- waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( consumer_ctx);
- break;
- }
+ }
+ slots_[pidx_] = std::move( value);
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ while ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ auto expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ lk.unlock();
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
+ lk.unlock();
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
}
- return channel_op_status::success;
}
+ return channel_op_status::success;
}
channel_op_status push( value_type const& value) {
detail::spinlock_lock lk{ splk_ };
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
- } else if ( is_full_() ) {
+ }
+ if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
lk.unlock();
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
lk.unlock();
// no timed-wait op.
// notify context
detail::spinlock_lock lk{ splk_ };
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
- } else if ( is_full_() ) {
+ }
+ if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
lk.unlock();
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
lk.unlock();
// no timed-wait op.
// notify context
detail::spinlock_lock lk{ splk_ };
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
- } else if ( is_full_() ) {
+ }
+ if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
lk.unlock();
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
lk.unlock();
// no timed-wait op.
// notify context
detail::spinlock_lock lk{ splk_ };
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
- } else if ( is_full_() ) {
+ }
+ if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
lk.unlock();
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
lk.unlock();
// no timed-wait op.
// notify context
return is_closed_()
? channel_op_status::closed
: channel_op_status::empty;
- } else {
- value = std::move( slots_[cidx_]);
- cidx_ = (cidx_ + 1) % capacity_;
- // notify one waiting producer
- while ( ! waiting_producers_.empty() ) {
- context * producer_ctx = & waiting_producers_.front();
- waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
- if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- lk.unlock();
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
- lk.unlock();
- // no timed-wait op.
- // notify context
- active_ctx->schedule( producer_ctx);
- break;
- }
+ }
+ value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
+ // notify one waiting producer
+ while ( ! waiting_producers_.empty() ) {
+ context * producer_ctx = & waiting_producers_.front();
+ waiting_producers_.pop_front();
+ auto expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ lk.unlock();
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
+ lk.unlock();
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
}
- return channel_op_status::success;
}
+ return channel_op_status::success;
}
channel_op_status pop( value_type & value) {
if ( is_empty_() ) {
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
- } else {
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this consumer
- active_ctx->suspend( lk);
}
+ active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
+ // suspend this consumer
+ active_ctx->suspend( lk);
} else {
value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
lk.unlock();
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
lk.unlock();
// no timed-wait op.
// notify context
throw fiber_error{
std::make_error_code( std::errc::operation_not_permitted),
"boost fiber: channel is closed" };
- } else {
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
- // suspend this consumer
- active_ctx->suspend( lk);
}
+ active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
+ // suspend this consumer
+ active_ctx->suspend( lk);
} else {
value_type value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
lk.unlock();
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ }
+ if ( static_cast< std::intptr_t >( 0) == expected) {
lk.unlock();
// no timed-wait op.
// notify context
break;
}
}
- return std::move( value);
+ return value;
}
}
}
if ( is_empty_() ) {
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
- } else {
- active_ctx->wait_link( waiting_consumers_);
- active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
- // suspend this consumer
- if ( ! active_ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- waiting_consumers_.remove( * active_ctx);
- return channel_op_status::timeout;
- }
+ }
+ active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
+ // suspend this consumer
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ waiting_consumers_.remove( * active_ctx);
+ return channel_op_status::timeout;
}
} else {
value = std::move( slots_[cidx_]);
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ auto expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
lk.unlock();
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ } if ( static_cast< std::intptr_t >( 0) == expected) {
lk.unlock();
// no timed-wait op.
// notify context
}
public:
- typedef std::input_iterator_tag iterator_category;
- typedef std::ptrdiff_t difference_type;
- typedef value_type * pointer;
- typedef value_type & reference;
+ using iterator_category = std::input_iterator_tag;
+ using difference_type = std::ptrdiff_t;
+ using pointer = value_type *;
+ using reference = value_type &;
- typedef pointer pointer_t;
- typedef reference reference_t;
+ using pointer_t = pointer;
+ using reference_t = reference;
iterator() noexcept = default;
return * this;
}
- iterator operator++( int) = delete;
+ const iterator operator++( int) = delete;
reference_t operator*() noexcept {
return * reinterpret_cast< value_type * >( std::addressof( storage_) );