1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
9 #define BOOST_THREAD_SYNC_PRIORITY_QUEUE
11 #include <boost/thread/detail/config.hpp>
13 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
14 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/csbl/vector.hpp>
17 #include <boost/thread/detail/move.hpp>
18 #include <boost/thread/mutex.hpp>
20 #include <boost/atomic.hpp>
21 #include <boost/chrono/duration.hpp>
22 #include <boost/chrono/time_point.hpp>
28 #include <boost/config/abi_prefix.hpp>
36 class Container = csbl::vector<Type>,
37 class Compare = std::less<Type>
45 typedef Type value_type;
46 typedef typename Container::size_type size_type;
48 explicit priority_queue(const Compare& compare = Compare())
49 : _elements(), _compare(compare)
52 size_type size() const
54 return _elements.size();
59 return _elements.empty();
62 void push(Type const& element)
64 _elements.push_back(element);
65 std::push_heap(_elements.begin(), _elements.end(), _compare);
67 void push(BOOST_RV_REF(Type) element)
69 _elements.push_back(boost::move(element));
70 std::push_heap(_elements.begin(), _elements.end(), _compare);
75 std::pop_heap(_elements.begin(), _elements.end(), _compare);
80 Type result = boost::move(_elements.front());
82 return boost::move(result);
87 return _elements.front();
94 template <class ValueType,
95 class Container = csbl::vector<ValueType>,
96 class Compare = std::less<typename Container::value_type> >
97 class sync_priority_queue
98 : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
100 typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super;
103 typedef ValueType value_type;
104 //typedef typename super::value_type value_type; // fixme
105 typedef typename super::underlying_queue_type underlying_queue_type;
106 typedef typename super::size_type size_type;
107 typedef typename super::op_status op_status;
109 typedef chrono::steady_clock clock;
113 sync_priority_queue() {}
115 ~sync_priority_queue()
123 void push(const ValueType& elem);
124 void push(BOOST_THREAD_RV_REF(ValueType) elem);
126 queue_op_status try_push(const ValueType& elem);
127 queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
131 void pull(ValueType&);
133 template <class WClock, class Duration>
134 queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&);
135 template <class Rep, class Period>
136 queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&);
138 queue_op_status try_pull(ValueType& elem);
139 queue_op_status wait_pull(ValueType& elem);
140 queue_op_status nonblocking_pull(ValueType&);
143 void push(unique_lock<mutex>&, const ValueType& elem);
144 void push(lock_guard<mutex>&, const ValueType& elem);
145 void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
146 void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
148 queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
149 queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
151 ValueType pull(unique_lock<mutex>&);
152 ValueType pull(lock_guard<mutex>&);
154 void pull(unique_lock<mutex>&, ValueType&);
155 void pull(lock_guard<mutex>&, ValueType&);
157 queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
158 queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
160 queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
162 queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
164 sync_priority_queue(const sync_priority_queue&);
165 sync_priority_queue& operator= (const sync_priority_queue&);
166 sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
167 sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
171 //////////////////////
172 template <class T, class Container,class Cmp>
173 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
175 super::throw_if_closed(lk);
176 super::data_.push(elem);
177 super::notify_not_empty_if_needed(lk);
179 template <class T, class Container,class Cmp>
180 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
182 super::throw_if_closed(lk);
183 super::data_.push(elem);
184 super::notify_not_empty_if_needed(lk);
186 template <class T, class Container,class Cmp>
187 void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
189 lock_guard<mutex> lk(super::mtx_);
193 //////////////////////
194 template <class T, class Container,class Cmp>
195 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
197 super::throw_if_closed(lk);
198 super::data_.push(boost::move(elem));
199 super::notify_not_empty_if_needed(lk);
201 template <class T, class Container,class Cmp>
202 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
204 super::throw_if_closed(lk);
205 super::data_.push(boost::move(elem));
206 super::notify_not_empty_if_needed(lk);
208 template <class T, class Container,class Cmp>
209 void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
211 lock_guard<mutex> lk(super::mtx_);
212 push(lk, boost::move(elem));
215 //////////////////////
216 template <class T, class Container,class Cmp>
217 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
219 lock_guard<mutex> lk(super::mtx_);
220 if (super::closed(lk)) return queue_op_status::closed;
222 return queue_op_status::success;
225 //////////////////////
226 template <class T, class Container,class Cmp>
227 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
229 lock_guard<mutex> lk(super::mtx_);
230 if (super::closed(lk)) return queue_op_status::closed;
231 push(lk, boost::move(elem));
233 return queue_op_status::success;
236 //////////////////////
237 template <class T,class Container, class Cmp>
238 T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
240 return super::data_.pull();
242 template <class T,class Container, class Cmp>
243 T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
245 return super::data_.pull();
248 template <class T,class Container, class Cmp>
249 T sync_priority_queue<T,Container,Cmp>::pull()
251 unique_lock<mutex> lk(super::mtx_);
252 super::wait_until_not_empty(lk);
256 //////////////////////
257 template <class T,class Container, class Cmp>
258 void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
260 elem = super::data_.pull();
262 template <class T,class Container, class Cmp>
263 void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
265 elem = super::data_.pull();
268 template <class T,class Container, class Cmp>
269 void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
271 unique_lock<mutex> lk(super::mtx_);
272 super::wait_until_not_empty(lk);
276 //////////////////////
277 template <class T, class Cont,class Cmp>
278 template <class WClock, class Duration>
280 sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
282 unique_lock<mutex> lk(super::mtx_);
283 if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
284 return queue_op_status::timeout;
286 return queue_op_status::success;
289 //////////////////////
290 template <class T, class Cont,class Cmp>
291 template <class Rep, class Period>
293 sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
295 return pull_until(clock::now() + dura, elem);
298 //////////////////////
299 template <class T, class Container,class Cmp>
301 sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
303 if (super::empty(lk))
305 if (super::closed(lk)) return queue_op_status::closed;
306 return queue_op_status::empty;
309 return queue_op_status::success;
312 template <class T, class Container,class Cmp>
314 sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
316 if (super::empty(lk))
318 if (super::closed(lk)) return queue_op_status::closed;
319 return queue_op_status::empty;
322 return queue_op_status::success;
325 template <class T, class Container,class Cmp>
327 sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
329 lock_guard<mutex> lk(super::mtx_);
330 return try_pull(lk, elem);
333 //////////////////////
334 template <class T,class Container, class Cmp>
335 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
337 if (super::empty(lk))
339 if (super::closed(lk)) return queue_op_status::closed;
341 bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
342 if (has_been_closed) return queue_op_status::closed;
344 return queue_op_status::success;
347 template <class T,class Container, class Cmp>
348 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
350 unique_lock<mutex> lk(super::mtx_);
351 return wait_pull(lk, elem);
354 //////////////////////
356 template <class T,class Container, class Cmp>
357 queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
359 unique_lock<mutex> lk(super::mtx_, try_to_lock);
360 if (!lk.owns_lock()) return queue_op_status::busy;
361 return try_pull(lk, elem);
366 } //end concurrent namespace
368 using concurrent::sync_priority_queue;
370 } //end boost namespace
371 #include <boost/config/abi_suffix.hpp>