2 // Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
9 #define BOOST_FIBERS_BUFFERED_CHANNEL_H
16 #include <type_traits>
18 #include <boost/config.hpp>
20 #include <boost/fiber/channel_op_status.hpp>
21 #include <boost/fiber/context.hpp>
22 #include <boost/fiber/detail/config.hpp>
23 #include <boost/fiber/detail/convert.hpp>
24 #include <boost/fiber/detail/spinlock.hpp>
25 #include <boost/fiber/exceptions.hpp>
27 #ifdef BOOST_HAS_ABI_HEADERS
28 # include BOOST_ABI_PREFIX
34 template< typename T >
35 class buffered_channel {
40 typedef context::wait_queue_t wait_queue_type;
43 mutable detail::spinlock splk_{};
44 wait_queue_type waiting_producers_{};
45 wait_queue_type waiting_consumers_{};
47 std::size_t pidx_{ 0 };
48 std::size_t cidx_{ 0 };
49 std::size_t capacity_;
50 bool closed_{ false };
52 bool is_full_() const noexcept {
53 return cidx_ == ((pidx_ + 1) % capacity_);
56 bool is_empty_() const noexcept {
57 return cidx_ == pidx_;
60 bool is_closed_() const noexcept {
65 explicit buffered_channel( std::size_t capacity) :
66 capacity_{ capacity } {
67 if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
68 throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
69 "boost fiber: buffer capacity is invalid" };
71 slots_ = new slot_type[capacity_];
79 buffered_channel( buffered_channel const&) = delete;
80 buffered_channel & operator=( buffered_channel const&) = delete;
82 bool is_closed() const noexcept {
83 detail::spinlock_lock lk{ splk_ };
87 void close() noexcept {
88 context * active_ctx = context::active();
89 detail::spinlock_lock lk{ splk_ };
91 // notify all waiting producers
92 while ( ! waiting_producers_.empty() ) {
93 context * producer_ctx = & waiting_producers_.front();
94 waiting_producers_.pop_front();
95 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
96 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
97 // notify before timeout
98 intrusive_ptr_release( producer_ctx);
100 active_ctx->schedule( producer_ctx);
101 } else if ( static_cast< std::intptr_t >( 0) == expected) {
104 active_ctx->schedule( producer_ctx);
107 // expected == -1: notify after timeout, same timed-wait op.
108 // expected == <any>: notify after timeout, another timed-wait op. was already started
109 intrusive_ptr_release( producer_ctx);
113 // notify all waiting consumers
114 while ( ! waiting_consumers_.empty() ) {
115 context * consumer_ctx = & waiting_consumers_.front();
116 waiting_consumers_.pop_front();
117 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
118 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
119 // notify before timeout
120 intrusive_ptr_release( consumer_ctx);
122 active_ctx->schedule( consumer_ctx);
123 } else if ( static_cast< std::intptr_t >( 0) == expected) {
126 active_ctx->schedule( consumer_ctx);
129 // expected == -1: notify after timeout, same timed-wait op.
130 // expected == <any>: notify after timeout, another timed-wait op. was already started
131 intrusive_ptr_release( consumer_ctx);
137 channel_op_status try_push( value_type const& value) {
138 context * active_ctx = context::active();
139 detail::spinlock_lock lk{ splk_ };
140 if ( BOOST_UNLIKELY( is_closed_() ) ) {
141 return channel_op_status::closed;
142 } else if ( is_full_() ) {
143 return channel_op_status::full;
145 slots_[pidx_] = value;
146 pidx_ = (pidx_ + 1) % capacity_;
147 // notify one waiting consumer
148 while ( ! waiting_consumers_.empty() ) {
149 context * consumer_ctx = & waiting_consumers_.front();
150 waiting_consumers_.pop_front();
151 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
152 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
153 // notify before timeout
154 intrusive_ptr_release( consumer_ctx);
156 active_ctx->schedule( consumer_ctx);
158 } else if ( static_cast< std::intptr_t >( 0) == expected) {
161 active_ctx->schedule( consumer_ctx);
165 // expected == -1: notify after timeout, same timed-wait op.
166 // expected == <any>: notify after timeout, another timed-wait op. was already started
167 intrusive_ptr_release( consumer_ctx);
171 return channel_op_status::success;
175 channel_op_status try_push( value_type && value) {
176 context * active_ctx = context::active();
177 detail::spinlock_lock lk{ splk_ };
178 if ( BOOST_UNLIKELY( is_closed_() ) ) {
179 return channel_op_status::closed;
180 } else if ( is_full_() ) {
181 return channel_op_status::full;
183 slots_[pidx_] = std::move( value);
184 pidx_ = (pidx_ + 1) % capacity_;
185 // notify one waiting consumer
186 while ( ! waiting_consumers_.empty() ) {
187 context * consumer_ctx = & waiting_consumers_.front();
188 waiting_consumers_.pop_front();
190 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
191 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
192 // notify before timeout
193 intrusive_ptr_release( consumer_ctx);
195 active_ctx->schedule( consumer_ctx);
197 } else if ( static_cast< std::intptr_t >( 0) == expected) {
200 active_ctx->schedule( consumer_ctx);
204 // expected == -1: notify after timeout, same timed-wait op.
205 // expected == <any>: notify after timeout, another timed-wait op. was already started
206 intrusive_ptr_release( consumer_ctx);
210 return channel_op_status::success;
214 channel_op_status push( value_type const& value) {
215 context * active_ctx = context::active();
217 detail::spinlock_lock lk{ splk_ };
218 if ( BOOST_UNLIKELY( is_closed_() ) ) {
219 return channel_op_status::closed;
220 } else if ( is_full_() ) {
221 active_ctx->wait_link( waiting_producers_);
222 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
223 // suspend this producer
224 active_ctx->suspend( lk);
226 slots_[pidx_] = value;
227 pidx_ = (pidx_ + 1) % capacity_;
228 // notify one waiting consumer
229 while ( ! waiting_consumers_.empty() ) {
230 context * consumer_ctx = & waiting_consumers_.front();
231 waiting_consumers_.pop_front();
233 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
234 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
235 // notify before timeout
236 intrusive_ptr_release( consumer_ctx);
238 active_ctx->schedule( consumer_ctx);
240 } else if ( static_cast< std::intptr_t >( 0) == expected) {
243 active_ctx->schedule( consumer_ctx);
247 // expected == -1: notify after timeout, same timed-wait op.
248 // expected == <any>: notify after timeout, another timed-wait op. was already started
249 intrusive_ptr_release( consumer_ctx);
253 return channel_op_status::success;
258 channel_op_status push( value_type && value) {
259 context * active_ctx = context::active();
261 detail::spinlock_lock lk{ splk_ };
262 if ( BOOST_UNLIKELY( is_closed_() ) ) {
263 return channel_op_status::closed;
264 } else if ( is_full_() ) {
265 active_ctx->wait_link( waiting_producers_);
266 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
267 // suspend this producer
268 active_ctx->suspend( lk);
270 slots_[pidx_] = std::move( value);
271 pidx_ = (pidx_ + 1) % capacity_;
272 // notify one waiting consumer
273 while ( ! waiting_consumers_.empty() ) {
274 context * consumer_ctx = & waiting_consumers_.front();
275 waiting_consumers_.pop_front();
277 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
278 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
279 // notify before timeout
280 intrusive_ptr_release( consumer_ctx);
282 active_ctx->schedule( consumer_ctx);
284 } else if ( static_cast< std::intptr_t >( 0) == expected) {
287 active_ctx->schedule( consumer_ctx);
291 // expected == -1: notify after timeout, same timed-wait op.
292 // expected == <any>: notify after timeout, another timed-wait op. was already started
293 intrusive_ptr_release( consumer_ctx);
297 return channel_op_status::success;
302 template< typename Rep, typename Period >
303 channel_op_status push_wait_for( value_type const& value,
304 std::chrono::duration< Rep, Period > const& timeout_duration) {
305 return push_wait_until( value,
306 std::chrono::steady_clock::now() + timeout_duration);
309 template< typename Rep, typename Period >
310 channel_op_status push_wait_for( value_type && value,
311 std::chrono::duration< Rep, Period > const& timeout_duration) {
312 return push_wait_until( std::forward< value_type >( value),
313 std::chrono::steady_clock::now() + timeout_duration);
316 template< typename Clock, typename Duration >
317 channel_op_status push_wait_until( value_type const& value,
318 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
319 context * active_ctx = context::active();
320 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
322 detail::spinlock_lock lk{ splk_ };
323 if ( BOOST_UNLIKELY( is_closed_() ) ) {
324 return channel_op_status::closed;
325 } else if ( is_full_() ) {
326 active_ctx->wait_link( waiting_producers_);
327 intrusive_ptr_add_ref( active_ctx);
328 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
329 // suspend this producer
330 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
333 // remove from waiting-queue
334 waiting_producers_.remove( * active_ctx);
335 return channel_op_status::timeout;
338 slots_[pidx_] = value;
339 pidx_ = (pidx_ + 1) % capacity_;
340 // notify one waiting consumer
341 while ( ! waiting_consumers_.empty() ) {
342 context * consumer_ctx = & waiting_consumers_.front();
343 waiting_consumers_.pop_front();
345 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
346 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
347 // notify before timeout
348 intrusive_ptr_release( consumer_ctx);
350 active_ctx->schedule( consumer_ctx);
352 } else if ( static_cast< std::intptr_t >( 0) == expected) {
355 active_ctx->schedule( consumer_ctx);
359 // expected == -1: notify after timeout, same timed-wait op.
360 // expected == <any>: notify after timeout, another timed-wait op. was already started
361 intrusive_ptr_release( consumer_ctx);
365 return channel_op_status::success;
370 template< typename Clock, typename Duration >
371 channel_op_status push_wait_until( value_type && value,
372 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
373 context * active_ctx = context::active();
374 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
376 detail::spinlock_lock lk{ splk_ };
377 if ( BOOST_UNLIKELY( is_closed_() ) ) {
378 return channel_op_status::closed;
379 } else if ( is_full_() ) {
380 active_ctx->wait_link( waiting_producers_);
381 intrusive_ptr_add_ref( active_ctx);
382 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
383 // suspend this producer
384 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
387 // remove from waiting-queue
388 waiting_producers_.remove( * active_ctx);
389 return channel_op_status::timeout;
392 slots_[pidx_] = std::move( value);
393 pidx_ = (pidx_ + 1) % capacity_;
394 // notify one waiting consumer
395 while ( ! waiting_consumers_.empty() ) {
396 context * consumer_ctx = & waiting_consumers_.front();
397 waiting_consumers_.pop_front();
399 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
400 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
401 // notify before timeout
402 intrusive_ptr_release( consumer_ctx);
404 active_ctx->schedule( consumer_ctx);
406 } else if ( static_cast< std::intptr_t >( 0) == expected) {
409 active_ctx->schedule( consumer_ctx);
413 // expected == -1: notify after timeout, same timed-wait op.
414 // expected == <any>: notify after timeout, another timed-wait op. was already started
415 intrusive_ptr_release( consumer_ctx);
419 return channel_op_status::success;
424 channel_op_status try_pop( value_type & value) {
425 context * active_ctx = context::active();
426 detail::spinlock_lock lk{ splk_ };
429 ? channel_op_status::closed
430 : channel_op_status::empty;
432 value = std::move( slots_[cidx_]);
433 cidx_ = (cidx_ + 1) % capacity_;
434 // notify one waiting producer
435 while ( ! waiting_producers_.empty() ) {
436 context * producer_ctx = & waiting_producers_.front();
437 waiting_producers_.pop_front();
439 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
440 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
441 // notify before timeout
442 intrusive_ptr_release( producer_ctx);
444 active_ctx->schedule( producer_ctx);
446 } else if ( static_cast< std::intptr_t >( 0) == expected) {
449 active_ctx->schedule( producer_ctx);
453 // expected == -1: notify after timeout, same timed-wait op.
454 // expected == <any>: notify after timeout, another timed-wait op. was already started
455 intrusive_ptr_release( producer_ctx);
459 return channel_op_status::success;
463 channel_op_status pop( value_type & value) {
464 context * active_ctx = context::active();
466 detail::spinlock_lock lk{ splk_ };
468 if ( BOOST_UNLIKELY( is_closed_() ) ) {
469 return channel_op_status::closed;
471 active_ctx->wait_link( waiting_consumers_);
472 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
473 // suspend this consumer
474 active_ctx->suspend( lk);
477 value = std::move( slots_[cidx_]);
478 cidx_ = (cidx_ + 1) % capacity_;
479 // notify one waiting producer
480 while ( ! waiting_producers_.empty() ) {
481 context * producer_ctx = & waiting_producers_.front();
482 waiting_producers_.pop_front();
484 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
485 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
486 // notify before timeout
487 intrusive_ptr_release( producer_ctx);
489 active_ctx->schedule( producer_ctx);
491 } else if ( static_cast< std::intptr_t >( 0) == expected) {
494 active_ctx->schedule( producer_ctx);
498 // expected == -1: notify after timeout, same timed-wait op.
499 // expected == <any>: notify after timeout, another timed-wait op. was already started
500 intrusive_ptr_release( producer_ctx);
504 return channel_op_status::success;
509 value_type value_pop() {
510 context * active_ctx = context::active();
512 detail::spinlock_lock lk{ splk_ };
514 if ( BOOST_UNLIKELY( is_closed_() ) ) {
516 std::make_error_code( std::errc::operation_not_permitted),
517 "boost fiber: channel is closed" };
519 active_ctx->wait_link( waiting_consumers_);
520 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
521 // suspend this consumer
522 active_ctx->suspend( lk);
525 value_type value = std::move( slots_[cidx_]);
526 cidx_ = (cidx_ + 1) % capacity_;
527 // notify one waiting producer
528 while ( ! waiting_producers_.empty() ) {
529 context * producer_ctx = & waiting_producers_.front();
530 waiting_producers_.pop_front();
532 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
533 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
534 // notify before timeout
535 intrusive_ptr_release( producer_ctx);
537 active_ctx->schedule( producer_ctx);
539 } else if ( static_cast< std::intptr_t >( 0) == expected) {
542 active_ctx->schedule( producer_ctx);
546 // expected == -1: notify after timeout, same timed-wait op.
547 // expected == <any>: notify after timeout, another timed-wait op. was already started
548 intrusive_ptr_release( producer_ctx);
552 return std::move( value);
557 template< typename Rep, typename Period >
558 channel_op_status pop_wait_for( value_type & value,
559 std::chrono::duration< Rep, Period > const& timeout_duration) {
560 return pop_wait_until( value,
561 std::chrono::steady_clock::now() + timeout_duration);
564 template< typename Clock, typename Duration >
565 channel_op_status pop_wait_until( value_type & value,
566 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
567 context * active_ctx = context::active();
568 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
570 detail::spinlock_lock lk{ splk_ };
572 if ( BOOST_UNLIKELY( is_closed_() ) ) {
573 return channel_op_status::closed;
575 active_ctx->wait_link( waiting_consumers_);
576 intrusive_ptr_add_ref( active_ctx);
577 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
578 // suspend this consumer
579 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
582 // remove from waiting-queue
583 waiting_consumers_.remove( * active_ctx);
584 return channel_op_status::timeout;
588 value = std::move( slots_[cidx_]);
589 cidx_ = (cidx_ + 1) % capacity_;
590 // notify one waiting producer
591 while ( ! waiting_producers_.empty() ) {
592 context * producer_ctx = & waiting_producers_.front();
593 waiting_producers_.pop_front();
595 std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
596 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
597 // notify before timeout
598 intrusive_ptr_release( producer_ctx);
600 active_ctx->schedule( producer_ctx);
602 } else if ( static_cast< std::intptr_t >( 0) == expected) {
605 active_ctx->schedule( producer_ctx);
609 // expected == -1: notify after timeout, same timed-wait op.
610 // expected == <any>: notify after timeout, another timed-wait op. was already started
611 intrusive_ptr_release( producer_ctx);
615 return channel_op_status::success;
620 class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > {
622 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
624 buffered_channel * chan_{ nullptr };
625 storage_type storage_;
628 BOOST_ASSERT( nullptr != chan_);
630 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
631 } catch ( fiber_error const&) {
637 typedef typename iterator::pointer pointer_t;
638 typedef typename iterator::reference reference_t;
640 iterator() noexcept = default;
642 explicit iterator( buffered_channel< T > * chan) noexcept :
647 iterator( iterator const& other) noexcept :
648 chan_{ other.chan_ } {
651 iterator & operator=( iterator const& other) noexcept {
652 if ( BOOST_LIKELY( this != & other) ) {
658 bool operator==( iterator const& other) const noexcept {
659 return other.chan_ == chan_;
662 bool operator!=( iterator const& other) const noexcept {
663 return other.chan_ != chan_;
666 iterator & operator++() {
671 iterator operator++( int) = delete;
673 reference_t operator*() noexcept {
674 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
677 pointer_t operator->() noexcept {
678 return reinterpret_cast< value_type * >( std::addressof( storage_) );
682 friend class iterator;
685 template< typename T >
686 typename buffered_channel< T >::iterator
687 begin( buffered_channel< T > & chan) {
688 return typename buffered_channel< T >::iterator( & chan);
691 template< typename T >
692 typename buffered_channel< T >::iterator
693 end( buffered_channel< T > &) {
694 return typename buffered_channel< T >::iterator();
699 #ifdef BOOST_HAS_ABI_HEADERS
700 # include BOOST_ABI_SUFFIX
703 #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H