]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/thread/concurrent_queues/sync_priority_queue.hpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / boost / thread / concurrent_queues / sync_priority_queue.hpp
1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
9 #define BOOST_THREAD_SYNC_PRIORITY_QUEUE
10
11 #include <boost/thread/detail/config.hpp>
12
13 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
14 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/csbl/vector.hpp>
17 #include <boost/thread/detail/move.hpp>
18 #include <boost/thread/mutex.hpp>
19
20 #include <boost/atomic.hpp>
21 #include <boost/chrono/duration.hpp>
22 #include <boost/chrono/time_point.hpp>
23
24 #include <exception>
25 #include <queue>
26 #include <utility>
27
28 #include <boost/config/abi_prefix.hpp>
29
30 namespace boost
31 {
32 namespace detail {
33
34 template <
35 class Type,
36 class Container = csbl::vector<Type>,
37 class Compare = std::less<Type>
38 >
39 class priority_queue
40 {
41 private:
42 Container _elements;
43 Compare _compare;
44 public:
45 typedef Type value_type;
46 typedef typename Container::size_type size_type;
47
48 explicit priority_queue(const Compare& compare = Compare())
49 : _elements(), _compare(compare)
50 { }
51
52 size_type size() const
53 {
54 return _elements.size();
55 }
56
57 bool empty() const
58 {
59 return _elements.empty();
60 }
61
62 void push(Type const& element)
63 {
64 _elements.push_back(element);
65 std::push_heap(_elements.begin(), _elements.end(), _compare);
66 }
67 void push(BOOST_RV_REF(Type) element)
68 {
69 _elements.push_back(boost::move(element));
70 std::push_heap(_elements.begin(), _elements.end(), _compare);
71 }
72
73 void pop()
74 {
75 std::pop_heap(_elements.begin(), _elements.end(), _compare);
76 _elements.pop_back();
77 }
78 Type pull()
79 {
80 Type result = boost::move(_elements.front());
81 pop();
82 return boost::move(result);
83 }
84
85 Type const& top()
86 {
87 return _elements.front();
88 }
89 };
90 }
91
92 namespace concurrent
93 {
94 template <class ValueType,
95 class Container = csbl::vector<ValueType>,
96 class Compare = std::less<typename Container::value_type> >
97 class sync_priority_queue
98 : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
99 {
100 typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super;
101
102 public:
103 typedef ValueType value_type;
104 //typedef typename super::value_type value_type; // fixme
105 typedef typename super::underlying_queue_type underlying_queue_type;
106 typedef typename super::size_type size_type;
107 typedef typename super::op_status op_status;
108
109 typedef chrono::steady_clock clock;
110 protected:
111
112 public:
113 sync_priority_queue() {}
114
115 ~sync_priority_queue()
116 {
117 if(!super::closed())
118 {
119 super::close();
120 }
121 }
122
123 void push(const ValueType& elem);
124 void push(BOOST_THREAD_RV_REF(ValueType) elem);
125
126 queue_op_status try_push(const ValueType& elem);
127 queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
128
129 ValueType pull();
130
131 void pull(ValueType&);
132
133 template <class WClock, class Duration>
134 queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&);
135 template <class Rep, class Period>
136 queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&);
137
138 queue_op_status try_pull(ValueType& elem);
139 queue_op_status wait_pull(ValueType& elem);
140 queue_op_status nonblocking_pull(ValueType&);
141
142 private:
143 void push(unique_lock<mutex>&, const ValueType& elem);
144 void push(lock_guard<mutex>&, const ValueType& elem);
145 void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
146 void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
147
148 queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
149 queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
150
151 ValueType pull(unique_lock<mutex>&);
152 ValueType pull(lock_guard<mutex>&);
153
154 void pull(unique_lock<mutex>&, ValueType&);
155 void pull(lock_guard<mutex>&, ValueType&);
156
157 queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
158 queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
159
160 queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
161
162 queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
163
164 sync_priority_queue(const sync_priority_queue&);
165 sync_priority_queue& operator= (const sync_priority_queue&);
166 sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
167 sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
168 }; //end class
169
170
171 //////////////////////
172 template <class T, class Container,class Cmp>
173 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
174 {
175 super::throw_if_closed(lk);
176 super::data_.push(elem);
177 super::notify_not_empty_if_needed(lk);
178 }
179 template <class T, class Container,class Cmp>
180 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
181 {
182 super::throw_if_closed(lk);
183 super::data_.push(elem);
184 super::notify_not_empty_if_needed(lk);
185 }
186 template <class T, class Container,class Cmp>
187 void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
188 {
189 lock_guard<mutex> lk(super::mtx_);
190 push(lk, elem);
191 }
192
193 //////////////////////
194 template <class T, class Container,class Cmp>
195 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
196 {
197 super::throw_if_closed(lk);
198 super::data_.push(boost::move(elem));
199 super::notify_not_empty_if_needed(lk);
200 }
201 template <class T, class Container,class Cmp>
202 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
203 {
204 super::throw_if_closed(lk);
205 super::data_.push(boost::move(elem));
206 super::notify_not_empty_if_needed(lk);
207 }
208 template <class T, class Container,class Cmp>
209 void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
210 {
211 lock_guard<mutex> lk(super::mtx_);
212 push(lk, boost::move(elem));
213 }
214
215 //////////////////////
216 template <class T, class Container,class Cmp>
217 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
218 {
219 lock_guard<mutex> lk(super::mtx_);
220 if (super::closed(lk)) return queue_op_status::closed;
221 push(lk, elem);
222 return queue_op_status::success;
223 }
224
225 //////////////////////
226 template <class T, class Container,class Cmp>
227 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
228 {
229 lock_guard<mutex> lk(super::mtx_);
230 if (super::closed(lk)) return queue_op_status::closed;
231 push(lk, boost::move(elem));
232
233 return queue_op_status::success;
234 }
235
236 //////////////////////
237 template <class T,class Container, class Cmp>
238 T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
239 {
240 return super::data_.pull();
241 }
242 template <class T,class Container, class Cmp>
243 T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
244 {
245 return super::data_.pull();
246 }
247
248 template <class T,class Container, class Cmp>
249 T sync_priority_queue<T,Container,Cmp>::pull()
250 {
251 unique_lock<mutex> lk(super::mtx_);
252 super::wait_until_not_empty(lk);
253 return pull(lk);
254 }
255
256 //////////////////////
257 template <class T,class Container, class Cmp>
258 void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
259 {
260 elem = super::data_.pull();
261 }
262 template <class T,class Container, class Cmp>
263 void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
264 {
265 elem = super::data_.pull();
266 }
267
268 template <class T,class Container, class Cmp>
269 void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
270 {
271 unique_lock<mutex> lk(super::mtx_);
272 super::wait_until_not_empty(lk);
273 pull(lk, elem);
274 }
275
276 //////////////////////
277 template <class T, class Cont,class Cmp>
278 template <class WClock, class Duration>
279 queue_op_status
280 sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
281 {
282 unique_lock<mutex> lk(super::mtx_);
283 if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
284 return queue_op_status::timeout;
285 pull(lk, elem);
286 return queue_op_status::success;
287 }
288
289 //////////////////////
290 template <class T, class Cont,class Cmp>
291 template <class Rep, class Period>
292 queue_op_status
293 sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
294 {
295 return pull_until(clock::now() + dura, elem);
296 }
297
298 //////////////////////
299 template <class T, class Container,class Cmp>
300 queue_op_status
301 sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
302 {
303 if (super::empty(lk))
304 {
305 if (super::closed(lk)) return queue_op_status::closed;
306 return queue_op_status::empty;
307 }
308 pull(lk, elem);
309 return queue_op_status::success;
310 }
311
312 template <class T, class Container,class Cmp>
313 queue_op_status
314 sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
315 {
316 if (super::empty(lk))
317 {
318 if (super::closed(lk)) return queue_op_status::closed;
319 return queue_op_status::empty;
320 }
321 pull(lk, elem);
322 return queue_op_status::success;
323 }
324
325 template <class T, class Container,class Cmp>
326 queue_op_status
327 sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
328 {
329 lock_guard<mutex> lk(super::mtx_);
330 return try_pull(lk, elem);
331 }
332
333 //////////////////////
334 template <class T,class Container, class Cmp>
335 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
336 {
337 if (super::empty(lk))
338 {
339 if (super::closed(lk)) return queue_op_status::closed;
340 }
341 bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
342 if (has_been_closed) return queue_op_status::closed;
343 pull(lk, elem);
344 return queue_op_status::success;
345 }
346
347 template <class T,class Container, class Cmp>
348 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
349 {
350 unique_lock<mutex> lk(super::mtx_);
351 return wait_pull(lk, elem);
352 }
353
354 //////////////////////
355
356 template <class T,class Container, class Cmp>
357 queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
358 {
359 unique_lock<mutex> lk(super::mtx_, try_to_lock);
360 if (!lk.owns_lock()) return queue_op_status::busy;
361 return try_pull(lk, elem);
362 }
363
364
365
366 } //end concurrent namespace
367
368 using concurrent::sync_priority_queue;
369
370 } //end boost namespace
371 #include <boost/config/abi_suffix.hpp>
372
373 #endif