2 // Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
9 #define BOOST_FIBERS_BUFFERED_CHANNEL_H
16 #include <type_traits>
18 #include <boost/config.hpp>
20 #include <boost/fiber/channel_op_status.hpp>
21 #include <boost/fiber/context.hpp>
22 #include <boost/fiber/detail/config.hpp>
23 #include <boost/fiber/detail/convert.hpp>
24 #include <boost/fiber/detail/spinlock.hpp>
25 #include <boost/fiber/exceptions.hpp>
27 #ifdef BOOST_HAS_ABI_HEADERS
28 # include BOOST_ABI_PREFIX
34 template< typename T >
35 class buffered_channel {
37 using value_type = typename std::remove_reference<T>::type;
40 using wait_queue_type = context::wait_queue_t;
41 using slot_type = value_type;
43 mutable detail::spinlock splk_{};
44 wait_queue_type waiting_producers_{};
45 wait_queue_type waiting_consumers_{};
47 std::size_t pidx_{ 0 };
48 std::size_t cidx_{ 0 };
49 std::size_t capacity_;
50 bool closed_{ false };
52 bool is_full_() const noexcept {
53 return cidx_ == ((pidx_ + 1) % capacity_);
56 bool is_empty_() const noexcept {
57 return cidx_ == pidx_;
60 bool is_closed_() const noexcept {
65 explicit buffered_channel( std::size_t capacity) :
66 capacity_{ capacity } {
67 if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
68 throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
69 "boost fiber: buffer capacity is invalid" };
71 slots_ = new slot_type[capacity_];
79 buffered_channel( buffered_channel const&) = delete;
80 buffered_channel & operator=( buffered_channel const&) = delete;
82 bool is_closed() const noexcept {
83 detail::spinlock_lock lk{ splk_ };
87 void close() noexcept {
88 context * active_ctx = context::active();
89 detail::spinlock_lock lk{ splk_ };
92 // notify all waiting producers
93 while ( ! waiting_producers_.empty() ) {
94 context * producer_ctx = & waiting_producers_.front();
95 waiting_producers_.pop_front();
96 auto expected = reinterpret_cast< std::intptr_t >( this);
97 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
99 active_ctx->schedule( producer_ctx);
100 } else if ( static_cast< std::intptr_t >( 0) == expected) {
103 active_ctx->schedule( producer_ctx);
106 // notify all waiting consumers
107 while ( ! waiting_consumers_.empty() ) {
108 context * consumer_ctx = & waiting_consumers_.front();
109 waiting_consumers_.pop_front();
110 auto expected = reinterpret_cast< std::intptr_t >( this);
111 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
113 active_ctx->schedule( consumer_ctx);
114 } else if ( static_cast< std::intptr_t >( 0) == expected) {
117 active_ctx->schedule( consumer_ctx);
123 channel_op_status try_push( value_type const& value) {
124 context * active_ctx = context::active();
125 detail::spinlock_lock lk{ splk_ };
126 if ( BOOST_UNLIKELY( is_closed_() ) ) {
127 return channel_op_status::closed;
130 return channel_op_status::full;
132 slots_[pidx_] = value;
133 pidx_ = (pidx_ + 1) % capacity_;
134 // notify one waiting consumer
135 while ( ! waiting_consumers_.empty() ) {
136 context * consumer_ctx = & waiting_consumers_.front();
137 waiting_consumers_.pop_front();
138 auto expected = reinterpret_cast< std::intptr_t >( this);
139 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
142 active_ctx->schedule( consumer_ctx);
145 if ( static_cast< std::intptr_t >( 0) == expected) {
149 active_ctx->schedule( consumer_ctx);
153 return channel_op_status::success;
156 channel_op_status try_push( value_type && value) {
157 context * active_ctx = context::active();
158 detail::spinlock_lock lk{ splk_ };
159 if ( BOOST_UNLIKELY( is_closed_() ) ) {
160 return channel_op_status::closed;
163 return channel_op_status::full;
165 slots_[pidx_] = std::move( value);
166 pidx_ = (pidx_ + 1) % capacity_;
167 // notify one waiting consumer
168 while ( ! waiting_consumers_.empty() ) {
169 context * consumer_ctx = & waiting_consumers_.front();
170 waiting_consumers_.pop_front();
171 auto expected = reinterpret_cast< std::intptr_t >( this);
172 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
175 active_ctx->schedule( consumer_ctx);
178 if ( static_cast< std::intptr_t >( 0) == expected) {
182 active_ctx->schedule( consumer_ctx);
186 return channel_op_status::success;
189 channel_op_status push( value_type const& value) {
190 context * active_ctx = context::active();
192 detail::spinlock_lock lk{ splk_ };
193 if ( BOOST_UNLIKELY( is_closed_() ) ) {
194 return channel_op_status::closed;
197 active_ctx->wait_link( waiting_producers_);
198 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
199 // suspend this producer
200 active_ctx->suspend( lk);
202 slots_[pidx_] = value;
203 pidx_ = (pidx_ + 1) % capacity_;
204 // notify one waiting consumer
205 while ( ! waiting_consumers_.empty() ) {
206 context * consumer_ctx = & waiting_consumers_.front();
207 waiting_consumers_.pop_front();
208 auto expected = reinterpret_cast< std::intptr_t >( this);
209 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
212 active_ctx->schedule( consumer_ctx);
215 if ( static_cast< std::intptr_t >( 0) == expected) {
219 active_ctx->schedule( consumer_ctx);
223 return channel_op_status::success;
228 channel_op_status push( value_type && value) {
229 context * active_ctx = context::active();
231 detail::spinlock_lock lk{ splk_ };
232 if ( BOOST_UNLIKELY( is_closed_() ) ) {
233 return channel_op_status::closed;
236 active_ctx->wait_link( waiting_producers_);
237 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
238 // suspend this producer
239 active_ctx->suspend( lk);
241 slots_[pidx_] = std::move( value);
242 pidx_ = (pidx_ + 1) % capacity_;
243 // notify one waiting consumer
244 while ( ! waiting_consumers_.empty() ) {
245 context * consumer_ctx = & waiting_consumers_.front();
246 waiting_consumers_.pop_front();
247 auto expected = reinterpret_cast< std::intptr_t >( this);
248 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
251 active_ctx->schedule( consumer_ctx);
254 if ( static_cast< std::intptr_t >( 0) == expected) {
258 active_ctx->schedule( consumer_ctx);
262 return channel_op_status::success;
267 template< typename Rep, typename Period >
268 channel_op_status push_wait_for( value_type const& value,
269 std::chrono::duration< Rep, Period > const& timeout_duration) {
270 return push_wait_until( value,
271 std::chrono::steady_clock::now() + timeout_duration);
274 template< typename Rep, typename Period >
275 channel_op_status push_wait_for( value_type && value,
276 std::chrono::duration< Rep, Period > const& timeout_duration) {
277 return push_wait_until( std::forward< value_type >( value),
278 std::chrono::steady_clock::now() + timeout_duration);
281 template< typename Clock, typename Duration >
282 channel_op_status push_wait_until( value_type const& value,
283 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
284 context * active_ctx = context::active();
285 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
287 detail::spinlock_lock lk{ splk_ };
288 if ( BOOST_UNLIKELY( is_closed_() ) ) {
289 return channel_op_status::closed;
292 active_ctx->wait_link( waiting_producers_);
293 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
294 // suspend this producer
295 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
298 // remove from waiting-queue
299 waiting_producers_.remove( * active_ctx);
300 return channel_op_status::timeout;
303 slots_[pidx_] = value;
304 pidx_ = (pidx_ + 1) % capacity_;
305 // notify one waiting consumer
306 while ( ! waiting_consumers_.empty() ) {
307 context * consumer_ctx = & waiting_consumers_.front();
308 waiting_consumers_.pop_front();
309 auto expected = reinterpret_cast< std::intptr_t >( this);
310 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
313 active_ctx->schedule( consumer_ctx);
316 if ( static_cast< std::intptr_t >( 0) == expected) {
320 active_ctx->schedule( consumer_ctx);
324 return channel_op_status::success;
329 template< typename Clock, typename Duration >
330 channel_op_status push_wait_until( value_type && value,
331 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
332 context * active_ctx = context::active();
333 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
335 detail::spinlock_lock lk{ splk_ };
336 if ( BOOST_UNLIKELY( is_closed_() ) ) {
337 return channel_op_status::closed;
340 active_ctx->wait_link( waiting_producers_);
341 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
342 // suspend this producer
343 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
346 // remove from waiting-queue
347 waiting_producers_.remove( * active_ctx);
348 return channel_op_status::timeout;
351 slots_[pidx_] = std::move( value);
352 pidx_ = (pidx_ + 1) % capacity_;
353 // notify one waiting consumer
354 while ( ! waiting_consumers_.empty() ) {
355 context * consumer_ctx = & waiting_consumers_.front();
356 waiting_consumers_.pop_front();
357 auto expected = reinterpret_cast< std::intptr_t >( this);
358 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
361 active_ctx->schedule( consumer_ctx);
364 if ( static_cast< std::intptr_t >( 0) == expected) {
368 active_ctx->schedule( consumer_ctx);
372 return channel_op_status::success;
377 channel_op_status try_pop( value_type & value) {
378 context * active_ctx = context::active();
379 detail::spinlock_lock lk{ splk_ };
382 ? channel_op_status::closed
383 : channel_op_status::empty;
385 value = std::move( slots_[cidx_]);
386 cidx_ = (cidx_ + 1) % capacity_;
387 // notify one waiting producer
388 while ( ! waiting_producers_.empty() ) {
389 context * producer_ctx = & waiting_producers_.front();
390 waiting_producers_.pop_front();
391 auto expected = reinterpret_cast< std::intptr_t >( this);
392 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
395 active_ctx->schedule( producer_ctx);
398 if ( static_cast< std::intptr_t >( 0) == expected) {
402 active_ctx->schedule( producer_ctx);
406 return channel_op_status::success;
409 channel_op_status pop( value_type & value) {
410 context * active_ctx = context::active();
412 detail::spinlock_lock lk{ splk_ };
414 if ( BOOST_UNLIKELY( is_closed_() ) ) {
415 return channel_op_status::closed;
417 active_ctx->wait_link( waiting_consumers_);
418 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
419 // suspend this consumer
420 active_ctx->suspend( lk);
422 value = std::move( slots_[cidx_]);
423 cidx_ = (cidx_ + 1) % capacity_;
424 // notify one waiting producer
425 while ( ! waiting_producers_.empty() ) {
426 context * producer_ctx = & waiting_producers_.front();
427 waiting_producers_.pop_front();
428 auto expected = reinterpret_cast< std::intptr_t >( this);
429 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
432 active_ctx->schedule( producer_ctx);
435 if ( static_cast< std::intptr_t >( 0) == expected) {
439 active_ctx->schedule( producer_ctx);
443 return channel_op_status::success;
448 value_type value_pop() {
449 context * active_ctx = context::active();
451 detail::spinlock_lock lk{ splk_ };
453 if ( BOOST_UNLIKELY( is_closed_() ) ) {
455 std::make_error_code( std::errc::operation_not_permitted),
456 "boost fiber: channel is closed" };
458 active_ctx->wait_link( waiting_consumers_);
459 active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
460 // suspend this consumer
461 active_ctx->suspend( lk);
463 value_type value = std::move( slots_[cidx_]);
464 cidx_ = (cidx_ + 1) % capacity_;
465 // notify one waiting producer
466 while ( ! waiting_producers_.empty() ) {
467 context * producer_ctx = & waiting_producers_.front();
468 waiting_producers_.pop_front();
469 auto expected = reinterpret_cast< std::intptr_t >( this);
470 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
473 active_ctx->schedule( producer_ctx);
476 if ( static_cast< std::intptr_t >( 0) == expected) {
480 active_ctx->schedule( producer_ctx);
489 template< typename Rep, typename Period >
490 channel_op_status pop_wait_for( value_type & value,
491 std::chrono::duration< Rep, Period > const& timeout_duration) {
492 return pop_wait_until( value,
493 std::chrono::steady_clock::now() + timeout_duration);
496 template< typename Clock, typename Duration >
497 channel_op_status pop_wait_until( value_type & value,
498 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
499 context * active_ctx = context::active();
500 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
502 detail::spinlock_lock lk{ splk_ };
504 if ( BOOST_UNLIKELY( is_closed_() ) ) {
505 return channel_op_status::closed;
507 active_ctx->wait_link( waiting_consumers_);
508 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
509 // suspend this consumer
510 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
513 // remove from waiting-queue
514 waiting_consumers_.remove( * active_ctx);
515 return channel_op_status::timeout;
518 value = std::move( slots_[cidx_]);
519 cidx_ = (cidx_ + 1) % capacity_;
520 // notify one waiting producer
521 while ( ! waiting_producers_.empty() ) {
522 context * producer_ctx = & waiting_producers_.front();
523 waiting_producers_.pop_front();
524 auto expected = reinterpret_cast< std::intptr_t >( this);
525 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
528 active_ctx->schedule( producer_ctx);
530 } if ( static_cast< std::intptr_t >( 0) == expected) {
534 active_ctx->schedule( producer_ctx);
538 return channel_op_status::success;
545 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
547 buffered_channel * chan_{ nullptr };
548 storage_type storage_;
551 BOOST_ASSERT( nullptr != chan_);
553 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
554 } catch ( fiber_error const&) {
560 using iterator_category = std::input_iterator_tag;
561 using difference_type = std::ptrdiff_t;
562 using pointer = value_type *;
563 using reference = value_type &;
565 using pointer_t = pointer;
566 using reference_t = reference;
568 iterator() noexcept = default;
570 explicit iterator( buffered_channel< T > * chan) noexcept :
575 iterator( iterator const& other) noexcept :
576 chan_{ other.chan_ } {
579 iterator & operator=( iterator const& other) noexcept {
580 if ( BOOST_LIKELY( this != & other) ) {
586 bool operator==( iterator const& other) const noexcept {
587 return other.chan_ == chan_;
590 bool operator!=( iterator const& other) const noexcept {
591 return other.chan_ != chan_;
594 iterator & operator++() {
595 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
600 const iterator operator++( int) = delete;
602 reference_t operator*() noexcept {
603 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
606 pointer_t operator->() noexcept {
607 return reinterpret_cast< value_type * >( std::addressof( storage_) );
611 friend class iterator;
614 template< typename T >
615 typename buffered_channel< T >::iterator
616 begin( buffered_channel< T > & chan) {
617 return typename buffered_channel< T >::iterator( & chan);
620 template< typename T >
621 typename buffered_channel< T >::iterator
622 end( buffered_channel< T > &) {
623 return typename buffered_channel< T >::iterator();
628 #ifdef BOOST_HAS_ABI_HEADERS
629 # include BOOST_ABI_SUFFIX
632 #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H