2 // Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
9 #define BOOST_FIBERS_BUFFERED_CHANNEL_H
16 #include <type_traits>
18 #include <boost/config.hpp>
20 #include <boost/fiber/channel_op_status.hpp>
21 #include <boost/fiber/context.hpp>
22 #include <boost/fiber/waker.hpp>
23 #include <boost/fiber/detail/config.hpp>
24 #include <boost/fiber/detail/convert.hpp>
25 #include <boost/fiber/detail/spinlock.hpp>
26 #include <boost/fiber/exceptions.hpp>
28 #ifdef BOOST_HAS_ABI_HEADERS
29 # include BOOST_ABI_PREFIX
35 template< typename T >
36 class buffered_channel {
38 using value_type = typename std::remove_reference<T>::type;
41 using slot_type = value_type;
43 mutable detail::spinlock splk_{};
44 wait_queue waiting_producers_{};
45 wait_queue waiting_consumers_{};
47 std::size_t pidx_{ 0 };
48 std::size_t cidx_{ 0 };
49 std::size_t capacity_;
50 bool closed_{ false };
52 bool is_full_() const noexcept {
53 return cidx_ == ((pidx_ + 1) % capacity_);
56 bool is_empty_() const noexcept {
57 return cidx_ == pidx_;
60 bool is_closed_() const noexcept {
65 explicit buffered_channel( std::size_t capacity) :
66 capacity_{ capacity } {
67 if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
68 throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
69 "boost fiber: buffer capacity is invalid" };
71 slots_ = new slot_type[capacity_];
79 buffered_channel( buffered_channel const&) = delete;
80 buffered_channel & operator=( buffered_channel const&) = delete;
82 bool is_closed() const noexcept {
83 detail::spinlock_lock lk{ splk_ };
87 void close() noexcept {
88 detail::spinlock_lock lk{ splk_ };
91 waiting_producers_.notify_all();
92 waiting_consumers_.notify_all();
96 channel_op_status try_push( value_type const& value) {
97 detail::spinlock_lock lk{ splk_ };
98 if ( BOOST_UNLIKELY( is_closed_() ) ) {
99 return channel_op_status::closed;
102 return channel_op_status::full;
104 slots_[pidx_] = value;
105 pidx_ = (pidx_ + 1) % capacity_;
106 waiting_consumers_.notify_one();
107 return channel_op_status::success;
110 channel_op_status try_push( value_type && value) {
111 detail::spinlock_lock lk{ splk_ };
112 if ( BOOST_UNLIKELY( is_closed_() ) ) {
113 return channel_op_status::closed;
116 return channel_op_status::full;
118 slots_[pidx_] = std::move( value);
119 pidx_ = (pidx_ + 1) % capacity_;
120 waiting_consumers_.notify_one();
121 return channel_op_status::success;
124 channel_op_status push( value_type const& value) {
125 context * active_ctx = context::active();
127 detail::spinlock_lock lk{ splk_ };
128 if ( BOOST_UNLIKELY( is_closed_() ) ) {
129 return channel_op_status::closed;
132 waiting_producers_.suspend_and_wait( lk, active_ctx);
134 slots_[pidx_] = value;
135 pidx_ = (pidx_ + 1) % capacity_;
136 waiting_consumers_.notify_one();
137 return channel_op_status::success;
142 channel_op_status push( value_type && value) {
143 context * active_ctx = context::active();
145 detail::spinlock_lock lk{ splk_ };
146 if ( BOOST_UNLIKELY( is_closed_() ) ) {
147 return channel_op_status::closed;
150 waiting_producers_.suspend_and_wait( lk, active_ctx);
152 slots_[pidx_] = std::move( value);
153 pidx_ = (pidx_ + 1) % capacity_;
155 waiting_consumers_.notify_one();
156 return channel_op_status::success;
161 template< typename Rep, typename Period >
162 channel_op_status push_wait_for( value_type const& value,
163 std::chrono::duration< Rep, Period > const& timeout_duration) {
164 return push_wait_until( value,
165 std::chrono::steady_clock::now() + timeout_duration);
168 template< typename Rep, typename Period >
169 channel_op_status push_wait_for( value_type && value,
170 std::chrono::duration< Rep, Period > const& timeout_duration) {
171 return push_wait_until( std::forward< value_type >( value),
172 std::chrono::steady_clock::now() + timeout_duration);
175 template< typename Clock, typename Duration >
176 channel_op_status push_wait_until( value_type const& value,
177 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
178 context * active_ctx = context::active();
179 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
181 detail::spinlock_lock lk{ splk_ };
182 if ( BOOST_UNLIKELY( is_closed_() ) ) {
183 return channel_op_status::closed;
186 if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
187 return channel_op_status::timeout;
190 slots_[pidx_] = value;
191 pidx_ = (pidx_ + 1) % capacity_;
192 waiting_consumers_.notify_one();
193 return channel_op_status::success;
198 template< typename Clock, typename Duration >
199 channel_op_status push_wait_until( value_type && value,
200 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
201 context * active_ctx = context::active();
202 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
204 detail::spinlock_lock lk{ splk_ };
205 if ( BOOST_UNLIKELY( is_closed_() ) ) {
206 return channel_op_status::closed;
209 if ( ! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
210 return channel_op_status::timeout;
213 slots_[pidx_] = std::move( value);
214 pidx_ = (pidx_ + 1) % capacity_;
215 // notify one waiting consumer
216 waiting_consumers_.notify_one();
217 return channel_op_status::success;
222 channel_op_status try_pop( value_type & value) {
223 detail::spinlock_lock lk{ splk_ };
226 ? channel_op_status::closed
227 : channel_op_status::empty;
229 value = std::move( slots_[cidx_]);
230 cidx_ = (cidx_ + 1) % capacity_;
231 waiting_producers_.notify_one();
232 return channel_op_status::success;
235 channel_op_status pop( value_type & value) {
236 context * active_ctx = context::active();
238 detail::spinlock_lock lk{ splk_ };
240 if ( BOOST_UNLIKELY( is_closed_() ) ) {
241 return channel_op_status::closed;
243 waiting_consumers_.suspend_and_wait( lk, active_ctx);
245 value = std::move( slots_[cidx_]);
246 cidx_ = (cidx_ + 1) % capacity_;
247 waiting_producers_.notify_one();
248 return channel_op_status::success;
253 value_type value_pop() {
254 context * active_ctx = context::active();
256 detail::spinlock_lock lk{ splk_ };
258 if ( BOOST_UNLIKELY( is_closed_() ) ) {
260 std::make_error_code( std::errc::operation_not_permitted),
261 "boost fiber: channel is closed" };
263 waiting_consumers_.suspend_and_wait( lk, active_ctx);
265 value_type value = std::move( slots_[cidx_]);
266 cidx_ = (cidx_ + 1) % capacity_;
267 waiting_producers_.notify_one();
273 template< typename Rep, typename Period >
274 channel_op_status pop_wait_for( value_type & value,
275 std::chrono::duration< Rep, Period > const& timeout_duration) {
276 return pop_wait_until( value,
277 std::chrono::steady_clock::now() + timeout_duration);
280 template< typename Clock, typename Duration >
281 channel_op_status pop_wait_until( value_type & value,
282 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
283 context * active_ctx = context::active();
284 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
286 detail::spinlock_lock lk{ splk_ };
288 if ( BOOST_UNLIKELY( is_closed_() ) ) {
289 return channel_op_status::closed;
291 if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
292 return channel_op_status::timeout;
295 value = std::move( slots_[cidx_]);
296 cidx_ = (cidx_ + 1) % capacity_;
297 waiting_producers_.notify_one();
298 return channel_op_status::success;
305 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
307 buffered_channel * chan_{ nullptr };
308 storage_type storage_;
310 void increment_( bool initial = false) {
311 BOOST_ASSERT( nullptr != chan_);
314 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
316 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
317 } catch ( fiber_error const&) {
323 using iterator_category = std::input_iterator_tag;
324 using difference_type = std::ptrdiff_t;
325 using pointer = value_type *;
326 using reference = value_type &;
328 using pointer_t = pointer;
329 using reference_t = reference;
331 iterator() = default;
333 explicit iterator( buffered_channel< T > * chan) noexcept :
338 iterator( iterator const& other) noexcept :
339 chan_{ other.chan_ } {
342 iterator & operator=( iterator const& other) noexcept {
343 if ( BOOST_LIKELY( this != & other) ) {
349 bool operator==( iterator const& other) const noexcept {
350 return other.chan_ == chan_;
353 bool operator!=( iterator const& other) const noexcept {
354 return other.chan_ != chan_;
357 iterator & operator++() {
358 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
363 const iterator operator++( int) = delete;
365 reference_t operator*() noexcept {
366 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
369 pointer_t operator->() noexcept {
370 return reinterpret_cast< value_type * >( std::addressof( storage_) );
374 friend class iterator;
377 template< typename T >
378 typename buffered_channel< T >::iterator
379 begin( buffered_channel< T > & chan) {
380 return typename buffered_channel< T >::iterator( & chan);
383 template< typename T >
384 typename buffered_channel< T >::iterator
385 end( buffered_channel< T > &) {
386 return typename buffered_channel< T >::iterator();
391 #ifdef BOOST_HAS_ABI_HEADERS
392 # include BOOST_ABI_SUFFIX
395 #endif // BOOST_FIBERS_BUFFERED_CHANNEL_H