]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/thread/include/boost/thread/concurrent_queues/sync_priority_queue.hpp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / boost / libs / thread / include / boost / thread / concurrent_queues / sync_priority_queue.hpp
CommitLineData
7c673cae
FG
1// Copyright (C) 2014 Ian Forbed
2// Copyright (C) 2014 Vicente J. Botet Escriba
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
9#define BOOST_THREAD_SYNC_PRIORITY_QUEUE
10
11#include <boost/thread/detail/config.hpp>
12
13#include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
14#include <boost/thread/concurrent_queues/queue_op_status.hpp>
15#include <boost/thread/condition_variable.hpp>
16#include <boost/thread/csbl/vector.hpp>
17#include <boost/thread/detail/move.hpp>
18#include <boost/thread/mutex.hpp>
19
20#include <boost/atomic.hpp>
21#include <boost/chrono/duration.hpp>
22#include <boost/chrono/time_point.hpp>
23
24#include <exception>
25#include <queue>
26#include <utility>
27
28#include <boost/config/abi_prefix.hpp>
29
30namespace boost
31{
32namespace detail {
33
34 template <
35 class Type,
36 class Container = csbl::vector<Type>,
37 class Compare = std::less<Type>
38 >
39 class priority_queue
40 {
41 private:
42 Container _elements;
43 Compare _compare;
44 public:
45 typedef Type value_type;
46 typedef typename Container::size_type size_type;
47
48 explicit priority_queue(const Compare& compare = Compare())
49 : _elements(), _compare(compare)
50 { }
51
52 size_type size() const
53 {
54 return _elements.size();
55 }
56
57 bool empty() const
58 {
59 return _elements.empty();
60 }
61
62 void push(Type const& element)
63 {
64 _elements.push_back(element);
65 std::push_heap(_elements.begin(), _elements.end(), _compare);
66 }
67 void push(BOOST_RV_REF(Type) element)
68 {
69 _elements.push_back(boost::move(element));
70 std::push_heap(_elements.begin(), _elements.end(), _compare);
71 }
72
73 void pop()
74 {
75 std::pop_heap(_elements.begin(), _elements.end(), _compare);
76 _elements.pop_back();
77 }
78 Type pull()
79 {
80 Type result = boost::move(_elements.front());
81 pop();
82 return boost::move(result);
83 }
84
85 Type const& top()
86 {
87 return _elements.front();
88 }
89 };
90}
91
92namespace concurrent
93{
94 template <class ValueType,
95 class Container = csbl::vector<ValueType>,
96 class Compare = std::less<typename Container::value_type> >
97 class sync_priority_queue
98 : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
99 {
100 typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super;
101
102 public:
103 typedef ValueType value_type;
104 //typedef typename super::value_type value_type; // fixme
105 typedef typename super::underlying_queue_type underlying_queue_type;
106 typedef typename super::size_type size_type;
107 typedef typename super::op_status op_status;
108
109 typedef chrono::steady_clock clock;
110 protected:
111
112 public:
113 sync_priority_queue() {}
114
115 ~sync_priority_queue()
116 {
117 if(!super::closed())
118 {
119 super::close();
120 }
121 }
122
123 void push(const ValueType& elem);
124 void push(BOOST_THREAD_RV_REF(ValueType) elem);
125
126 queue_op_status try_push(const ValueType& elem);
127 queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
128
129 ValueType pull();
130
131 void pull(ValueType&);
132
133 queue_op_status pull_until(const clock::time_point&, ValueType&);
134 queue_op_status pull_for(const clock::duration&, ValueType&);
135
136 queue_op_status try_pull(ValueType& elem);
137 queue_op_status wait_pull(ValueType& elem);
138 queue_op_status nonblocking_pull(ValueType&);
139
140 private:
141 void push(unique_lock<mutex>&, const ValueType& elem);
142 void push(lock_guard<mutex>&, const ValueType& elem);
143 void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
144 void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
145
146 queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
147 queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
148
149 ValueType pull(unique_lock<mutex>&);
150 ValueType pull(lock_guard<mutex>&);
151
152 void pull(unique_lock<mutex>&, ValueType&);
153 void pull(lock_guard<mutex>&, ValueType&);
154
155 queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
156 queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
157
158 queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
159
160 queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
161
162 sync_priority_queue(const sync_priority_queue&);
163 sync_priority_queue& operator= (const sync_priority_queue&);
164 sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
165 sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
166 }; //end class
167
168
169 //////////////////////
170 template <class T, class Container,class Cmp>
171 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
172 {
173 super::throw_if_closed(lk);
174 super::data_.push(elem);
175 super::notify_not_empty_if_needed(lk);
176 }
177 template <class T, class Container,class Cmp>
178 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
179 {
180 super::throw_if_closed(lk);
181 super::data_.push(elem);
182 super::notify_not_empty_if_needed(lk);
183 }
184 template <class T, class Container,class Cmp>
185 void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
186 {
187 lock_guard<mutex> lk(super::mtx_);
188 push(lk, elem);
189 }
190
191 //////////////////////
192 template <class T, class Container,class Cmp>
193 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
194 {
195 super::throw_if_closed(lk);
196 super::data_.push(boost::move(elem));
197 super::notify_not_empty_if_needed(lk);
198 }
199 template <class T, class Container,class Cmp>
200 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
201 {
202 super::throw_if_closed(lk);
203 super::data_.push(boost::move(elem));
204 super::notify_not_empty_if_needed(lk);
205 }
206 template <class T, class Container,class Cmp>
207 void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
208 {
209 lock_guard<mutex> lk(super::mtx_);
210 push(lk, boost::move(elem));
211 }
212
213 //////////////////////
214 template <class T, class Container,class Cmp>
215 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
216 {
217 lock_guard<mutex> lk(super::mtx_);
218 if (super::closed(lk)) return queue_op_status::closed;
219 push(lk, elem);
220 return queue_op_status::success;
221 }
222
223 //////////////////////
224 template <class T, class Container,class Cmp>
225 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
226 {
227 lock_guard<mutex> lk(super::mtx_);
228 if (super::closed(lk)) return queue_op_status::closed;
229 push(lk, boost::move(elem));
230
231 return queue_op_status::success;
232 }
233
234 //////////////////////
235 template <class T,class Container, class Cmp>
236 T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
237 {
238 return super::data_.pull();
239 }
240 template <class T,class Container, class Cmp>
241 T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
242 {
243 return super::data_.pull();
244 }
245
246 template <class T,class Container, class Cmp>
247 T sync_priority_queue<T,Container,Cmp>::pull()
248 {
249 unique_lock<mutex> lk(super::mtx_);
250 super::wait_until_not_empty(lk);
251 return pull(lk);
252 }
253
254 //////////////////////
255 template <class T,class Container, class Cmp>
256 void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
257 {
258 elem = super::data_.pull();
259 }
260 template <class T,class Container, class Cmp>
261 void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
262 {
263 elem = super::data_.pull();
264 }
265
266 template <class T,class Container, class Cmp>
267 void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
268 {
269 unique_lock<mutex> lk(super::mtx_);
270 super::wait_until_not_empty(lk);
271 pull(lk, elem);
272 }
273
274 //////////////////////
275 template <class T, class Cont,class Cmp>
276 queue_op_status
277 sync_priority_queue<T,Cont,Cmp>::pull_until(const clock::time_point& tp, T& elem)
278 {
279 unique_lock<mutex> lk(super::mtx_);
280 if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
281 return queue_op_status::timeout;
282 pull(lk, elem);
283 return queue_op_status::success;
284 }
285
286 //////////////////////
287 template <class T, class Cont,class Cmp>
288 queue_op_status
289 sync_priority_queue<T,Cont,Cmp>::pull_for(const clock::duration& dura, T& elem)
290 {
291 return pull_until(clock::now() + dura, elem);
292 }
293
294 //////////////////////
295 template <class T, class Container,class Cmp>
296 queue_op_status
297 sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
298 {
299 if (super::empty(lk))
300 {
301 if (super::closed(lk)) return queue_op_status::closed;
302 return queue_op_status::empty;
303 }
304 pull(lk, elem);
305 return queue_op_status::success;
306 }
307
308 template <class T, class Container,class Cmp>
309 queue_op_status
310 sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
311 {
312 if (super::empty(lk))
313 {
314 if (super::closed(lk)) return queue_op_status::closed;
315 return queue_op_status::empty;
316 }
317 pull(lk, elem);
318 return queue_op_status::success;
319 }
320
321 template <class T, class Container,class Cmp>
322 queue_op_status
323 sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
324 {
325 lock_guard<mutex> lk(super::mtx_);
326 return try_pull(lk, elem);
327 }
328
329 //////////////////////
330 template <class T,class Container, class Cmp>
331 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
332 {
333 if (super::empty(lk))
334 {
335 if (super::closed(lk)) return queue_op_status::closed;
336 }
337 bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
338 if (has_been_closed) return queue_op_status::closed;
339 pull(lk, elem);
340 return queue_op_status::success;
341 }
342
343 template <class T,class Container, class Cmp>
344 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
345 {
346 unique_lock<mutex> lk(super::mtx_);
347 return wait_pull(lk, elem);
348 }
349
350 //////////////////////
351
352 template <class T,class Container, class Cmp>
353 queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
354 {
355 unique_lock<mutex> lk(super::mtx_, try_to_lock);
356 if (!lk.owns_lock()) return queue_op_status::busy;
357 return try_pull(lk, elem);
358 }
359
360
361
362} //end concurrent namespace
363
364using concurrent::sync_priority_queue;
365
366} //end boost namespace
367#include <boost/config/abi_suffix.hpp>
368
369#endif