2 // Copyright Oliver Kowalke 2013.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_FIBERS_UNBOUNDED_CHANNEL_H
9 #define BOOST_FIBERS_UNBOUNDED_CHANNEL_H
20 #include <boost/config.hpp>
21 #include <boost/intrusive_ptr.hpp>
23 #include <boost/fiber/detail/config.hpp>
24 #include <boost/fiber/channel_op_status.hpp>
25 #include <boost/fiber/condition_variable.hpp>
26 #include <boost/fiber/detail/convert.hpp>
27 #include <boost/fiber/exceptions.hpp>
28 #include <boost/fiber/mutex.hpp>
29 #include <boost/fiber/operations.hpp>
31 #ifdef BOOST_HAS_ABI_HEADERS
32 # include BOOST_ABI_PREFIX
39 typename Allocator = std::allocator< T >
41 class unbounded_channel {
// Channel class template. The Allocator parameter defaults to std::allocator< T >.
// The members visible in this file show a linked queue of intrusively
// reference-counted nodes, a fiber mutex (mtx_) guarding it, and a condition
// variable (not_empty_cond_) that the blocking pop operations wait on.
// Node-level aliases: nodes are referenced through intrusive_ptr, and the
// user-supplied Allocator is rebound through std::allocator_traits.
// NOTE(review): the rebind target is on a line not visible here — presumably
// the node type itself; confirm.
47 typedef intrusive_ptr< node > ptr_t;
48 typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
51 typedef std::allocator_traits< allocator_t > allocator_traits_t;
// Reference count decremented by intrusive_ptr_release; starts at zero.
// Declared as std::atomic when BOOST_FIBERS_NO_ATOMICS is not defined; the
// plain std::size_t declaration is the alternative for the other case.
53 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
54 std::atomic< std::size_t > use_count{ 0 };
56 std::size_t use_count{ 0 };
// Node constructors: the first takes the payload by const reference, the
// second by rvalue reference; both also receive the allocator by const
// reference. Their member-initializer lists are on lines not shown here.
// NOTE(review): both are declared noexcept. If the unseen initializer list
// copy- or move-constructs a T that can throw, the exception would terminate
// the program instead of propagating — confirm this is intended.
62 node( T const& t, allocator_t const& alloc_) noexcept :
67 node( T && t, allocator_t const& alloc_) noexcept :
// Counterpart of intrusive_ptr_release for acquiring a reference on a node.
// NOTE(review): the body is not visible here — presumably it increments
// p->use_count; confirm.
73 void intrusive_ptr_add_ref( node * p) noexcept {
// Drops one reference on *p. When the count reaches zero the node is destroyed
// and its storage is handed back through the allocator.
78 void intrusive_ptr_release( node * p) noexcept {
79 if ( 0 == --p->use_count) {
// Copy the allocator out of the node first: destroy() ends the node's
// lifetime, so p->alloc must not be read afterwards.
80 allocator_t alloc( p->alloc);
81 allocator_traits_t::destroy( alloc, p);
82 allocator_traits_t::deallocate( alloc, p, 1);
// Re-export the node's pointer and allocator aliases at channel scope so the
// channel's own members can use the short names.
87 using ptr_t = typename node::ptr_t;
88 using allocator_t = typename node::allocator_t;
89 using allocator_traits_t = typename node::allocator_traits_t;
// Lifecycle state of the channel. The enumerator list is not visible here;
// the rest of the file refers to queue_status::open and queue_status::closed.
91 enum class queue_status {
// Current lifecycle state; a new channel starts open.
97 queue_status state_{ queue_status::open };
// Mutex locked by the public operations; mutable so that const members can
// lock it as well.
100 mutable mutex mtx_{};
// Waited on by the blocking pop operations; notified by push_and_notify_
// and by close_.
101 condition_variable not_empty_cond_{};
// Returns true once state_ has been set to queue_status::closed.
// Performs no locking itself.
103 bool is_closed_() const noexcept {
104 return queue_status::closed == state_;
// Marks the channel closed and wakes every fiber waiting on not_empty_cond_,
// so blocked consumers can observe the closed state.
// NOTE(review): the caller's lock is passed in, but the line between the
// state change and the notify is not visible — presumably the lock is
// released before notifying; confirm.
107 void close_( std::unique_lock< mutex > & lk) noexcept {
108 state_ = queue_status::closed;
110 not_empty_cond_.notify_all();
// Predicate the pop operations use to test whether any element is queued.
// The body is not visible in this excerpt.
113 bool is_empty_() const noexcept {
// Enqueues new_node unless the channel has been closed.
// Returns channel_op_status::closed without enqueuing when the channel is
// closed; otherwise returns the result of push_and_notify_.
// lk is the caller's lock, forwarded unchanged.
117 channel_op_status push_( ptr_t new_node,
118 std::unique_lock< mutex > & lk) noexcept {
119 if ( is_closed_() ) {
120 return channel_op_status::closed;
122 return push_and_notify_( new_node, lk);
// Appends new_node at the tail, wakes one fiber waiting on not_empty_cond_,
// and returns channel_op_status::success.
// NOTE(review): a line between the append and the notify is not visible —
// presumably it releases lk before notifying; confirm.
125 channel_op_status push_and_notify_( ptr_t new_node,
126 std::unique_lock< mutex > & lk) noexcept {
127 push_tail_( new_node);
129 not_empty_cond_.notify_one();
130 return channel_op_status::success;
// Tail insertion helper. The visible statement re-points tail_ at the new
// node's nxt link, so the next insertion is written into that link.
// NOTE(review): the statement that links new_node into the queue is on a line
// not shown here.
133 void push_tail_( ptr_t new_node) noexcept {
135 tail_ = & new_node->nxt;
// Removes the head node and returns its payload, moved out of the node.
// Precondition (asserted): the queue is not empty.
// lk is not used by any line visible here.
138 value_type value_pop_( std::unique_lock< mutex > & lk) {
139 BOOST_ASSERT( ! is_empty_() );
140 auto old_head = pop_head_();
141 return std::move( old_head->va);
// Detaches the current head node: head_ advances to the old head's successor,
// and the old head's nxt link is reset so the removed node no longer
// references its successor.
// NOTE(review): the lines between these steps and the return statement are
// not visible — presumably they handle the queue becoming empty and return
// old_head; confirm.
144 ptr_t pop_head_() noexcept {
145 auto old_head = head_;
146 head_ = old_head->nxt;
150 old_head->nxt.reset();
// Constructs a channel. An allocator may be supplied; otherwise a
// default-constructed Allocator is used. Explicit, so an Allocator does not
// convert implicitly into a channel. The member initializers are on lines not
// shown here.
155 explicit unbounded_channel( Allocator const& alloc = Allocator() ) noexcept :
// Channels are not copyable: copy construction and copy assignment are deleted.
160 unbounded_channel( unbounded_channel const&) = delete;
161 unbounded_channel & operator=( unbounded_channel const&) = delete;
// Public close operation. Acquires mtx_; the remainder of the body is not
// visible here (presumably it forwards to close_ with the held lock — confirm).
163 void close() noexcept {
164 std::unique_lock< mutex > lk( mtx_);
// Enqueues a copy of va.
// Storage for one node is allocated and the node is constructed in place from
// (va, alloc_) before mtx_ is taken, so allocation and construction do not
// happen under the lock.
// Returns the status from push_: closed when the channel is closed, success
// otherwise.
168 channel_op_status push( value_type const& va) {
169 typename allocator_traits_t::pointer ptr{
170 allocator_traits_t::allocate( alloc_, 1) };
172 allocator_traits_t::construct( alloc_, ptr, va, alloc_);
// NOTE(review): this deallocate appears to be the cleanup path for a failed
// construction; the surrounding try/catch lines are not visible — confirm.
174 allocator_traits_t::deallocate( alloc_, ptr, 1);
177 std::unique_lock< mutex > lk( mtx_);
// detail::convert presumably turns the allocator's pointer type into a raw
// node pointer for ptr_t — confirm against detail/convert.hpp.
178 return push_( { detail::convert( ptr) }, lk);
// Enqueues va by moving it into a newly allocated node.
// Storage for one node is allocated and the node is constructed in place from
// (std::move( va), alloc_) before mtx_ is taken.
// Returns the status from push_: closed when the channel is closed, success
// otherwise.
181 channel_op_status push( value_type && va) {
182 typename allocator_traits_t::pointer ptr{
183 allocator_traits_t::allocate( alloc_, 1) };
185 allocator_traits_t::construct(
186 alloc_, ptr, std::move( va), alloc_);
// NOTE(review): this deallocate appears to be the cleanup path for a failed
// construction; the surrounding try/catch lines are not visible — confirm.
188 allocator_traits_t::deallocate( alloc_, ptr, 1);
191 std::unique_lock< mutex > lk( mtx_);
// detail::convert presumably turns the allocator's pointer type into a raw
// node pointer for ptr_t — confirm against detail/convert.hpp.
192 return push_( { detail::convert( ptr) }, lk);
// Blocking pop. Waits on not_empty_cond_ until the channel is closed or an
// element is available.
// Returns channel_op_status::closed when the channel is closed and nothing is
// left to take; otherwise assigns the front element to va and returns
// channel_op_status::success. Elements queued before close are still delivered.
195 channel_op_status pop( value_type & va) {
196 std::unique_lock< mutex > lk( mtx_);
197 not_empty_cond_.wait( lk,
199 return is_closed_() || ! is_empty_();
201 if ( is_closed_() && is_empty_() ) {
202 return channel_op_status::closed;
204 va = value_pop_( lk);
205 return channel_op_status::success;
// Blocking pop that returns the element by value instead of a status code.
// Waits on not_empty_cond_ until the channel is closed or an element is
// available, then returns the front element.
// When the channel is closed and empty, an error built from
// std::errc::operation_not_permitted with the message below is raised.
// NOTE(review): the throw expression is on a line not visible here — confirm
// the exception type.
208 value_type value_pop() {
209 std::unique_lock< mutex > lk( mtx_);
210 not_empty_cond_.wait( lk,
212 return is_closed_() || ! is_empty_();
214 if ( is_closed_() && is_empty_() ) {
216 std::make_error_code( std::errc::operation_not_permitted),
217 "boost fiber: queue is closed");
219 return value_pop_( lk);
// Pop that never waits on the condition variable.
// Returns channel_op_status::closed when the channel is closed and empty, and
// channel_op_status::empty on the second early-return path; otherwise assigns
// the front element to va and returns channel_op_status::success.
// NOTE(review): the condition guarding the `empty` return and the statements
// behind the "let other fibers run" comments are on lines not visible here —
// presumably an is_empty_() check and an unlock followed by a yield; confirm.
222 channel_op_status try_pop( value_type & va) {
223 std::unique_lock< mutex > lk( mtx_);
224 if ( is_closed_() && is_empty_() ) {
225 // let other fibers run
228 return channel_op_status::closed;
231 // let other fibers run
234 return channel_op_status::empty;
236 va = value_pop_( lk);
237 return channel_op_status::success;
// Pop with a relative timeout. Converts timeout_duration into an absolute
// deadline on std::chrono::steady_clock and delegates to pop_wait_until,
// returning its status unchanged.
240 template< typename Rep, typename Period >
241 channel_op_status pop_wait_for( value_type & va,
242 std::chrono::duration< Rep, Period > const& timeout_duration) {
243 return pop_wait_until( va, std::chrono::steady_clock::now() + timeout_duration);
// Pop with an absolute deadline. Waits on not_empty_cond_ until the channel is
// closed, an element is available, or timeout_time is reached.
// Returns:
//   timeout - the predicate was still false when the wait ended;
//   closed  - the channel is closed and no element is left;
//   success - the front element was assigned to va.
246 template< typename Clock, typename Duration >
247 channel_op_status pop_wait_until( value_type & va,
248 std::chrono::time_point< Clock, Duration > const& timeout_time) {
249 std::unique_lock< mutex > lk( mtx_);
250 if ( ! not_empty_cond_.wait_until( lk, timeout_time,
252 return is_closed_() || ! is_empty_();
254 return channel_op_status::timeout;
256 if ( is_closed_() && is_empty_() ) {
257 return channel_op_status::closed;
259 va = value_pop_( lk);
260 return channel_op_status::success;
266 #ifdef BOOST_HAS_ABI_HEADERS
267 # include BOOST_ABI_SUFFIX
270 #endif // BOOST_FIBERS_UNBOUNDED_CHANNEL_H