]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (C) 2014 Ian Forbed |
2 | // Copyright (C) 2014 Vicente J. Botet Escriba | |
3 | // | |
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | ||
8 | #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE | |
9 | #define BOOST_THREAD_SYNC_PRIORITY_QUEUE | |
10 | ||
11 | #include <boost/thread/detail/config.hpp> | |
12 | ||
13 | #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp> | |
14 | #include <boost/thread/concurrent_queues/queue_op_status.hpp> | |
15 | #include <boost/thread/condition_variable.hpp> | |
16 | #include <boost/thread/csbl/vector.hpp> | |
17 | #include <boost/thread/detail/move.hpp> | |
18 | #include <boost/thread/mutex.hpp> | |
19 | ||
20 | #include <boost/atomic.hpp> | |
21 | #include <boost/chrono/duration.hpp> | |
22 | #include <boost/chrono/time_point.hpp> | |
23 | ||
24 | #include <exception> | |
25 | #include <queue> | |
26 | #include <utility> | |
27 | ||
28 | #include <boost/config/abi_prefix.hpp> | |
29 | ||
30 | namespace boost | |
31 | { | |
32 | namespace detail { | |
33 | ||
34 | template < | |
35 | class Type, | |
36 | class Container = csbl::vector<Type>, | |
37 | class Compare = std::less<Type> | |
38 | > | |
39 | class priority_queue | |
40 | { | |
41 | private: | |
42 | Container _elements; | |
43 | Compare _compare; | |
44 | public: | |
45 | typedef Type value_type; | |
46 | typedef typename Container::size_type size_type; | |
47 | ||
48 | explicit priority_queue(const Compare& compare = Compare()) | |
49 | : _elements(), _compare(compare) | |
50 | { } | |
51 | ||
52 | size_type size() const | |
53 | { | |
54 | return _elements.size(); | |
55 | } | |
56 | ||
57 | bool empty() const | |
58 | { | |
59 | return _elements.empty(); | |
60 | } | |
61 | ||
62 | void push(Type const& element) | |
63 | { | |
64 | _elements.push_back(element); | |
65 | std::push_heap(_elements.begin(), _elements.end(), _compare); | |
66 | } | |
67 | void push(BOOST_RV_REF(Type) element) | |
68 | { | |
69 | _elements.push_back(boost::move(element)); | |
70 | std::push_heap(_elements.begin(), _elements.end(), _compare); | |
71 | } | |
72 | ||
73 | void pop() | |
74 | { | |
75 | std::pop_heap(_elements.begin(), _elements.end(), _compare); | |
76 | _elements.pop_back(); | |
77 | } | |
78 | Type pull() | |
79 | { | |
80 | Type result = boost::move(_elements.front()); | |
81 | pop(); | |
82 | return boost::move(result); | |
83 | } | |
84 | ||
85 | Type const& top() | |
86 | { | |
87 | return _elements.front(); | |
88 | } | |
89 | }; | |
90 | } | |
91 | ||
92 | namespace concurrent | |
93 | { | |
94 | template <class ValueType, | |
95 | class Container = csbl::vector<ValueType>, | |
96 | class Compare = std::less<typename Container::value_type> > | |
97 | class sync_priority_queue | |
98 | : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > | |
99 | { | |
100 | typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super; | |
101 | ||
102 | public: | |
103 | typedef ValueType value_type; | |
104 | //typedef typename super::value_type value_type; // fixme | |
105 | typedef typename super::underlying_queue_type underlying_queue_type; | |
106 | typedef typename super::size_type size_type; | |
107 | typedef typename super::op_status op_status; | |
108 | ||
109 | typedef chrono::steady_clock clock; | |
110 | protected: | |
111 | ||
112 | public: | |
113 | sync_priority_queue() {} | |
114 | ||
115 | ~sync_priority_queue() | |
116 | { | |
117 | if(!super::closed()) | |
118 | { | |
119 | super::close(); | |
120 | } | |
121 | } | |
122 | ||
123 | void push(const ValueType& elem); | |
124 | void push(BOOST_THREAD_RV_REF(ValueType) elem); | |
125 | ||
126 | queue_op_status try_push(const ValueType& elem); | |
127 | queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem); | |
128 | ||
129 | ValueType pull(); | |
130 | ||
131 | void pull(ValueType&); | |
132 | ||
133 | queue_op_status pull_until(const clock::time_point&, ValueType&); | |
134 | queue_op_status pull_for(const clock::duration&, ValueType&); | |
135 | ||
136 | queue_op_status try_pull(ValueType& elem); | |
137 | queue_op_status wait_pull(ValueType& elem); | |
138 | queue_op_status nonblocking_pull(ValueType&); | |
139 | ||
140 | private: | |
141 | void push(unique_lock<mutex>&, const ValueType& elem); | |
142 | void push(lock_guard<mutex>&, const ValueType& elem); | |
143 | void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); | |
144 | void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); | |
145 | ||
146 | queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem); | |
147 | queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); | |
148 | ||
149 | ValueType pull(unique_lock<mutex>&); | |
150 | ValueType pull(lock_guard<mutex>&); | |
151 | ||
152 | void pull(unique_lock<mutex>&, ValueType&); | |
153 | void pull(lock_guard<mutex>&, ValueType&); | |
154 | ||
155 | queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem); | |
156 | queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem); | |
157 | ||
158 | queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem); | |
159 | ||
160 | queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&); | |
161 | ||
162 | sync_priority_queue(const sync_priority_queue&); | |
163 | sync_priority_queue& operator= (const sync_priority_queue&); | |
164 | sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue)); | |
165 | sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue)); | |
166 | }; //end class | |
167 | ||
168 | ||
169 | ////////////////////// | |
170 | template <class T, class Container,class Cmp> | |
171 | void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem) | |
172 | { | |
173 | super::throw_if_closed(lk); | |
174 | super::data_.push(elem); | |
175 | super::notify_not_empty_if_needed(lk); | |
176 | } | |
177 | template <class T, class Container,class Cmp> | |
178 | void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem) | |
179 | { | |
180 | super::throw_if_closed(lk); | |
181 | super::data_.push(elem); | |
182 | super::notify_not_empty_if_needed(lk); | |
183 | } | |
184 | template <class T, class Container,class Cmp> | |
185 | void sync_priority_queue<T,Container,Cmp>::push(const T& elem) | |
186 | { | |
187 | lock_guard<mutex> lk(super::mtx_); | |
188 | push(lk, elem); | |
189 | } | |
190 | ||
191 | ////////////////////// | |
192 | template <class T, class Container,class Cmp> | |
193 | void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem) | |
194 | { | |
195 | super::throw_if_closed(lk); | |
196 | super::data_.push(boost::move(elem)); | |
197 | super::notify_not_empty_if_needed(lk); | |
198 | } | |
199 | template <class T, class Container,class Cmp> | |
200 | void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem) | |
201 | { | |
202 | super::throw_if_closed(lk); | |
203 | super::data_.push(boost::move(elem)); | |
204 | super::notify_not_empty_if_needed(lk); | |
205 | } | |
206 | template <class T, class Container,class Cmp> | |
207 | void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem) | |
208 | { | |
209 | lock_guard<mutex> lk(super::mtx_); | |
210 | push(lk, boost::move(elem)); | |
211 | } | |
212 | ||
213 | ////////////////////// | |
214 | template <class T, class Container,class Cmp> | |
215 | queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem) | |
216 | { | |
217 | lock_guard<mutex> lk(super::mtx_); | |
218 | if (super::closed(lk)) return queue_op_status::closed; | |
219 | push(lk, elem); | |
220 | return queue_op_status::success; | |
221 | } | |
222 | ||
223 | ////////////////////// | |
224 | template <class T, class Container,class Cmp> | |
225 | queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem) | |
226 | { | |
227 | lock_guard<mutex> lk(super::mtx_); | |
228 | if (super::closed(lk)) return queue_op_status::closed; | |
229 | push(lk, boost::move(elem)); | |
230 | ||
231 | return queue_op_status::success; | |
232 | } | |
233 | ||
234 | ////////////////////// | |
235 | template <class T,class Container, class Cmp> | |
236 | T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&) | |
237 | { | |
238 | return super::data_.pull(); | |
239 | } | |
240 | template <class T,class Container, class Cmp> | |
241 | T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&) | |
242 | { | |
243 | return super::data_.pull(); | |
244 | } | |
245 | ||
246 | template <class T,class Container, class Cmp> | |
247 | T sync_priority_queue<T,Container,Cmp>::pull() | |
248 | { | |
249 | unique_lock<mutex> lk(super::mtx_); | |
250 | super::wait_until_not_empty(lk); | |
251 | return pull(lk); | |
252 | } | |
253 | ||
254 | ////////////////////// | |
255 | template <class T,class Container, class Cmp> | |
256 | void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem) | |
257 | { | |
258 | elem = super::data_.pull(); | |
259 | } | |
260 | template <class T,class Container, class Cmp> | |
261 | void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem) | |
262 | { | |
263 | elem = super::data_.pull(); | |
264 | } | |
265 | ||
266 | template <class T,class Container, class Cmp> | |
267 | void sync_priority_queue<T,Container,Cmp>::pull(T& elem) | |
268 | { | |
269 | unique_lock<mutex> lk(super::mtx_); | |
270 | super::wait_until_not_empty(lk); | |
271 | pull(lk, elem); | |
272 | } | |
273 | ||
274 | ////////////////////// | |
275 | template <class T, class Cont,class Cmp> | |
276 | queue_op_status | |
277 | sync_priority_queue<T,Cont,Cmp>::pull_until(const clock::time_point& tp, T& elem) | |
278 | { | |
279 | unique_lock<mutex> lk(super::mtx_); | |
280 | if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp)) | |
281 | return queue_op_status::timeout; | |
282 | pull(lk, elem); | |
283 | return queue_op_status::success; | |
284 | } | |
285 | ||
286 | ////////////////////// | |
287 | template <class T, class Cont,class Cmp> | |
288 | queue_op_status | |
289 | sync_priority_queue<T,Cont,Cmp>::pull_for(const clock::duration& dura, T& elem) | |
290 | { | |
291 | return pull_until(clock::now() + dura, elem); | |
292 | } | |
293 | ||
294 | ////////////////////// | |
295 | template <class T, class Container,class Cmp> | |
296 | queue_op_status | |
297 | sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem) | |
298 | { | |
299 | if (super::empty(lk)) | |
300 | { | |
301 | if (super::closed(lk)) return queue_op_status::closed; | |
302 | return queue_op_status::empty; | |
303 | } | |
304 | pull(lk, elem); | |
305 | return queue_op_status::success; | |
306 | } | |
307 | ||
308 | template <class T, class Container,class Cmp> | |
309 | queue_op_status | |
310 | sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem) | |
311 | { | |
312 | if (super::empty(lk)) | |
313 | { | |
314 | if (super::closed(lk)) return queue_op_status::closed; | |
315 | return queue_op_status::empty; | |
316 | } | |
317 | pull(lk, elem); | |
318 | return queue_op_status::success; | |
319 | } | |
320 | ||
321 | template <class T, class Container,class Cmp> | |
322 | queue_op_status | |
323 | sync_priority_queue<T,Container,Cmp>::try_pull(T& elem) | |
324 | { | |
325 | lock_guard<mutex> lk(super::mtx_); | |
326 | return try_pull(lk, elem); | |
327 | } | |
328 | ||
329 | ////////////////////// | |
330 | template <class T,class Container, class Cmp> | |
331 | queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem) | |
332 | { | |
333 | if (super::empty(lk)) | |
334 | { | |
335 | if (super::closed(lk)) return queue_op_status::closed; | |
336 | } | |
337 | bool has_been_closed = super::wait_until_not_empty_or_closed(lk); | |
338 | if (has_been_closed) return queue_op_status::closed; | |
339 | pull(lk, elem); | |
340 | return queue_op_status::success; | |
341 | } | |
342 | ||
343 | template <class T,class Container, class Cmp> | |
344 | queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem) | |
345 | { | |
346 | unique_lock<mutex> lk(super::mtx_); | |
347 | return wait_pull(lk, elem); | |
348 | } | |
349 | ||
350 | ////////////////////// | |
351 | ||
352 | template <class T,class Container, class Cmp> | |
353 | queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem) | |
354 | { | |
355 | unique_lock<mutex> lk(super::mtx_, try_to_lock); | |
356 | if (!lk.owns_lock()) return queue_op_status::busy; | |
357 | return try_pull(lk, elem); | |
358 | } | |
359 | ||
360 | ||
361 | ||
362 | } //end concurrent namespace | |
363 | ||
364 | using concurrent::sync_priority_queue; | |
365 | ||
366 | } //end boost namespace | |
367 | #include <boost/config/abi_suffix.hpp> | |
368 | ||
369 | #endif |