1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP
2 #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP
4 //////////////////////////////////////////////////////////////////////////////
6 // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost
7 // Software License, Version 1.0. (See accompanying file
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
10 // See http://www.boost.org/libs/thread for documentation.
12 //////////////////////////////////////////////////////////////////////////////
15 #include <boost/thread/detail/config.hpp>
16 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
17 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
18 #include <boost/thread/condition_variable.hpp>
19 #include <boost/thread/csbl/devector.hpp>
20 #include <boost/thread/detail/move.hpp>
21 #include <boost/thread/mutex.hpp>
23 #include <boost/throw_exception.hpp>
24 #include <boost/smart_ptr/shared_ptr.hpp>
25 #include <boost/smart_ptr/make_shared.hpp>
27 #include <boost/config/abi_prefix.hpp>
// Queue of ValueType layered on detail::sync_queue_base. The storage
// container is a template parameter that defaults to csbl::devector<ValueType>.
// NOTE(review): the class-head line itself is not visible in this listing.
33 template <class ValueType, class Container = csbl::devector<ValueType> >
35 : public detail::sync_queue_base<ValueType, Container >
// Shorthand for the base class; base members (data_, mtx_, closed(), ...) are
// always reached through this alias.
37 typedef detail::sync_queue_base<ValueType, Container > super;
// Element type, taken directly from the template argument rather than the base.
40 typedef ValueType value_type;
41 //typedef typename super::value_type value_type; // fixme
// Remaining member types are re-exported from the base class.
42 typedef typename super::underlying_queue_type underlying_queue_type;
43 typedef typename super::size_type size_type;
44 typedef typename super::op_status op_status;
46 // Constructors/Assignment/Destructors
// By its name this macro disables copying of sync_queue; its definition is
// not visible here.
47 BOOST_THREAD_NO_COPYABLE(sync_queue)
49 //template <class Range>
50 //inline explicit sync_queue(Range range);
// Insertion operations. Each one is declared twice: once for a const
// reference (copy in) and once for BOOST_THREAD_RV_REF (move in).
// push() returns nothing; the try_/nonblocking_/wait_ variants report a
// queue_op_status.
55 inline void push(const value_type& x);
56 inline queue_op_status try_push(const value_type& x);
57 inline queue_op_status nonblocking_push(const value_type& x);
58 inline queue_op_status wait_push(const value_type& x);
59 inline void push(BOOST_THREAD_RV_REF(value_type) x);
60 inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x);
61 inline queue_op_status nonblocking_push(BOOST_THREAD_RV_REF(value_type) x);
62 inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x);
64 // Observers/Modifiers
// Extraction operations: pull() either fills an out-parameter or returns the
// element by value; the other variants report a queue_op_status.
65 inline void pull(value_type&);
66 // enable_if is_nothrow_copy_movable<value_type>
67 inline value_type pull();
69 inline queue_op_status try_pull(value_type&);
70 inline queue_op_status nonblocking_pull(value_type&);
71 inline queue_op_status wait_pull(ValueType& elem);
// Overloads that receive the lock as an argument. The single-argument
// overloads defined later in this file construct a unique_lock on
// super::mtx_ and forward to these.
// NOTE(review): any access specifier preceding this group is not visible here.
75 inline queue_op_status try_pull(value_type& x, unique_lock<mutex>& lk);
76 inline queue_op_status wait_pull(value_type& x, unique_lock<mutex>& lk);
77 inline queue_op_status try_push(const value_type& x, unique_lock<mutex>& lk);
78 inline queue_op_status wait_push(const value_type& x, unique_lock<mutex>& lk);
79 inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
80 inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
// Moves the front element of super::data_ into elem and then removes it from
// the container. The lock parameter is unnamed and not used in the body.
// No emptiness check is made here — presumably the caller guarantees a
// non-empty queue and a held lock; confirm against callers.
82 inline void pull(value_type& elem, unique_lock<mutex>& )
84 elem = boost::move(super::data_.front());
85 super::data_.pop_front();
// Same as above, but the front element is moved into a local, popped from the
// container, and the local is returned via boost::move.
87 inline value_type pull(unique_lock<mutex>& )
89 value_type e = boost::move(super::data_.front());
90 super::data_.pop_front();
91 return boost::move(e);
// Appends a copy of elem to super::data_, then lets the base class decide
// whether a not-empty notification is needed.
94 inline void push(const value_type& elem, unique_lock<mutex>& lk)
96 super::data_.push_back(elem);
97 super::notify_not_empty_if_needed(lk);
// Rvalue overload: elem is moved into super::data_ instead of copied, followed
// by the same notification call.
100 inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk)
102 super::data_.push_back(boost::move(elem));
103 super::notify_not_empty_if_needed(lk);
// Default constructor, defined out of class.
// NOTE(review): the member-initializer list announced by the trailing ':' and
// the constructor body are not visible in this listing.
107 template <class ValueType, class Container>
108 sync_queue<ValueType, Container>::sync_queue() :
113 // template <class ValueType, class Container>
114 // template <class Range>
115 // explicit sync_queue<ValueType, Container>::sync_queue(Range range) :
116 // data_(), closed_(false)
120 // typedef typename Range::iterator iterator_t;
121 // iterator_t first = boost::begin(range);
122 // iterator_t end = boost::end(range);
123 // for (iterator_t cur = first; cur != end; ++cur)
125 // data_.push(boost::move(*cur));;
127 // notify_not_empty_if_needed(lk);
// Destructor, defined out of class.
// NOTE(review): the body is not visible in this listing.
135 template <class ValueType, class Container>
136 sync_queue<ValueType, Container>::~sync_queue()
// try_pull with a lock supplied by the caller.
// Returns queue_op_status::closed or queue_op_status::empty when there is
// nothing to take, and queue_op_status::success otherwise.
// NOTE(review): block delimiters are not visible here; presumably both the
// closed and the empty return belong to the super::empty(lk) branch.
// NOTE(review): no statement that writes to `elem` is visible before the
// success return — confirm against the complete source.
140 template <class ValueType, class Container>
141 queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem, unique_lock<mutex>& lk)
143 if (super::empty(lk))
145 if (super::closed(lk)) return queue_op_status::closed;
146 return queue_op_status::empty;
149 return queue_op_status::success;
// wait_pull with a lock supplied by the caller.
// Returns queue_op_status::closed if the queue is both empty and closed on
// entry, or if the wait reports that the queue has been closed; otherwise
// returns queue_op_status::success.
// The commented-out std::cout lines are leftover debugging traces.
// NOTE(review): no statement that writes to `elem` is visible before the
// success return — confirm against the complete source.
151 template <class ValueType, class Container>
152 queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem, unique_lock<mutex>& lk)
154 //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
// Fast exit: nothing queued and nothing more will arrive.
155 if (super::empty(lk))
157 //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
158 if (super::closed(lk)) return queue_op_status::closed;
160 //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
// The base-class wait returns a flag that is interpreted here as
// "the queue was closed".
161 bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
162 //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
163 if (has_been_closed) return queue_op_status::closed;
164 //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
166 //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
167 return queue_op_status::success;
// Public try_pull: takes a unique_lock on super::mtx_ and forwards to the
// lock-taking overload, returning its status unchanged.
170 template <class ValueType, class Container>
171 queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem)
173 unique_lock<mutex> lk(super::mtx_);
174 return try_pull(elem, lk);
// Public wait_pull: takes a unique_lock on super::mtx_ and forwards to the
// lock-taking overload, returning its status unchanged.
177 template <class ValueType, class Container>
178 queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem)
180 unique_lock<mutex> lk(super::mtx_);
181 return wait_pull(elem, lk);
// Pull that does not wait for the mutex: the lock is constructed with
// try_to_lock, and the call either reports queue_op_status::busy or forwards
// to try_pull(elem, lk).
// NOTE(review): the condition guarding the busy return is not visible here;
// presumably it is a !lk.owns_lock() test like the one in
// nonblocking_push(const ValueType&) — confirm.
184 template <class ValueType, class Container>
185 queue_op_status sync_queue<ValueType, Container>::nonblocking_pull(ValueType& elem)
187 unique_lock<mutex> lk(super::mtx_, try_to_lock);
190 return queue_op_status::busy;
192 return try_pull(elem, lk);
// Public pull into an out-parameter: locks super::mtx_ and calls
// super::wait_until_not_empty(lk) (presumably blocks until an element is
// available — the helper is defined in the base class).
// NOTE(review): the statement that transfers the element into `elem` is not
// visible here — confirm against the complete source.
195 template <class ValueType, class Container>
196 void sync_queue<ValueType, Container>::pull(ValueType& elem)
198 unique_lock<mutex> lk(super::mtx_);
199 super::wait_until_not_empty(lk);
203 // enable if ValueType is nothrow movable
// Public pull returning the element by value: locks super::mtx_ and calls
// super::wait_until_not_empty(lk).
// NOTE(review): the return statement is not visible here — confirm against
// the complete source.
204 template <class ValueType, class Container>
205 ValueType sync_queue<ValueType, Container>::pull()
207 unique_lock<mutex> lk(super::mtx_);
208 super::wait_until_not_empty(lk);
// try_push (copying overload) with a lock supplied by the caller.
// Returns queue_op_status::closed when super::closed(lk) is true, otherwise
// queue_op_status::success.
// NOTE(review): the call that stores `elem` is not visible between the two
// returns; the rvalue overload calls push(boost::move(elem), lk) at that
// point — confirm the equivalent call exists here.
212 template <class ValueType, class Container>
213 queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem, unique_lock<mutex>& lk)
215 if (super::closed(lk)) return queue_op_status::closed;
217 return queue_op_status::success;
// Public try_push (copying overload): takes a unique_lock on super::mtx_ and
// forwards to the lock-taking overload, returning its status unchanged.
220 template <class ValueType, class Container>
221 queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem)
223 unique_lock<mutex> lk(super::mtx_);
224 return try_push(elem, lk);
// wait_push (copying overload) with a lock supplied by the caller.
// The visible logic matches try_push: queue_op_status::closed when
// super::closed(lk) is true, otherwise queue_op_status::success; no wait call
// appears in the visible lines.
// NOTE(review): the call that stores `elem` is not visible between the two
// returns — confirm against the complete source.
227 template <class ValueType, class Container>
228 queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem, unique_lock<mutex>& lk)
230 if (super::closed(lk)) return queue_op_status::closed;
232 return queue_op_status::success;
// Public wait_push (copying overload): takes a unique_lock on super::mtx_ and
// forwards to the lock-taking overload, returning its status unchanged.
235 template <class ValueType, class Container>
236 queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem)
238 unique_lock<mutex> lk(super::mtx_);
239 return wait_push(elem, lk);
// Push (copying overload) that does not wait for the mutex.
// The lock is attempted with try_to_lock; if it was not obtained the call
// returns queue_op_status::busy, otherwise it forwards to try_push(elem, lk).
242 template <class ValueType, class Container>
243 queue_op_status sync_queue<ValueType, Container>::nonblocking_push(const ValueType& elem)
245 unique_lock<mutex> lk(super::mtx_, try_to_lock);
246 if (!lk.owns_lock()) return queue_op_status::busy;
247 return try_push(elem, lk);
// Public push (copying overload): locks super::mtx_ and calls
// super::throw_if_closed(lk), which by its name raises an exception when the
// queue is closed (helper defined in the base class).
// NOTE(review): the call that stores `elem` is not visible here; the rvalue
// overload calls push(boost::move(elem), lk) after the closed check — confirm
// the equivalent call exists here.
250 template <class ValueType, class Container>
251 void sync_queue<ValueType, Container>::push(const ValueType& elem)
253 unique_lock<mutex> lk(super::mtx_);
254 super::throw_if_closed(lk);
// try_push (moving overload) with a lock supplied by the caller.
// Returns queue_op_status::closed without touching elem when
// super::closed(lk) is true; otherwise moves elem into the queue through
// push(elem, lk) and returns queue_op_status::success.
258 template <class ValueType, class Container>
259 queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
261 if (super::closed(lk)) return queue_op_status::closed;
262 push(boost::move(elem), lk);
263 return queue_op_status::success;
// Public try_push (moving overload): takes a unique_lock on super::mtx_ and
// forwards elem, still as an rvalue, to the lock-taking overload.
266 template <class ValueType, class Container>
267 queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem)
269 unique_lock<mutex> lk(super::mtx_);
270 return try_push(boost::move(elem), lk);
// wait_push (moving overload) with a lock supplied by the caller.
// Same statements as the moving try_push: closed check, then
// push(boost::move(elem), lk), then queue_op_status::success. No wait call is
// made in this function.
273 template <class ValueType, class Container>
274 queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
276 if (super::closed(lk)) return queue_op_status::closed;
277 push(boost::move(elem), lk);
278 return queue_op_status::success;
// Public wait_push (moving overload): takes a unique_lock on super::mtx_ and
// forwards elem, still as an rvalue, to the lock-taking overload.
281 template <class ValueType, class Container>
282 queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem)
284 unique_lock<mutex> lk(super::mtx_);
285 return wait_push(boost::move(elem), lk);
// Push (moving overload) that does not wait for the mutex: the lock is
// constructed with try_to_lock, and the call either reports
// queue_op_status::busy or forwards to try_push(boost::move(elem), lk).
// NOTE(review): the condition guarding the busy return is not visible here;
// presumably it is a !lk.owns_lock() test like the one in
// nonblocking_push(const ValueType&) — confirm.
288 template <class ValueType, class Container>
289 queue_op_status sync_queue<ValueType, Container>::nonblocking_push(BOOST_THREAD_RV_REF(ValueType) elem)
291 unique_lock<mutex> lk(super::mtx_, try_to_lock);
294 return queue_op_status::busy;
296 return try_push(boost::move(elem), lk);
// Public push (moving overload): locks super::mtx_, calls
// super::throw_if_closed(lk) (by its name, raises when the queue is closed),
// then moves elem into the queue via the lock-taking push overload.
299 template <class ValueType, class Container>
300 void sync_queue<ValueType, Container>::push(BOOST_THREAD_RV_REF(ValueType) elem)
302 unique_lock<mutex> lk(super::mtx_);
303 super::throw_if_closed(lk);
304 push(boost::move(elem), lk);
// Stream-style insertion: `queue << value` moves the rvalue into the queue by
// calling sbq.push().
// NOTE(review): the return statement is not visible here; the declared return
// type is a reference to the queue.
307 template <class ValueType, class Container>
308 sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, BOOST_THREAD_RV_REF(ValueType) elem)
310 sbq.push(boost::move(elem));
// Stream-style insertion taking the element by const reference.
// NOTE(review): the body is not visible in this listing; presumably it mirrors
// the rvalue overload with a copying push — confirm.
314 template <class ValueType, class Container>
315 sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, ValueType const&elem)
// Stream-style extraction: `queue >> value` takes the queue and an
// out-parameter for the element.
// NOTE(review): the body is not visible in this listing; presumably it calls
// sbq.pull(elem) and returns sbq — confirm.
321 template <class ValueType, class Container>
322 sync_queue<ValueType, Container>& operator>>(sync_queue<ValueType, Container>& sbq, ValueType &elem)
// Makes concurrent::sync_queue available under the unqualified name
// sync_queue in the enclosing scope (the enclosing namespace is not visible
// in this listing).
329 using concurrent::sync_queue;
333 #include <boost/config/abi_suffix.hpp>