template< typename T >
class unbuffered_channel {
public:
- typedef T value_type;
+ typedef typename std::remove_reference< T >::type value_type;
private:
typedef context::wait_queue_t wait_queue_type;
};
// shared cacheline
- std::atomic< slot * > slot_{ nullptr };
+ std::atomic< slot * > slot_{ nullptr };
// shared cacheline
- std::atomic_bool closed_{ false };
- mutable detail::spinlock splk_producers_{};
- wait_queue_type waiting_producers_{};
- mutable detail::spinlock splk_consumers_{};
- wait_queue_type waiting_consumers_{};
- char pad_[cacheline_length];
+ std::atomic_bool closed_{ false };
+ mutable detail::spinlock splk_producers_{};
+ wait_queue_type waiting_producers_{};
+ mutable detail::spinlock splk_consumers_{};
+ wait_queue_type waiting_consumers_{};
+ char pad_[cacheline_length];
bool is_empty_() {
return nullptr == slot_.load( std::memory_order_acquire);
waiting_producers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
} else if ( static_cast< std::intptr_t >( 0) == expected) {
// no timed-wait op.
// notify context
active_ctx->schedule( producer_ctx);
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
// notify all waiting consumers
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
} else if ( static_cast< std::intptr_t >( 0) == expected) {
// no timed-wait op.
// notify context
active_ctx->schedule( consumer_ctx);
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
}
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
// suspend till value has been consumed
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
// suspend till value has been consumed
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
// suspend this producer
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
continue;
}
active_ctx->wait_link( waiting_producers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
// suspend this producer
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
continue;
}
active_ctx->wait_link( waiting_producers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
}
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
}
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
}
continue;
}
active_ctx->wait_link( waiting_consumers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
}
}
- class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > {
+ class iterator {
private:
typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
}
public:
- typedef typename iterator::pointer pointer_t;
- typedef typename iterator::reference reference_t;
+ typedef std::input_iterator_tag iterator_category;
+ typedef std::ptrdiff_t difference_type;
+ typedef value_type * pointer;
+ typedef value_type & reference;
+
+ typedef pointer pointer_t;
+ typedef reference reference_t;
iterator() noexcept = default;