2 // Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
8 #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
17 #include <boost/config.hpp>
19 #include <boost/fiber/channel_op_status.hpp>
20 #include <boost/fiber/context.hpp>
21 #include <boost/fiber/detail/config.hpp>
22 #include <boost/fiber/detail/convert.hpp>
23 #include <boost/fiber/detail/spinlock.hpp>
24 #include <boost/fiber/exceptions.hpp>
26 #ifdef BOOST_HAS_ABI_HEADERS
27 # include BOOST_ABI_PREFIX
// Channel class template over element type T. Several structural lines of the
// class (access specifiers, the slot struct header, closing braces) are not
// visible in this listing; the comments below describe only the visible lines.
33 template< typename T >
34 class unbuffered_channel {
// Element type: T with any reference qualifier removed.
36 typedef typename std::remove_reference< T >::type value_type;
// Queue type used for the two waiting-context lists declared below.
39 typedef context::wait_queue_t wait_queue_type;
// slot constructors: each takes the value to transfer plus a context pointer.
// Copying overload (its initializer list is not visible here).
45 slot( value_type const& value_, context * ctx_) :
// Moving overload: the argument is moved into the `value` member.
50 slot( value_type && value_, context * ctx_) :
51 value{ std::move( value_) },
// Pointer to the slot currently offered by a producer; starts as nullptr.
57 std::atomic< slot * > slot_{ nullptr };
// Closed flag: starts false, stored true by close(), read by is_closed().
59 std::atomic_bool closed_{ false };
// Spinlock locked through detail::spinlock_lock in the code paths that touch
// waiting_producers_.
60 mutable detail::spinlock splk_producers_{};
// Contexts of producers that linked themselves in via wait_link() before suspending.
61 wait_queue_type waiting_producers_{};
// Spinlock locked through detail::spinlock_lock in the code paths that touch
// waiting_consumers_.
62 mutable detail::spinlock splk_consumers_{};
// Contexts of consumers that linked themselves in via wait_link() before suspending.
63 wait_queue_type waiting_consumers_{};
// Trailing padding of cacheline_length bytes.
// NOTE(review): presumably there to keep the object from sharing a cache line
// with adjacent data - confirm.
64 char pad_[cacheline_length];
// Yields true when the acquire-load of slot_ observes nullptr, i.e. no slot is
// currently stored in the channel.
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the body of is_empty_(), which the pop operations call - confirm.
67 return nullptr == slot_.load( std::memory_order_acquire);
// Attempts to store own_slot into slot_.
// Callers treat a true result as "slot accepted": they then wake a consumer and
// suspend. The lines that decide the return value are not visible in this listing.
70 bool try_push_( slot * own_slot) {
// Snapshot of the currently stored slot pointer (acquire).
72 slot * s = slot_.load( std::memory_order_acquire);
// Strong compare-exchange (acq_rel) replacing the observed pointer with own_slot;
// this branch is entered when the exchange fails (s is then refreshed with the
// value actually found in slot_).
74 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the body of try_pop_(), whose result the pop operations
// assign to a slot pointer - confirm.
// Value swapped into slot_ to mark it as taken.
85 slot * nil_slot = nullptr;
// Snapshot of the currently stored slot pointer (acquire).
87 slot * s = slot_.load( std::memory_order_acquire);
// Strong compare-exchange (acq_rel) replacing the observed pointer with nullptr;
// this branch is entered when the exchange fails.
89 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
// Default constructor (body not visible in this listing).
97 unbuffered_channel() {
// Destructor (body not visible in this listing).
100 ~unbuffered_channel() {
// The channel is non-copyable: copy construction and copy assignment are deleted.
104 unbuffered_channel( unbuffered_channel const&) = delete;
105 unbuffered_channel & operator=( unbuffered_channel const&) = delete;
// Reports whether the channel has been closed.
// Returns the acquire-loaded value of closed_, which close() stores as true
// with release ordering.
107 bool is_closed() const noexcept {
108 return closed_.load( std::memory_order_acquire);
// Closes the channel and wakes the fibers waiting on it.
// Visible flow: store true into closed_ (release), then drain waiting_producers_
// after locking splk_producers_, and waiting_consumers_ after locking
// splk_consumers_. For each dequeued context the code tries to swap its twstatus
// from this channel's address to -1; the context is scheduled when that swap
// succeeds or when the observed twstatus was 0. (In this file, timed waits store
// the channel address into twstatus and untimed waits store 0 before suspending.)
// NOTE(review): lines between the visible statements (closing braces, any
// further branches) are not shown in this listing.
111 void close() noexcept {
// Active context; used below to schedule the contexts being woken.
112 context * active_ctx = context::active();
113 // notify all waiting producers
114 closed_.store( true, std::memory_order_release);
115 detail::spinlock_lock lk1{ splk_producers_ };
116 while ( ! waiting_producers_.empty() ) {
117 context * producer_ctx = & waiting_producers_.front();
118 waiting_producers_.pop_front();
// Expected twstatus of a producer in a timed wait on this channel.
119 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
120 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
122 active_ctx->schedule( producer_ctx);
// After a failed exchange `expected` holds the observed value; 0 is what the
// untimed push stores before suspending.
123 } else if ( static_cast< std::intptr_t >( 0) == expected) {
126 active_ctx->schedule( producer_ctx);
129 // notify all waiting consumers
130 detail::spinlock_lock lk2{ splk_consumers_ };
131 while ( ! waiting_consumers_.empty() ) {
132 context * consumer_ctx = & waiting_consumers_.front();
133 waiting_consumers_.pop_front();
// Expected twstatus of a consumer in a timed wait on this channel.
134 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
135 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
137 active_ctx->schedule( consumer_ctx);
// 0 is what the untimed pop operations store before suspending.
138 } else if ( static_cast< std::intptr_t >( 0) == expected) {
141 active_ctx->schedule( consumer_ctx);
// Blocking push that copies `value` into a slot living on this fiber's stack.
// Visible flow:
//  - return channel_op_status::closed if the channel is already closed;
//  - if try_push_() accepts the slot, wake one waiting consumer (swap its
//    twstatus from the channel address to -1, or accept an observed value of 0),
//    suspend, and return channel_op_status::success once resumed;
//  - otherwise lock splk_producers_, re-check the closed flag, link the active
//    context into waiting_producers_, store 0 into its twstatus and suspend.
// NOTE(review): the loop and branch structure joining these steps is not
// visible in this listing.
146 channel_op_status push( value_type const& value) {
147 context * active_ctx = context::active();
// The slot records the value together with the pushing context, so that the
// consumer can schedule that context after taking the value.
148 slot s{ value, active_ctx };
150 if ( BOOST_UNLIKELY( is_closed() ) ) {
151 return channel_op_status::closed;
153 if ( try_push_( & s) ) {
154 detail::spinlock_lock lk{ splk_consumers_ };
155 // notify one waiting consumer
156 while ( ! waiting_consumers_.empty() ) {
157 context * consumer_ctx = & waiting_consumers_.front();
158 waiting_consumers_.pop_front();
// Expected twstatus of a consumer in a timed wait on this channel.
159 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
160 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
162 active_ctx->schedule( consumer_ctx);
// Observed twstatus 0: the consumer suspended through an untimed pop.
164 } else if ( static_cast< std::intptr_t >( 0) == expected) {
167 active_ctx->schedule( consumer_ctx);
171 // suspend till value has been consumed
172 active_ctx->suspend( lk);
173 // resumed, value has been consumed
174 return channel_op_status::success;
176 detail::spinlock_lock lk{ splk_producers_ };
177 if ( BOOST_UNLIKELY( is_closed() ) ) {
178 return channel_op_status::closed;
183 active_ctx->wait_link( waiting_producers_);
// 0 marks this wait as untimed for whoever wakes the context.
184 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
185 // suspend this producer
186 active_ctx->suspend( lk);
187 // resumed, slot maybe free
// Blocking push that moves `value` into a slot living on this fiber's stack.
// Visible flow:
//  - return channel_op_status::closed if the channel is already closed;
//  - if try_push_() accepts the slot, wake one waiting consumer (swap its
//    twstatus from the channel address to -1, or accept an observed value of 0),
//    suspend, and return channel_op_status::success once resumed;
//  - otherwise lock splk_producers_, re-check the closed flag, link the active
//    context into waiting_producers_, store 0 into its twstatus and suspend.
// NOTE(review): the loop and branch structure joining these steps is not
// visible in this listing.
192 channel_op_status push( value_type && value) {
193 context * active_ctx = context::active();
// The slot takes ownership of the value (moved in) and records the pushing
// context, so that the consumer can schedule it after taking the value.
194 slot s{ std::move( value), active_ctx };
196 if ( BOOST_UNLIKELY( is_closed() ) ) {
197 return channel_op_status::closed;
199 if ( try_push_( & s) ) {
200 detail::spinlock_lock lk{ splk_consumers_ };
201 // notify one waiting consumer
202 while ( ! waiting_consumers_.empty() ) {
203 context * consumer_ctx = & waiting_consumers_.front();
204 waiting_consumers_.pop_front();
// Expected twstatus of a consumer in a timed wait on this channel.
205 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
206 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
208 active_ctx->schedule( consumer_ctx);
// Observed twstatus 0: the consumer suspended through an untimed pop.
210 } else if ( static_cast< std::intptr_t >( 0) == expected) {
213 active_ctx->schedule( consumer_ctx);
217 // suspend till value has been consumed
218 active_ctx->suspend( lk);
219 // resumed, value has been consumed
220 return channel_op_status::success;
222 detail::spinlock_lock lk{ splk_producers_ };
223 if ( BOOST_UNLIKELY( is_closed() ) ) {
224 return channel_op_status::closed;
229 active_ctx->wait_link( waiting_producers_);
// 0 marks this wait as untimed for whoever wakes the context.
230 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
231 // suspend this producer
232 active_ctx->suspend( lk);
233 // resumed, slot maybe free
// Timed push (copying overload) with a relative timeout.
// Turns timeout_duration into an absolute deadline on std::chrono::steady_clock
// (now() + timeout_duration) and returns the result of push_wait_until().
238 template< typename Rep, typename Period >
239 channel_op_status push_wait_for( value_type const& value,
240 std::chrono::duration< Rep, Period > const& timeout_duration) {
241 return push_wait_until( value,
242 std::chrono::steady_clock::now() + timeout_duration);
// Timed push (moving overload) with a relative timeout.
// Turns timeout_duration into an absolute deadline on std::chrono::steady_clock
// (now() + timeout_duration) and returns the result of push_wait_until().
245 template< typename Rep, typename Period >
246 channel_op_status push_wait_for( value_type && value,
247 std::chrono::duration< Rep, Period > const& timeout_duration) {
// value_type is a non-reference type, so std::forward< value_type > yields an
// rvalue here and selects the rvalue overload of push_wait_until().
// NOTE(review): `value` is not a forwarding reference; std::move would state
// the intent more directly.
248 return push_wait_until( std::forward< value_type >( value),
249 std::chrono::steady_clock::now() + timeout_duration);
// Timed push (copying overload) with an absolute deadline.
// Visible flow:
//  - convert the caller's time point to a steady_clock time point;
//  - return channel_op_status::closed if the channel is already closed;
//  - if try_push_() accepts the slot, wake one waiting consumer, store the
//    channel address into this context's twstatus and wait until the deadline;
//    when wait_until() reports false, try to swap slot_ from this call's slot
//    back to nullptr and return timeout, otherwise return success;
//  - otherwise lock splk_producers_, re-check the closed flag, link the context
//    into waiting_producers_, store the channel address into twstatus and wait;
//    when wait_until() reports false, remove the context from the queue and
//    return timeout.
// NOTE(review): the loop and branch structure joining these steps is not
// visible in this listing.
252 template< typename Clock, typename Duration >
253 channel_op_status push_wait_until( value_type const& value,
254 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
255 context * active_ctx = context::active();
// The slot records the value together with the pushing context.
256 slot s{ value, active_ctx };
257 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
259 if ( BOOST_UNLIKELY( is_closed() ) ) {
260 return channel_op_status::closed;
262 if ( try_push_( & s) ) {
263 detail::spinlock_lock lk{ splk_consumers_ };
264 // notify one waiting consumer
265 while ( ! waiting_consumers_.empty() ) {
266 context * consumer_ctx = & waiting_consumers_.front();
267 waiting_consumers_.pop_front();
// Expected twstatus of a consumer in a timed wait on this channel.
268 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
269 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
271 active_ctx->schedule( consumer_ctx);
// Observed twstatus 0: the consumer suspended through an untimed pop.
273 } else if ( static_cast< std::intptr_t >( 0) == expected) {
276 active_ctx->schedule( consumer_ctx);
280 // suspend this producer
// The channel address in twstatus marks this as a timed wait on this channel.
281 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
282 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// Withdraw the offered slot, but only if slot_ still points at it.
284 slot * nil_slot = nullptr, * own_slot = & s;
285 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
286 // resumed, value has not been consumed
287 return channel_op_status::timeout;
289 // resumed, value has been consumed
290 return channel_op_status::success;
292 detail::spinlock_lock lk{ splk_producers_ };
293 if ( BOOST_UNLIKELY( is_closed() ) ) {
294 return channel_op_status::closed;
299 active_ctx->wait_link( waiting_producers_);
300 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
301 // suspend this producer
302 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
305 // remove from waiting-queue
306 waiting_producers_.remove( * active_ctx);
307 return channel_op_status::timeout;
309 // resumed, slot maybe free
// Timed push (moving overload) with an absolute deadline.
// Visible flow:
//  - move the value into a slot and convert the caller's time point to a
//    steady_clock time point;
//  - return channel_op_status::closed if the channel is already closed;
//  - if try_push_() accepts the slot, wake one waiting consumer, store the
//    channel address into this context's twstatus and wait until the deadline;
//    when wait_until() reports false, try to swap slot_ from this call's slot
//    back to nullptr and return timeout, otherwise return success;
//  - otherwise lock splk_producers_, re-check the closed flag, link the context
//    into waiting_producers_, store the channel address into twstatus and wait;
//    when wait_until() reports false, remove the context from the queue and
//    return timeout.
// NOTE(review): the loop and branch structure joining these steps is not
// visible in this listing.
314 template< typename Clock, typename Duration >
315 channel_op_status push_wait_until( value_type && value,
316 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
317 context * active_ctx = context::active();
// The slot takes ownership of the value and records the pushing context.
318 slot s{ std::move( value), active_ctx };
319 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
321 if ( BOOST_UNLIKELY( is_closed() ) ) {
322 return channel_op_status::closed;
324 if ( try_push_( & s) ) {
325 detail::spinlock_lock lk{ splk_consumers_ };
326 // notify one waiting consumer
327 while ( ! waiting_consumers_.empty() ) {
328 context * consumer_ctx = & waiting_consumers_.front();
329 waiting_consumers_.pop_front();
// Expected twstatus of a consumer in a timed wait on this channel.
330 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
331 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
333 active_ctx->schedule( consumer_ctx);
// Observed twstatus 0: the consumer suspended through an untimed pop.
335 } else if ( static_cast< std::intptr_t >( 0) == expected) {
338 active_ctx->schedule( consumer_ctx);
342 // suspend this producer
// The channel address in twstatus marks this as a timed wait on this channel.
343 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
344 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// Withdraw the offered slot, but only if slot_ still points at it.
346 slot * nil_slot = nullptr, * own_slot = & s;
347 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
348 // resumed, value has not been consumed
349 return channel_op_status::timeout;
351 // resumed, value has been consumed
352 return channel_op_status::success;
354 detail::spinlock_lock lk{ splk_producers_ };
355 if ( BOOST_UNLIKELY( is_closed() ) ) {
356 return channel_op_status::closed;
361 active_ctx->wait_link( waiting_producers_);
362 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
363 // suspend this producer
364 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
367 // remove from waiting-queue
368 waiting_producers_.remove( * active_ctx);
369 return channel_op_status::timeout;
371 // resumed, slot maybe free
// Blocking pop that writes the received element into `value`.
// Visible flow:
//  - if try_pop_() hands back a slot, wake one waiting producer, move the slot's
//    value into `value`, schedule the context stored in the slot (the producer
//    that offered it) and return channel_op_status::success;
//  - otherwise lock splk_consumers_, return channel_op_status::closed if the
//    channel is closed, and - after the is_empty_() check - link the active
//    context into waiting_consumers_, store 0 into its twstatus and suspend.
// NOTE(review): the declaration of `s`, the loop, and the body of the
// is_empty_() branch are not visible in this listing.
376 channel_op_status pop( value_type & value) {
377 context * active_ctx = context::active();
380 if ( nullptr != ( s = try_pop_() ) ) {
382 detail::spinlock_lock lk{ splk_producers_ };
383 // notify one waiting producer
384 while ( ! waiting_producers_.empty() ) {
385 context * producer_ctx = & waiting_producers_.front();
386 waiting_producers_.pop_front();
// Expected twstatus of a producer in a timed wait on this channel.
388 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
389 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
391 active_ctx->schedule( producer_ctx);
// Observed twstatus 0: the producer suspended through an untimed push.
393 } else if ( static_cast< std::intptr_t >( 0) == expected) {
396 active_ctx->schedule( producer_ctx);
// Take the element out of the producer's slot.
401 value = std::move( s->value);
// Resume the producer whose slot was consumed.
403 active_ctx->schedule( s->ctx);
404 return channel_op_status::success;
406 detail::spinlock_lock lk{ splk_consumers_ };
407 if ( BOOST_UNLIKELY( is_closed() ) ) {
408 return channel_op_status::closed;
410 if ( ! is_empty_() ) {
413 active_ctx->wait_link( waiting_consumers_);
// 0 marks this wait as untimed for whoever wakes the context.
414 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
415 // suspend this consumer
416 active_ctx->suspend( lk);
417 // resumed, slot maybe set
// Blocking pop that returns the received element by value.
// Visible flow:
//  - if try_pop_() hands back a slot, wake one waiting producer, move the slot's
//    value into a local, schedule the context stored in the slot and return the
//    local;
//  - otherwise lock splk_consumers_; when the channel is closed an error is
//    built from std::errc::operation_not_permitted and the message
//    "boost fiber: channel is closed"; else - after the is_empty_() check - link
//    the active context into waiting_consumers_, store 0 into its twstatus and
//    suspend.
// NOTE(review): the statement that consumes the error arguments is not visible
// in this listing; the iterator code in this file catches fiber_error around
// value_pop(), so presumably a fiber_error is thrown - confirm.
422 value_type value_pop() {
423 context * active_ctx = context::active();
426 if ( nullptr != ( s = try_pop_() ) ) {
428 detail::spinlock_lock lk{ splk_producers_ };
429 // notify one waiting producer
430 while ( ! waiting_producers_.empty() ) {
431 context * producer_ctx = & waiting_producers_.front();
432 waiting_producers_.pop_front();
// Expected twstatus of a producer in a timed wait on this channel.
434 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
435 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
437 active_ctx->schedule( producer_ctx);
// Observed twstatus 0: the producer suspended through an untimed push.
439 } else if ( static_cast< std::intptr_t >( 0) == expected) {
442 active_ctx->schedule( producer_ctx);
// Take the element out of the producer's slot before the producer is resumed.
448 value_type value = std::move( s->value);
450 active_ctx->schedule( s->ctx);
// NOTE(review): `value` is a local returned by value; std::move here inhibits
// copy elision, and a plain `return value;` would already move.
451 return std::move( value);
453 detail::spinlock_lock lk{ splk_consumers_ };
454 if ( BOOST_UNLIKELY( is_closed() ) ) {
456 std::make_error_code( std::errc::operation_not_permitted),
457 "boost fiber: channel is closed" };
459 if ( ! is_empty_() ) {
462 active_ctx->wait_link( waiting_consumers_);
// 0 marks this wait as untimed for whoever wakes the context.
463 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
464 // suspend this consumer
465 active_ctx->suspend( lk);
466 // resumed, slot maybe set
// Timed pop with a relative timeout.
// Turns timeout_duration into an absolute deadline on std::chrono::steady_clock
// (now() + timeout_duration) and returns the result of pop_wait_until().
471 template< typename Rep, typename Period >
472 channel_op_status pop_wait_for( value_type & value,
473 std::chrono::duration< Rep, Period > const& timeout_duration) {
474 return pop_wait_until( value,
475 std::chrono::steady_clock::now() + timeout_duration);
// Timed pop with an absolute deadline; writes the received element into `value`.
// Visible flow:
//  - convert the caller's time point to a steady_clock time point;
//  - if try_pop_() hands back a slot, wake one waiting producer, move the slot's
//    value into `value`, schedule the context stored in the slot and return
//    channel_op_status::success;
//  - otherwise lock splk_consumers_, return channel_op_status::closed if the
//    channel is closed, and - after the is_empty_() check - link the active
//    context into waiting_consumers_, store the channel address into its
//    twstatus and wait until the deadline; when wait_until() reports false,
//    remove the context from the queue and return channel_op_status::timeout.
// NOTE(review): the declaration of `s`, the loop, and the body of the
// is_empty_() branch are not visible in this listing.
478 template< typename Clock, typename Duration >
479 channel_op_status pop_wait_until( value_type & value,
480 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
481 context * active_ctx = context::active();
483 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
485 if ( nullptr != ( s = try_pop_() ) ) {
487 detail::spinlock_lock lk{ splk_producers_ };
488 // notify one waiting producer
489 while ( ! waiting_producers_.empty() ) {
490 context * producer_ctx = & waiting_producers_.front();
491 waiting_producers_.pop_front();
// Expected twstatus of a producer in a timed wait on this channel.
493 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
494 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
496 active_ctx->schedule( producer_ctx);
// Observed twstatus 0: the producer suspended through an untimed push.
498 } else if ( static_cast< std::intptr_t >( 0) == expected) {
501 active_ctx->schedule( producer_ctx);
// Take the element out of the producer's slot.
507 value = std::move( s->value);
// Resume the producer whose slot was consumed.
509 active_ctx->schedule( s->ctx);
510 return channel_op_status::success;
512 detail::spinlock_lock lk{ splk_consumers_ };
513 if ( BOOST_UNLIKELY( is_closed() ) ) {
514 return channel_op_status::closed;
516 if ( ! is_empty_() ) {
519 active_ctx->wait_link( waiting_consumers_);
// The channel address in twstatus marks this as a timed wait on this channel.
520 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
521 // suspend this consumer
522 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
525 // remove from waiting-queue
526 waiting_consumers_.remove( * active_ctx);
527 return channel_op_status::timeout;
// Members of the channel's nested iterator type (its class header and several
// bodies are not visible in this listing).
// Raw storage sized and aligned for one value_type; the current element is
// constructed into it with placement new.
535 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
// Channel being iterated; nullptr by default.
537 unbuffered_channel * chan_{ nullptr };
538 storage_type storage_;
// Fragment of a helper whose header is not visible: asserts that chan_ is set,
// then constructs a value_type in storage_ from chan_->value_pop(); a
// fiber_error raised by that call is caught (handler body not visible).
541 BOOST_ASSERT( nullptr != chan_);
543 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
544 } catch ( fiber_error const&) {
// Iterator traits: single-pass input iterator over value_type.
550 typedef std::input_iterator_tag iterator_category;
551 typedef std::ptrdiff_t difference_type;
552 typedef value_type * pointer;
553 typedef value_type & reference;
// Aliases used as the return types of operator-> and operator*.
555 typedef pointer pointer_t;
556 typedef reference reference_t;
// Default construction leaves chan_ at its nullptr initializer.
558 iterator() noexcept = default;
// Construct an iterator bound to a channel (initializer list and body not visible).
560 explicit iterator( unbuffered_channel< T > * chan) noexcept :
// Copy construction copies the channel pointer.
// NOTE(review): no copy of storage_ is visible here - confirm whether the
// current element is meant to carry over to the copy.
565 iterator( iterator const& other) noexcept :
566 chan_{ other.chan_ } {
// Copy assignment; self-assignment returns immediately (rest of body not visible).
569 iterator & operator=( iterator const& other) noexcept {
570 if ( this == & other) return * this;
// Two iterators compare equal when they hold the same channel pointer.
575 bool operator==( iterator const& other) const noexcept {
576 return other.chan_ == chan_;
579 bool operator!=( iterator const& other) const noexcept {
580 return other.chan_ != chan_;
// Pre-increment (body not visible in this listing).
583 iterator & operator++() {
// Post-increment is deliberately unavailable.
588 iterator operator++( int) = delete;
// Access the element held in storage_ by reference.
590 reference_t operator*() noexcept {
591 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
// Access the element held in storage_ by pointer.
594 pointer_t operator->() noexcept {
595 return reinterpret_cast< value_type * >( std::addressof( storage_) );
// Friend declaration naming the iterator class.
// NOTE(review): presumably declared in the enclosing channel class so that the
// iterator can reach its members - confirm against the full file.
599 friend class iterator;
// Free begin() for unbuffered_channel: returns an iterator constructed from the
// address of `chan`.
602 template< typename T >
603 typename unbuffered_channel< T >::iterator
604 begin( unbuffered_channel< T > & chan) {
605 return typename unbuffered_channel< T >::iterator( & chan);
// Free end() for unbuffered_channel: returns a default-constructed iterator;
// the channel argument itself is unused.
608 template< typename T >
609 typename unbuffered_channel< T >::iterator
610 end( unbuffered_channel< T > &) {
611 return typename unbuffered_channel< T >::iterator();
616 #ifdef BOOST_HAS_ABI_HEADERS
617 # include BOOST_ABI_SUFFIX
620 #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H