]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | // Copyright Oliver Kowalke 2013. | |
3 | // Distributed under the Boost Software License, Version 1.0. | |
4 | // (See accompanying file LICENSE_1_0.txt or copy at | |
5 | // http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | ||
8 | #ifndef BOOST_FIBERS_UNBOUNDED_CHANNEL_H | |
9 | #define BOOST_FIBERS_UNBOUNDED_CHANNEL_H | |
10 | ||
11 | #include <atomic> | |
12 | #include <algorithm> | |
13 | #include <chrono> | |
14 | #include <cstddef> | |
15 | #include <deque> | |
16 | #include <memory> | |
17 | #include <mutex> | |
18 | #include <utility> | |
19 | ||
20 | #include <boost/config.hpp> | |
21 | #include <boost/intrusive_ptr.hpp> | |
22 | ||
23 | #include <boost/fiber/detail/config.hpp> | |
24 | #include <boost/fiber/channel_op_status.hpp> | |
25 | #include <boost/fiber/condition_variable.hpp> | |
26 | #include <boost/fiber/detail/convert.hpp> | |
27 | #include <boost/fiber/exceptions.hpp> | |
28 | #include <boost/fiber/mutex.hpp> | |
29 | #include <boost/fiber/operations.hpp> | |
30 | ||
31 | #ifdef BOOST_HAS_ABI_HEADERS | |
32 | # include BOOST_ABI_PREFIX | |
33 | #endif | |
34 | ||
35 | namespace boost { | |
36 | namespace fibers { | |
37 | ||
38 | template< typename T, | |
39 | typename Allocator = std::allocator< T > | |
40 | > | |
41 | class unbounded_channel { | |
42 | public: | |
43 | typedef T value_type; | |
44 | ||
45 | private: | |
46 | struct node { | |
47 | typedef intrusive_ptr< node > ptr_t; | |
48 | typedef typename std::allocator_traits< Allocator >::template rebind_alloc< | |
49 | node | |
50 | > allocator_t; | |
51 | typedef std::allocator_traits< allocator_t > allocator_traits_t; | |
52 | ||
53 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) | |
54 | std::atomic< std::size_t > use_count{ 0 }; | |
55 | #else | |
56 | std::size_t use_count{ 0 }; | |
57 | #endif | |
58 | allocator_t alloc; | |
59 | T va; | |
60 | ptr_t nxt{}; | |
61 | ||
62 | node( T const& t, allocator_t const& alloc_) noexcept : | |
63 | alloc{ alloc_ }, | |
64 | va{ t } { | |
65 | } | |
66 | ||
67 | node( T && t, allocator_t const& alloc_) noexcept : | |
68 | alloc{ alloc_ }, | |
69 | va{ std::move( t) } { | |
70 | } | |
71 | ||
72 | friend | |
73 | void intrusive_ptr_add_ref( node * p) noexcept { | |
74 | ++p->use_count; | |
75 | } | |
76 | ||
77 | friend | |
78 | void intrusive_ptr_release( node * p) noexcept { | |
79 | if ( 0 == --p->use_count) { | |
80 | allocator_t alloc( p->alloc); | |
81 | allocator_traits_t::destroy( alloc, p); | |
82 | allocator_traits_t::deallocate( alloc, p, 1); | |
83 | } | |
84 | } | |
85 | }; | |
86 | ||
87 | using ptr_t = typename node::ptr_t; | |
88 | using allocator_t = typename node::allocator_t; | |
89 | using allocator_traits_t = typename node::allocator_traits_t; | |
90 | ||
91 | enum class queue_status { | |
92 | open = 0, | |
93 | closed | |
94 | }; | |
95 | ||
96 | allocator_t alloc_; | |
97 | queue_status state_{ queue_status::open }; | |
98 | ptr_t head_{}; | |
99 | ptr_t * tail_; | |
100 | mutable mutex mtx_{}; | |
101 | condition_variable not_empty_cond_{}; | |
102 | ||
103 | bool is_closed_() const noexcept { | |
104 | return queue_status::closed == state_; | |
105 | } | |
106 | ||
107 | void close_( std::unique_lock< mutex > & lk) noexcept { | |
108 | state_ = queue_status::closed; | |
109 | lk.unlock(); | |
110 | not_empty_cond_.notify_all(); | |
111 | } | |
112 | ||
113 | bool is_empty_() const noexcept { | |
114 | return ! head_; | |
115 | } | |
116 | ||
117 | channel_op_status push_( ptr_t new_node, | |
118 | std::unique_lock< mutex > & lk) noexcept { | |
119 | if ( is_closed_() ) { | |
120 | return channel_op_status::closed; | |
121 | } | |
122 | return push_and_notify_( new_node, lk); | |
123 | } | |
124 | ||
125 | channel_op_status push_and_notify_( ptr_t new_node, | |
126 | std::unique_lock< mutex > & lk) noexcept { | |
127 | push_tail_( new_node); | |
128 | lk.unlock(); | |
129 | not_empty_cond_.notify_one(); | |
130 | return channel_op_status::success; | |
131 | } | |
132 | ||
133 | void push_tail_( ptr_t new_node) noexcept { | |
134 | * tail_ = new_node; | |
135 | tail_ = & new_node->nxt; | |
136 | } | |
137 | ||
138 | value_type value_pop_( std::unique_lock< mutex > & lk) { | |
139 | BOOST_ASSERT( ! is_empty_() ); | |
140 | auto old_head = pop_head_(); | |
141 | return std::move( old_head->va); | |
142 | } | |
143 | ||
144 | ptr_t pop_head_() noexcept { | |
145 | auto old_head = head_; | |
146 | head_ = old_head->nxt; | |
147 | if ( ! head_) { | |
148 | tail_ = & head_; | |
149 | } | |
150 | old_head->nxt.reset(); | |
151 | return old_head; | |
152 | } | |
153 | ||
154 | public: | |
155 | explicit unbounded_channel( Allocator const& alloc = Allocator() ) noexcept : | |
156 | alloc_{ alloc }, | |
157 | tail_{ & head_ } { | |
158 | } | |
159 | ||
160 | unbounded_channel( unbounded_channel const&) = delete; | |
161 | unbounded_channel & operator=( unbounded_channel const&) = delete; | |
162 | ||
163 | void close() noexcept { | |
164 | std::unique_lock< mutex > lk( mtx_); | |
165 | close_( lk); | |
166 | } | |
167 | ||
168 | channel_op_status push( value_type const& va) { | |
169 | typename allocator_traits_t::pointer ptr{ | |
170 | allocator_traits_t::allocate( alloc_, 1) }; | |
171 | try { | |
172 | allocator_traits_t::construct( alloc_, ptr, va, alloc_); | |
173 | } catch (...) { | |
174 | allocator_traits_t::deallocate( alloc_, ptr, 1); | |
175 | throw; | |
176 | } | |
177 | std::unique_lock< mutex > lk( mtx_); | |
178 | return push_( { detail::convert( ptr) }, lk); | |
179 | } | |
180 | ||
181 | channel_op_status push( value_type && va) { | |
182 | typename allocator_traits_t::pointer ptr{ | |
183 | allocator_traits_t::allocate( alloc_, 1) }; | |
184 | try { | |
185 | allocator_traits_t::construct( | |
186 | alloc_, ptr, std::move( va), alloc_); | |
187 | } catch (...) { | |
188 | allocator_traits_t::deallocate( alloc_, ptr, 1); | |
189 | throw; | |
190 | } | |
191 | std::unique_lock< mutex > lk( mtx_); | |
192 | return push_( { detail::convert( ptr) }, lk); | |
193 | } | |
194 | ||
195 | channel_op_status pop( value_type & va) { | |
196 | std::unique_lock< mutex > lk( mtx_); | |
197 | not_empty_cond_.wait( lk, | |
198 | [this](){ | |
199 | return is_closed_() || ! is_empty_(); | |
200 | }); | |
201 | if ( is_closed_() && is_empty_() ) { | |
202 | return channel_op_status::closed; | |
203 | } | |
204 | va = value_pop_( lk); | |
205 | return channel_op_status::success; | |
206 | } | |
207 | ||
208 | value_type value_pop() { | |
209 | std::unique_lock< mutex > lk( mtx_); | |
210 | not_empty_cond_.wait( lk, | |
211 | [this](){ | |
212 | return is_closed_() || ! is_empty_(); | |
213 | }); | |
214 | if ( is_closed_() && is_empty_() ) { | |
215 | throw fiber_error( | |
216 | std::make_error_code( std::errc::operation_not_permitted), | |
217 | "boost fiber: queue is closed"); | |
218 | } | |
219 | return value_pop_( lk); | |
220 | } | |
221 | ||
222 | channel_op_status try_pop( value_type & va) { | |
223 | std::unique_lock< mutex > lk( mtx_); | |
224 | if ( is_closed_() && is_empty_() ) { | |
225 | // let other fibers run | |
226 | lk.unlock(); | |
227 | this_fiber::yield(); | |
228 | return channel_op_status::closed; | |
229 | } | |
230 | if ( is_empty_() ) { | |
231 | // let other fibers run | |
232 | lk.unlock(); | |
233 | this_fiber::yield(); | |
234 | return channel_op_status::empty; | |
235 | } | |
236 | va = value_pop_( lk); | |
237 | return channel_op_status::success; | |
238 | } | |
239 | ||
240 | template< typename Rep, typename Period > | |
241 | channel_op_status pop_wait_for( value_type & va, | |
242 | std::chrono::duration< Rep, Period > const& timeout_duration) { | |
243 | return pop_wait_until( va, std::chrono::steady_clock::now() + timeout_duration); | |
244 | } | |
245 | ||
246 | template< typename Clock, typename Duration > | |
247 | channel_op_status pop_wait_until( value_type & va, | |
248 | std::chrono::time_point< Clock, Duration > const& timeout_time) { | |
249 | std::unique_lock< mutex > lk( mtx_); | |
250 | if ( ! not_empty_cond_.wait_until( lk, timeout_time, | |
251 | [this](){ | |
252 | return is_closed_() || ! is_empty_(); | |
253 | })) { | |
254 | return channel_op_status::timeout; | |
255 | } | |
256 | if ( is_closed_() && is_empty_() ) { | |
257 | return channel_op_status::closed; | |
258 | } | |
259 | va = value_pop_( lk); | |
260 | return channel_op_status::success; | |
261 | } | |
262 | }; | |
263 | ||
264 | }} | |
265 | ||
266 | #ifdef BOOST_HAS_ABI_HEADERS | |
267 | # include BOOST_ABI_SUFFIX | |
268 | #endif | |
269 | ||
270 | #endif // BOOST_FIBERS_UNBOUNDED_CHANNEL_H |