]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | // Copyright Oliver Kowalke 2013. | |
3 | // Distributed under the Boost Software License, Version 1.0. | |
4 | // (See accompanying file LICENSE_1_0.txt or copy at | |
5 | // http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | ||
8 | #ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H | |
9 | #define BOOST_FIBERS_BOUNDED_CHANNEL_H | |
10 | ||
11 | #include <algorithm> | |
12 | #include <atomic> | |
13 | #include <chrono> | |
14 | #include <cstddef> | |
15 | #include <memory> | |
16 | #include <mutex> | |
17 | #include <system_error> | |
18 | #include <utility> | |
19 | ||
20 | #include <boost/config.hpp> | |
21 | #include <boost/intrusive_ptr.hpp> | |
22 | ||
23 | #include <boost/fiber/detail/config.hpp> | |
24 | #include <boost/fiber/exceptions.hpp> | |
25 | #include <boost/fiber/exceptions.hpp> | |
26 | #include <boost/fiber/condition_variable.hpp> | |
27 | #include <boost/fiber/mutex.hpp> | |
28 | #include <boost/fiber/channel_op_status.hpp> | |
29 | ||
30 | #ifdef BOOST_HAS_ABI_HEADERS | |
31 | # include BOOST_ABI_PREFIX | |
32 | #endif | |
33 | ||
34 | namespace boost { | |
35 | namespace fibers { | |
36 | ||
37 | template< typename T, | |
38 | typename Allocator = std::allocator< T > | |
39 | > | |
40 | class bounded_channel { | |
41 | public: | |
42 | typedef T value_type; | |
43 | ||
44 | private: | |
45 | struct node { | |
46 | typedef intrusive_ptr< node > ptr_t; | |
47 | typedef typename std::allocator_traits< Allocator >::template rebind_alloc< | |
48 | node | |
49 | > allocator_t; | |
50 | typedef std::allocator_traits< allocator_t > allocator_traits_t; | |
51 | ||
52 | #if ! defined(BOOST_FIBERS_NO_ATOMICS) | |
53 | std::atomic< std::size_t > use_count{ 0 }; | |
54 | #else | |
55 | std::size_t use_count{ 0 }; | |
56 | #endif | |
57 | allocator_t alloc; | |
58 | T va; | |
59 | ptr_t nxt{}; | |
60 | ||
61 | node( T const& t, allocator_t const& alloc_) noexcept : | |
62 | alloc{ alloc_ }, | |
63 | va{ t } { | |
64 | } | |
65 | ||
66 | node( T && t, allocator_t & alloc_) noexcept : | |
67 | alloc{ alloc_ }, | |
68 | va{ std::move( t) } { | |
69 | } | |
70 | ||
71 | friend | |
72 | void intrusive_ptr_add_ref( node * p) noexcept { | |
73 | ++p->use_count; | |
74 | } | |
75 | ||
76 | friend | |
77 | void intrusive_ptr_release( node * p) noexcept { | |
78 | if ( 0 == --p->use_count) { | |
79 | allocator_t alloc( p->alloc); | |
80 | allocator_traits_t::destroy( alloc, p); | |
81 | allocator_traits_t::deallocate( alloc, p, 1); | |
82 | } | |
83 | } | |
84 | }; | |
85 | ||
86 | using ptr_t = typename node::ptr_t; | |
87 | using allocator_t = typename node::allocator_t; | |
88 | using allocator_traits_t = typename node::allocator_traits_t; | |
89 | ||
90 | enum class queue_status { | |
91 | open = 0, | |
92 | closed | |
93 | }; | |
94 | ||
95 | allocator_t alloc_; | |
96 | queue_status state_{ queue_status::open }; | |
97 | std::size_t count_{ 0 }; | |
98 | ptr_t head_{}; | |
99 | ptr_t * tail_; | |
100 | mutable mutex mtx_{}; | |
101 | condition_variable not_empty_cond_{}; | |
102 | condition_variable not_full_cond_{}; | |
103 | std::size_t hwm_; | |
104 | std::size_t lwm_; | |
105 | ||
106 | bool is_closed_() const noexcept { | |
107 | return queue_status::closed == state_; | |
108 | } | |
109 | ||
110 | void close_( std::unique_lock< boost::fibers::mutex > & lk) noexcept { | |
111 | state_ = queue_status::closed; | |
112 | lk.unlock(); | |
113 | not_empty_cond_.notify_all(); | |
114 | not_full_cond_.notify_all(); | |
115 | } | |
116 | ||
117 | std::size_t size_() const noexcept { | |
118 | return count_; | |
119 | } | |
120 | ||
121 | bool is_empty_() const noexcept { | |
122 | return ! head_; | |
123 | } | |
124 | ||
125 | bool is_full_() const noexcept { | |
126 | return count_ >= hwm_; | |
127 | } | |
128 | ||
129 | channel_op_status push_( ptr_t new_node, | |
130 | std::unique_lock< boost::fibers::mutex > & lk) { | |
131 | if ( is_closed_() ) { | |
132 | return channel_op_status::closed; | |
133 | } | |
134 | not_full_cond_.wait( lk, | |
135 | [this](){ | |
136 | return ! is_full_(); | |
137 | }); | |
138 | return push_and_notify_( new_node, lk); | |
139 | } | |
140 | ||
141 | channel_op_status try_push_( ptr_t new_node, | |
142 | std::unique_lock< boost::fibers::mutex > & lk) noexcept { | |
143 | if ( is_closed_() ) { | |
144 | return channel_op_status::closed; | |
145 | } | |
146 | if ( is_full_() ) { | |
147 | return channel_op_status::full; | |
148 | } | |
149 | return push_and_notify_( new_node, lk); | |
150 | } | |
151 | ||
152 | template< typename Clock, typename Duration > | |
153 | channel_op_status push_wait_until_( ptr_t new_node, | |
154 | std::chrono::time_point< Clock, Duration > const& timeout_time, | |
155 | std::unique_lock< boost::fibers::mutex > & lk) { | |
156 | if ( is_closed_() ) { | |
157 | return channel_op_status::closed; | |
158 | } | |
159 | if ( ! not_full_cond_.wait_until( lk, timeout_time, | |
160 | [this](){ | |
161 | return ! is_full_(); | |
162 | })) { | |
163 | return channel_op_status::timeout; | |
164 | } | |
165 | return push_and_notify_( new_node, lk); | |
166 | } | |
167 | ||
168 | channel_op_status push_and_notify_( ptr_t new_node, | |
169 | std::unique_lock< boost::fibers::mutex > & lk) noexcept { | |
170 | push_tail_( new_node); | |
171 | lk.unlock(); | |
172 | not_empty_cond_.notify_one(); | |
173 | return channel_op_status::success; | |
174 | } | |
175 | ||
176 | void push_tail_( ptr_t new_node) noexcept { | |
177 | * tail_ = new_node; | |
178 | tail_ = & new_node->nxt; | |
179 | ++count_; | |
180 | } | |
181 | ||
182 | value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) { | |
183 | BOOST_ASSERT( ! is_empty_() ); | |
184 | auto old_head = pop_head_(); | |
185 | if ( size_() <= lwm_) { | |
186 | if ( lwm_ == hwm_) { | |
187 | lk.unlock(); | |
188 | not_full_cond_.notify_one(); | |
189 | } else { | |
190 | lk.unlock(); | |
191 | // more than one producer could be waiting | |
192 | // to push a value | |
193 | not_full_cond_.notify_all(); | |
194 | } | |
195 | } | |
196 | return std::move( old_head->va); | |
197 | } | |
198 | ||
199 | ptr_t pop_head_() noexcept { | |
200 | auto old_head = head_; | |
201 | head_ = old_head->nxt; | |
202 | if ( ! head_) { | |
203 | tail_ = & head_; | |
204 | } | |
205 | old_head->nxt.reset(); | |
206 | --count_; | |
207 | return old_head; | |
208 | } | |
209 | ||
210 | public: | |
211 | bounded_channel( std::size_t hwm, std::size_t lwm, | |
212 | Allocator const& alloc = Allocator() ) : | |
213 | alloc_{ alloc }, | |
214 | tail_{ & head_ }, | |
215 | hwm_{ hwm }, | |
216 | lwm_{ lwm } { | |
217 | if ( hwm_ <= lwm_) { | |
218 | throw fiber_error( std::make_error_code( std::errc::invalid_argument), | |
219 | "boost fiber: high-watermark is less than or equal to low-watermark for bounded_channel"); | |
220 | } | |
221 | if ( 0 == hwm) { | |
222 | throw fiber_error( std::make_error_code( std::errc::invalid_argument), | |
223 | "boost fiber: high-watermark is zero"); | |
224 | } | |
225 | } | |
226 | ||
227 | bounded_channel( std::size_t wm, | |
228 | Allocator const& alloc = Allocator() ) : | |
229 | alloc_{ alloc }, | |
230 | tail_{ & head_ }, | |
231 | hwm_{ wm }, | |
232 | lwm_{ wm - 1 } { | |
233 | if ( 0 == wm) { | |
234 | throw fiber_error( std::make_error_code( std::errc::invalid_argument), | |
235 | "boost fiber: watermark is zero"); | |
236 | } | |
237 | } | |
238 | ||
239 | bounded_channel( bounded_channel const&) = delete; | |
240 | bounded_channel & operator=( bounded_channel const&) = delete; | |
241 | ||
242 | std::size_t upper_bound() const noexcept { | |
243 | return hwm_; | |
244 | } | |
245 | ||
246 | std::size_t lower_bound() const noexcept { | |
247 | return lwm_; | |
248 | } | |
249 | ||
250 | void close() noexcept { | |
251 | std::unique_lock< mutex > lk( mtx_); | |
252 | close_( lk); | |
253 | } | |
254 | ||
255 | channel_op_status push( value_type const& va) { | |
256 | typename allocator_traits_t::pointer ptr{ | |
257 | allocator_traits_t::allocate( alloc_, 1) }; | |
258 | try { | |
259 | allocator_traits_t::construct( alloc_, ptr, va, alloc_); | |
260 | } catch (...) { | |
261 | allocator_traits_t::deallocate( alloc_, ptr, 1); | |
262 | throw; | |
263 | } | |
264 | std::unique_lock< mutex > lk( mtx_); | |
265 | return push_( { detail::convert( ptr) }, lk); | |
266 | } | |
267 | ||
268 | channel_op_status push( value_type && va) { | |
269 | typename allocator_traits_t::pointer ptr{ | |
270 | allocator_traits_t::allocate( alloc_, 1) }; | |
271 | try { | |
272 | allocator_traits_t::construct( | |
273 | alloc_, ptr, std::move( va), alloc_); | |
274 | } catch (...) { | |
275 | allocator_traits_t::deallocate( alloc_, ptr, 1); | |
276 | throw; | |
277 | } | |
278 | std::unique_lock< mutex > lk( mtx_); | |
279 | return push_( { detail::convert( ptr) }, lk); | |
280 | } | |
281 | ||
282 | template< typename Rep, typename Period > | |
283 | channel_op_status push_wait_for( value_type const& va, | |
284 | std::chrono::duration< Rep, Period > const& timeout_duration) { | |
285 | return push_wait_until( va, | |
286 | std::chrono::steady_clock::now() + timeout_duration); | |
287 | } | |
288 | ||
289 | template< typename Rep, typename Period > | |
290 | channel_op_status push_wait_for( value_type && va, | |
291 | std::chrono::duration< Rep, Period > const& timeout_duration) { | |
292 | return push_wait_until( std::forward< value_type >( va), | |
293 | std::chrono::steady_clock::now() + timeout_duration); | |
294 | } | |
295 | ||
296 | template< typename Clock, typename Duration > | |
297 | channel_op_status push_wait_until( value_type const& va, | |
298 | std::chrono::time_point< Clock, Duration > const& timeout_time) { | |
299 | typename allocator_traits_t::pointer ptr{ | |
300 | allocator_traits_t::allocate( alloc_, 1) }; | |
301 | try { | |
302 | allocator_traits_t::construct( alloc_, ptr, va, alloc_); | |
303 | } catch (...) { | |
304 | allocator_traits_t::deallocate( alloc_, ptr, 1); | |
305 | throw; | |
306 | } | |
307 | std::unique_lock< mutex > lk( mtx_); | |
308 | return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk); | |
309 | } | |
310 | ||
311 | template< typename Clock, typename Duration > | |
312 | channel_op_status push_wait_until( value_type && va, | |
313 | std::chrono::time_point< Clock, Duration > const& timeout_time) { | |
314 | typename allocator_traits_t::pointer ptr{ | |
315 | allocator_traits_t::allocate( alloc_, 1) }; | |
316 | try { | |
317 | allocator_traits_t::construct( | |
318 | alloc_, ptr, std::move( va), alloc_); | |
319 | } catch (...) { | |
320 | allocator_traits_t::deallocate( alloc_, ptr, 1); | |
321 | throw; | |
322 | } | |
323 | std::unique_lock< mutex > lk( mtx_); | |
324 | return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk); | |
325 | } | |
326 | ||
327 | channel_op_status try_push( value_type const& va) { | |
328 | typename allocator_traits_t::pointer ptr{ | |
329 | allocator_traits_t::allocate( alloc_, 1) }; | |
330 | try { | |
331 | allocator_traits_t::construct( alloc_, ptr, va, alloc_); | |
332 | } catch (...) { | |
333 | allocator_traits_t::deallocate( alloc_, ptr, 1); | |
334 | throw; | |
335 | } | |
336 | std::unique_lock< mutex > lk( mtx_); | |
337 | return try_push_( { detail::convert( ptr) }, lk); | |
338 | } | |
339 | ||
340 | channel_op_status try_push( value_type && va) { | |
341 | typename allocator_traits_t::pointer ptr{ | |
342 | allocator_traits_t::allocate( alloc_, 1) }; | |
343 | try { | |
344 | allocator_traits_t::construct( | |
345 | alloc_, ptr, std::move( va), alloc_); | |
346 | } catch (...) { | |
347 | allocator_traits_t::deallocate( alloc_, ptr, 1); | |
348 | throw; | |
349 | } | |
350 | std::unique_lock< mutex > lk( mtx_); | |
351 | return try_push_( { detail::convert( ptr) }, lk); | |
352 | } | |
353 | ||
354 | channel_op_status pop( value_type & va) { | |
355 | std::unique_lock< mutex > lk( mtx_); | |
356 | not_empty_cond_.wait( lk, | |
357 | [this](){ | |
358 | return is_closed_() || ! is_empty_(); | |
359 | }); | |
360 | if ( is_closed_() && is_empty_() ) { | |
361 | return channel_op_status::closed; | |
362 | } | |
363 | va = value_pop_( lk); | |
364 | return channel_op_status::success; | |
365 | } | |
366 | ||
367 | value_type value_pop() { | |
368 | std::unique_lock< mutex > lk( mtx_); | |
369 | not_empty_cond_.wait( lk, | |
370 | [this](){ | |
371 | return is_closed_() || ! is_empty_(); | |
372 | }); | |
373 | if ( is_closed_() && is_empty_() ) { | |
374 | throw fiber_error( | |
375 | std::make_error_code( std::errc::operation_not_permitted), | |
376 | "boost fiber: queue is closed"); | |
377 | } | |
378 | return value_pop_( lk); | |
379 | } | |
380 | ||
381 | channel_op_status try_pop( value_type & va) { | |
382 | std::unique_lock< mutex > lk( mtx_); | |
383 | if ( is_closed_() && is_empty_() ) { | |
384 | // let other fibers run | |
385 | lk.unlock(); | |
386 | this_fiber::yield(); | |
387 | return channel_op_status::closed; | |
388 | } | |
389 | if ( is_empty_() ) { | |
390 | // let other fibers run | |
391 | lk.unlock(); | |
392 | this_fiber::yield(); | |
393 | return channel_op_status::empty; | |
394 | } | |
395 | va = value_pop_( lk); | |
396 | return channel_op_status::success; | |
397 | } | |
398 | ||
399 | template< typename Rep, typename Period > | |
400 | channel_op_status pop_wait_for( value_type & va, | |
401 | std::chrono::duration< Rep, Period > const& timeout_duration) { | |
402 | return pop_wait_until( va, | |
403 | std::chrono::steady_clock::now() + timeout_duration); | |
404 | } | |
405 | ||
406 | template< typename Clock, typename Duration > | |
407 | channel_op_status pop_wait_until( value_type & va, | |
408 | std::chrono::time_point< Clock, Duration > const& timeout_time) { | |
409 | std::unique_lock< mutex > lk( mtx_); | |
410 | if ( ! not_empty_cond_.wait_until( lk, | |
411 | timeout_time, | |
412 | [this](){ | |
413 | return is_closed_() || ! is_empty_(); | |
414 | })) { | |
415 | return channel_op_status::timeout; | |
416 | } | |
417 | if ( is_closed_() && is_empty_() ) { | |
418 | return channel_op_status::closed; | |
419 | } | |
420 | va = value_pop_( lk); | |
421 | return channel_op_status::success; | |
422 | } | |
423 | }; | |
424 | ||
425 | }} | |
426 | ||
427 | #ifdef BOOST_HAS_ABI_HEADERS | |
428 | # include BOOST_ABI_SUFFIX | |
429 | #endif | |
430 | ||
431 | #endif // BOOST_FIBERS_BOUNDED_CHANNEL_H |