1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP
2 #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP
4 //////////////////////////////////////////////////////////////////////////////
6 // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost
7 // Software License, Version 1.0. (See accompanying file
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
10 // See http://www.boost.org/libs/thread for documentation.
12 //////////////////////////////////////////////////////////////////////////////
14 #include <boost/thread/detail/config.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/mutex.hpp>
17 #include <boost/thread/detail/move.hpp>
18 #include <boost/throw_exception.hpp>
19 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
21 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
22 #include <boost/smart_ptr/shared_ptr.hpp>
23 #include <boost/smart_ptr/make_shared.hpp>
25 #include <boost/config/abi_prefix.hpp>
31 template <typename ValueType>
32 class sync_bounded_queue
35 typedef ValueType value_type;
36 typedef std::size_t size_type;
38 // Constructors/Assignment/Destructors
39 BOOST_THREAD_NO_COPYABLE(sync_bounded_queue)
40 explicit sync_bounded_queue(size_type max_elems);
41 template <typename Range>
42 sync_bounded_queue(size_type max_elems, Range range);
43 ~sync_bounded_queue();
46 inline bool empty() const;
47 inline bool full() const;
48 inline size_type capacity() const;
49 inline size_type size() const;
50 inline bool closed() const;
55 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
56 inline void push(const value_type& x);
57 inline void push(BOOST_THREAD_RV_REF(value_type) x);
58 inline bool try_push(const value_type& x);
59 inline bool try_push(BOOST_THREAD_RV_REF(value_type) x);
60 inline bool try_push(no_block_tag, const value_type& x);
61 inline bool try_push(no_block_tag, BOOST_THREAD_RV_REF(value_type) x);
63 inline void push_back(const value_type& x);
64 inline void push_back(BOOST_THREAD_RV_REF(value_type) x);
65 inline queue_op_status try_push_back(const value_type& x);
66 inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x);
67 inline queue_op_status nonblocking_push_back(const value_type& x);
68 inline queue_op_status nonblocking_push_back(BOOST_THREAD_RV_REF(value_type) x);
69 inline queue_op_status wait_push_back(const value_type& x);
70 inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x);
72 // Observers/Modifiers
73 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
74 inline void pull(value_type&);
75 // enable_if is_nothrow_copy_movable<value_type>
76 inline value_type pull();
77 inline shared_ptr<ValueType> ptr_pull();
78 inline bool try_pull(value_type&);
79 inline bool try_pull(no_block_tag,value_type&);
80 inline shared_ptr<ValueType> try_pull();
82 inline void pull_front(value_type&);
83 // enable_if is_nothrow_copy_movable<value_type>
84 inline value_type pull_front();
85 inline queue_op_status try_pull_front(value_type&);
86 inline queue_op_status nonblocking_pull_front(value_type&);
88 inline queue_op_status wait_pull_front(ValueType& elem);
92 condition_variable not_empty_;
93 condition_variable not_full_;
94 size_type waiting_full_;
95 size_type waiting_empty_;
102 inline size_type inc(size_type idx) const BOOST_NOEXCEPT
104 return (idx + 1) % capacity_;
107 inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT
111 inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT
115 inline bool full(unique_lock<mutex>& ) const BOOST_NOEXCEPT
117 return (inc(in_) == out_);
119 inline bool full(lock_guard<mutex>& ) const BOOST_NOEXCEPT
121 return (inc(in_) == out_);
123 inline size_type capacity(lock_guard<mutex>& ) const BOOST_NOEXCEPT
127 inline size_type size(lock_guard<mutex>& lk) const BOOST_NOEXCEPT
129 if (full(lk)) return capacity(lk);
130 return ((in_+capacity(lk)-out_) % capacity(lk));
133 inline void throw_if_closed(unique_lock<mutex>&);
134 inline bool closed(unique_lock<mutex>&) const;
136 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
137 inline bool try_pull(value_type& x, unique_lock<mutex>& lk);
138 inline shared_ptr<value_type> try_pull(unique_lock<mutex>& lk);
139 inline bool try_push(const value_type& x, unique_lock<mutex>& lk);
140 inline bool try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
142 inline queue_op_status try_pull_front(value_type& x, unique_lock<mutex>& lk);
143 inline queue_op_status try_push_back(const value_type& x, unique_lock<mutex>& lk);
144 inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
146 inline queue_op_status wait_pull_front(value_type& x, unique_lock<mutex>& lk);
147 inline queue_op_status wait_push_back(const value_type& x, unique_lock<mutex>& lk);
148 inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
150 inline void wait_until_not_empty(unique_lock<mutex>& lk);
151 inline void wait_until_not_empty(unique_lock<mutex>& lk, bool&);
152 inline size_type wait_until_not_full(unique_lock<mutex>& lk);
153 inline size_type wait_until_not_full(unique_lock<mutex>& lk, bool&);
156 inline void notify_not_empty_if_needed(unique_lock<mutex>& lk)
158 if (waiting_empty_ > 0)
162 not_empty_.notify_one();
165 inline void notify_not_full_if_needed(unique_lock<mutex>& lk)
167 if (waiting_full_ > 0)
171 not_full_.notify_one();
175 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
176 inline void pull(value_type& elem, unique_lock<mutex>& lk)
178 elem = boost::move(data_[out_]);
180 notify_not_full_if_needed(lk);
182 inline value_type pull(unique_lock<mutex>& lk)
184 value_type elem = boost::move(data_[out_]);
186 notify_not_full_if_needed(lk);
187 return boost::move(elem);
189 inline boost::shared_ptr<value_type> ptr_pull(unique_lock<mutex>& lk)
191 shared_ptr<value_type> res = make_shared<value_type>(boost::move(data_[out_]));
193 notify_not_full_if_needed(lk);
197 inline void pull_front(value_type& elem, unique_lock<mutex>& lk)
199 elem = boost::move(data_[out_]);
201 notify_not_full_if_needed(lk);
203 inline value_type pull_front(unique_lock<mutex>& lk)
205 value_type elem = boost::move(data_[out_]);
207 notify_not_full_if_needed(lk);
208 return boost::move(elem);
211 inline void set_in(size_type in, unique_lock<mutex>& lk)
214 notify_not_empty_if_needed(lk);
217 inline void push_at(const value_type& elem, size_type in_p_1, unique_lock<mutex>& lk)
223 inline void push_at(BOOST_THREAD_RV_REF(value_type) elem, size_type in_p_1, unique_lock<mutex>& lk)
225 data_[in_] = boost::move(elem);
230 template <typename ValueType>
231 sync_bounded_queue<ValueType>::sync_bounded_queue(typename sync_bounded_queue<ValueType>::size_type max_elems) :
232 waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1),
235 BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1");
238 // template <typename ValueType>
239 // template <typename Range>
240 // sync_bounded_queue<ValueType>::sync_bounded_queue(size_type max_elems, Range range) :
241 // waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1),
244 // BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1");
245 // BOOST_ASSERT_MSG(max_elems == size(range), "number of elements must match range's size");
248 // typedef typename Range::iterator iterator_t;
249 // iterator_t first = boost::begin(range);
250 // iterator_t end = boost::end(range);
252 // for (iterator_t cur = first; cur != end; ++cur, ++in)
264 template <typename ValueType>
265 sync_bounded_queue<ValueType>::~sync_bounded_queue()
270 template <typename ValueType>
271 void sync_bounded_queue<ValueType>::close()
274 lock_guard<mutex> lk(mtx_);
277 not_empty_.notify_all();
278 not_full_.notify_all();
281 template <typename ValueType>
282 bool sync_bounded_queue<ValueType>::closed() const
284 lock_guard<mutex> lk(mtx_);
287 template <typename ValueType>
288 bool sync_bounded_queue<ValueType>::closed(unique_lock<mutex>& ) const
293 template <typename ValueType>
294 bool sync_bounded_queue<ValueType>::empty() const
296 lock_guard<mutex> lk(mtx_);
299 template <typename ValueType>
300 bool sync_bounded_queue<ValueType>::full() const
302 lock_guard<mutex> lk(mtx_);
306 template <typename ValueType>
307 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::capacity() const
309 lock_guard<mutex> lk(mtx_);
313 template <typename ValueType>
314 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::size() const
316 lock_guard<mutex> lk(mtx_);
320 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
321 template <typename ValueType>
322 bool sync_bounded_queue<ValueType>::try_pull(ValueType& elem, unique_lock<mutex>& lk)
332 template <typename ValueType>
333 shared_ptr<ValueType> sync_bounded_queue<ValueType>::try_pull(unique_lock<mutex>& lk)
338 return shared_ptr<ValueType>();
342 template <typename ValueType>
343 bool sync_bounded_queue<ValueType>::try_pull(ValueType& elem)
345 unique_lock<mutex> lk(mtx_);
346 return try_pull(elem, lk);
350 template <typename ValueType>
351 queue_op_status sync_bounded_queue<ValueType>::try_pull_front(ValueType& elem, unique_lock<mutex>& lk)
355 if (closed(lk)) return queue_op_status::closed;
356 return queue_op_status::empty;
358 pull_front(elem, lk);
359 return queue_op_status::success;
362 template <typename ValueType>
363 queue_op_status sync_bounded_queue<ValueType>::try_pull_front(ValueType& elem)
365 unique_lock<mutex> lk(mtx_);
366 return try_pull_front(elem, lk);
369 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
370 template <typename ValueType>
371 bool sync_bounded_queue<ValueType>::try_pull(no_block_tag,ValueType& elem)
373 unique_lock<mutex> lk(mtx_, try_to_lock);
378 return try_pull(elem, lk);
380 template <typename ValueType>
381 boost::shared_ptr<ValueType> sync_bounded_queue<ValueType>::try_pull()
383 unique_lock<mutex> lk(mtx_);
388 template <typename ValueType>
389 queue_op_status sync_bounded_queue<ValueType>::nonblocking_pull_front(ValueType& elem)
391 unique_lock<mutex> lk(mtx_, try_to_lock);
394 return queue_op_status::busy;
396 return try_pull_front(elem, lk);
399 template <typename ValueType>
400 void sync_bounded_queue<ValueType>::throw_if_closed(unique_lock<mutex>&)
404 BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
408 template <typename ValueType>
409 void sync_bounded_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk)
413 if (out_ != in_) break;
419 template <typename ValueType>
420 void sync_bounded_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk, bool & closed)
424 if (out_ != in_) break;
425 if (closed_) {closed=true; return;}
431 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
432 template <typename ValueType>
433 void sync_bounded_queue<ValueType>::pull(ValueType& elem)
435 unique_lock<mutex> lk(mtx_);
436 wait_until_not_empty(lk);
439 // template <typename ValueType>
440 // void sync_bounded_queue<ValueType>::pull(ValueType& elem, bool & closed)
442 // unique_lock<mutex> lk(mtx_);
443 // wait_until_not_empty(lk, closed);
444 // if (closed) {return;}
448 // enable if ValueType is nothrow movable
449 template <typename ValueType>
450 ValueType sync_bounded_queue<ValueType>::pull()
452 unique_lock<mutex> lk(mtx_);
453 wait_until_not_empty(lk);
456 template <typename ValueType>
457 boost::shared_ptr<ValueType> sync_bounded_queue<ValueType>::ptr_pull()
459 unique_lock<mutex> lk(mtx_);
460 wait_until_not_empty(lk);
466 template <typename ValueType>
467 void sync_bounded_queue<ValueType>::pull_front(ValueType& elem)
469 unique_lock<mutex> lk(mtx_);
470 wait_until_not_empty(lk);
471 pull_front(elem, lk);
474 // enable if ValueType is nothrow movable
475 template <typename ValueType>
476 ValueType sync_bounded_queue<ValueType>::pull_front()
478 unique_lock<mutex> lk(mtx_);
479 wait_until_not_empty(lk);
480 return pull_front(lk);
483 template <typename ValueType>
484 queue_op_status sync_bounded_queue<ValueType>::wait_pull_front(ValueType& elem, unique_lock<mutex>& lk)
486 if (empty(lk) && closed(lk)) {return queue_op_status::closed;}
487 bool is_closed = false;
488 wait_until_not_empty(lk, is_closed);
489 if (is_closed) {return queue_op_status::closed;}
490 pull_front(elem, lk);
491 return queue_op_status::success;
493 template <typename ValueType>
494 queue_op_status sync_bounded_queue<ValueType>::wait_pull_front(ValueType& elem)
496 unique_lock<mutex> lk(mtx_);
497 return wait_pull_front(elem, lk);
500 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
501 template <typename ValueType>
502 bool sync_bounded_queue<ValueType>::try_push(const ValueType& elem, unique_lock<mutex>& lk)
505 size_type in_p_1 = inc(in_);
506 if (in_p_1 == out_) // full()
510 push_at(elem, in_p_1, lk);
513 template <typename ValueType>
514 bool sync_bounded_queue<ValueType>::try_push(const ValueType& elem)
516 unique_lock<mutex> lk(mtx_);
517 return try_push(elem, lk);
522 template <typename ValueType>
523 queue_op_status sync_bounded_queue<ValueType>::try_push_back(const ValueType& elem, unique_lock<mutex>& lk)
525 if (closed(lk)) return queue_op_status::closed;
526 size_type in_p_1 = inc(in_);
527 if (in_p_1 == out_) // full()
529 return queue_op_status::full;
531 push_at(elem, in_p_1, lk);
532 return queue_op_status::success;
535 template <typename ValueType>
536 queue_op_status sync_bounded_queue<ValueType>::try_push_back(const ValueType& elem)
538 unique_lock<mutex> lk(mtx_);
539 return try_push_back(elem, lk);
542 template <typename ValueType>
543 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(const ValueType& elem, unique_lock<mutex>& lk)
545 if (closed(lk)) return queue_op_status::closed;
546 push_at(elem, wait_until_not_full(lk), lk);
547 return queue_op_status::success;
549 template <typename ValueType>
550 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(const ValueType& elem)
552 unique_lock<mutex> lk(mtx_);
553 return wait_push_back(elem, lk);
557 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
558 template <typename ValueType>
559 bool sync_bounded_queue<ValueType>::try_push(no_block_tag, const ValueType& elem)
561 unique_lock<mutex> lk(mtx_, try_to_lock);
562 if (!lk.owns_lock()) return false;
563 return try_push(elem, lk);
567 template <typename ValueType>
568 queue_op_status sync_bounded_queue<ValueType>::nonblocking_push_back(const ValueType& elem)
570 unique_lock<mutex> lk(mtx_, try_to_lock);
571 if (!lk.owns_lock()) return queue_op_status::busy;
572 return try_push_back(elem, lk);
575 template <typename ValueType>
576 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::wait_until_not_full(unique_lock<mutex>& lk)
581 size_type in_p_1 = inc(in_);
582 if (in_p_1 != out_) // ! full()
591 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
592 template <typename ValueType>
593 void sync_bounded_queue<ValueType>::push(const ValueType& elem)
595 unique_lock<mutex> lk(mtx_);
596 push_at(elem, wait_until_not_full(lk), lk);
599 template <typename ValueType>
600 void sync_bounded_queue<ValueType>::push_back(const ValueType& elem)
602 unique_lock<mutex> lk(mtx_);
603 push_at(elem, wait_until_not_full(lk), lk);
606 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
607 template <typename ValueType>
608 bool sync_bounded_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
611 size_type in_p_1 = inc(in_);
612 if (in_p_1 == out_) // full()
616 push_at(boost::move(elem), in_p_1, lk);
620 template <typename ValueType>
621 bool sync_bounded_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem)
623 unique_lock<mutex> lk(mtx_);
624 return try_push(boost::move(elem), lk);
628 template <typename ValueType>
629 queue_op_status sync_bounded_queue<ValueType>::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
631 if (closed(lk)) return queue_op_status::closed;
632 size_type in_p_1 = inc(in_);
633 if (in_p_1 == out_) // full()
635 return queue_op_status::full;
637 push_at(boost::move(elem), in_p_1, lk);
638 return queue_op_status::success;
640 template <typename ValueType>
641 queue_op_status sync_bounded_queue<ValueType>::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
643 unique_lock<mutex> lk(mtx_);
644 return try_push_back(boost::move(elem), lk);
647 template <typename ValueType>
648 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
650 if (closed(lk)) return queue_op_status::closed;
651 push_at(boost::move(elem), wait_until_not_full(lk), lk);
652 return queue_op_status::success;
654 template <typename ValueType>
655 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
657 unique_lock<mutex> lk(mtx_);
658 return try_push_back(boost::move(elem), lk);
662 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
663 template <typename ValueType>
664 bool sync_bounded_queue<ValueType>::try_push(no_block_tag, BOOST_THREAD_RV_REF(ValueType) elem)
666 unique_lock<mutex> lk(mtx_, try_to_lock);
671 return try_push(boost::move(elem), lk);
674 template <typename ValueType>
675 queue_op_status sync_bounded_queue<ValueType>::nonblocking_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
677 unique_lock<mutex> lk(mtx_, try_to_lock);
680 return queue_op_status::busy;
682 return try_push_back(boost::move(elem), lk);
685 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
686 template <typename ValueType>
687 void sync_bounded_queue<ValueType>::push(BOOST_THREAD_RV_REF(ValueType) elem)
689 unique_lock<mutex> lk(mtx_);
690 push_at(boost::move(elem), wait_until_not_full(lk), lk);
693 template <typename ValueType>
694 void sync_bounded_queue<ValueType>::push_back(BOOST_THREAD_RV_REF(ValueType) elem)
696 unique_lock<mutex> lk(mtx_);
697 push_at(boost::move(elem), wait_until_not_full(lk), lk);
700 template <typename ValueType>
701 sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, BOOST_THREAD_RV_REF(ValueType) elem)
703 sbq.push_back(boost::move(elem));
707 template <typename ValueType>
708 sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, ValueType const&elem)
714 template <typename ValueType>
715 sync_bounded_queue<ValueType>& operator>>(sync_bounded_queue<ValueType>& sbq, ValueType &elem)
717 sbq.pull_front(elem);
721 using concurrent::sync_bounded_queue;
725 #include <boost/config/abi_suffix.hpp>