]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/thread/include/boost/thread/concurrent_queues/sync_timed_queue.hpp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / boost / libs / thread / include / boost / thread / concurrent_queues / sync_timed_queue.hpp
CommitLineData
7c673cae
FG
1// Copyright (C) 2014 Ian Forbed
2// Copyright (C) 2014 Vicente J. Botet Escriba
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9#define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
10
11#include <boost/thread/detail/config.hpp>
12
13#include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14#include <boost/chrono/duration.hpp>
15#include <boost/chrono/time_point.hpp>
16#include <boost/chrono/system_clocks.hpp>
17#include <boost/chrono/chrono_io.hpp>
18
19#include <boost/config/abi_prefix.hpp>
20
21namespace boost
22{
23namespace concurrent
24{
25namespace detail
26{
27 template <class T, class Clock = chrono::steady_clock>
28 struct scheduled_type
29 {
30 typedef T value_type;
31 typedef Clock clock;
32 typedef typename clock::time_point time_point;
33 T data;
34 time_point time;
35
36 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
37
38 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
39 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
40
41 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
42 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
43 data = other.data;
44 time = other.time;
45 return *this;
46 }
47
48 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
49 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
50 data = boost::move(other.data);
51 time = other.time;
52 return *this;
53 }
54
55 bool time_not_reached() const
56 {
57 return time > clock::now();
58 }
59
60 bool operator <(const scheduled_type<T> other) const
61 {
62 return this->time > other.time;
63 }
64 }; //end struct
65
66} //end detail namespace
67
68 template <class T, class Clock = chrono::steady_clock>
69 class sync_timed_queue
70 : private sync_priority_queue<detail::scheduled_type<T, Clock> >
71 {
72 typedef detail::scheduled_type<T, Clock> stype;
73 typedef sync_priority_queue<stype> super;
74 public:
75 typedef T value_type;
76 typedef Clock clock;
77 typedef typename clock::duration duration;
78 typedef typename clock::time_point time_point;
79 typedef typename super::underlying_queue_type underlying_queue_type;
80 typedef typename super::size_type size_type;
81 typedef typename super::op_status op_status;
82
83 sync_timed_queue() : super() {};
84 ~sync_timed_queue() {}
85
86 using super::size;
87 using super::empty;
88 using super::full;
89 using super::close;
90 using super::closed;
91
92 T pull();
93 void pull(T& elem);
94
95 template <class Duration>
96 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
97 template <class Rep, class Period>
98 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
99
100 queue_op_status try_pull(T& elem);
101 queue_op_status wait_pull(T& elem);
102 queue_op_status nonblocking_pull(T& elem);
103
104 template <class Duration>
105 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
106 template <class Rep, class Period>
107 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
108
109 template <class Duration>
110 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
111 template <class Rep, class Period>
112 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
113
114 template <class Duration>
115 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
116 template <class Rep, class Period>
117 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
118
119 template <class Duration>
120 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
121 template <class Rep, class Period>
122 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
123
124 private:
125 T pull(unique_lock<mutex>&);
126 T pull(lock_guard<mutex>&);
127
128 void pull(unique_lock<mutex>&, T& elem);
129 void pull(lock_guard<mutex>&, T& elem);
130
131 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
132 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
133
134 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
135
136 bool wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>&);
137 T pull_when_time_reached(unique_lock<mutex>&);
138 template <class Duration>
139 queue_op_status pull_when_time_reached_until(unique_lock<mutex>&, chrono::time_point<clock,Duration> const& tp, T& elem);
140 bool time_not_reached(unique_lock<mutex>&);
141 bool time_not_reached(lock_guard<mutex>&);
142 bool empty_or_time_not_reached(unique_lock<mutex>&);
143 bool empty_or_time_not_reached(lock_guard<mutex>&);
144
145 sync_timed_queue(const sync_timed_queue&);
146 sync_timed_queue& operator=(const sync_timed_queue&);
147 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
148 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
149 }; //end class
150
151
152 template <class T, class Clock>
153 template <class Duration>
154 void sync_timed_queue<T, Clock>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
155 {
156 super::push(stype(elem,tp));
157 }
158
159 template <class T, class Clock>
160 template <class Rep, class Period>
161 void sync_timed_queue<T, Clock>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
162 {
163 push(elem, clock::now() + dura);
164 }
165
166 template <class T, class Clock>
167 template <class Duration>
168 void sync_timed_queue<T, Clock>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
169 {
170 super::push(stype(boost::move(elem),tp));
171 }
172
173 template <class T, class Clock>
174 template <class Rep, class Period>
175 void sync_timed_queue<T, Clock>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
176 {
177 push(boost::move(elem), clock::now() + dura);
178 }
179
180
181
182 template <class T, class Clock>
183 template <class Duration>
184 queue_op_status sync_timed_queue<T, Clock>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
185 {
186 return super::try_push(stype(elem,tp));
187 }
188
189 template <class T, class Clock>
190 template <class Rep, class Period>
191 queue_op_status sync_timed_queue<T, Clock>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
192 {
193 return try_push(elem,clock::now() + dura);
194 }
195
196 template <class T, class Clock>
197 template <class Duration>
198 queue_op_status sync_timed_queue<T, Clock>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
199 {
200 return super::try_push(stype(boost::move(elem), tp));
201 }
202
203 template <class T, class Clock>
204 template <class Rep, class Period>
205 queue_op_status sync_timed_queue<T, Clock>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
206 {
207 return try_push(boost::move(elem), clock::now() + dura);
208 }
209
210 ///////////////////////////
211 template <class T, class Clock>
212 bool sync_timed_queue<T, Clock>::time_not_reached(unique_lock<mutex>&)
213 {
214 return super::data_.top().time_not_reached();
215 }
216
217 template <class T, class Clock>
218 bool sync_timed_queue<T, Clock>::time_not_reached(lock_guard<mutex>&)
219 {
220 return super::data_.top().time_not_reached();
221 }
222
223 ///////////////////////////
224 template <class T, class Clock>
225 bool sync_timed_queue<T, Clock>::wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>& lk)
226 {
227 for (;;)
228 {
229 if (super::closed(lk)) return true;
230 while (! super::empty(lk)) {
231 if (! time_not_reached(lk)) return false;
232 time_point tp = super::data_.top().time;
233 super::not_empty_.wait_until(lk, tp);
234 if (super::closed(lk)) return true;
235 }
236 if (super::closed(lk)) return true;
237 super::not_empty_.wait(lk);
238 }
239 //return false;
240 }
241
242 ///////////////////////////
243 template <class T, class Clock>
244 T sync_timed_queue<T, Clock>::pull_when_time_reached(unique_lock<mutex>& lk)
245 {
246 while (time_not_reached(lk))
247 {
248 super::throw_if_closed(lk);
249 time_point tp = super::data_.top().time;
250 super::not_empty_.wait_until(lk,tp);
251 super::wait_until_not_empty(lk);
252 }
253 return pull(lk);
254 }
255
256 template <class T, class Clock>
257 template <class Duration>
258 queue_op_status
259 sync_timed_queue<T, Clock>::pull_when_time_reached_until(unique_lock<mutex>& lk, chrono::time_point<clock,Duration> const& tp, T& elem)
260 {
261 chrono::time_point<clock,Duration> tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time;
262 while (time_not_reached(lk))
263 {
264 super::throw_if_closed(lk);
265 if (cv_status::timeout == super::not_empty_.wait_until(lk, tpmin)) {
266 if (time_not_reached(lk)) return queue_op_status::not_ready;
267 return queue_op_status::timeout;
268 }
269 }
270 pull(lk, elem);
271 return queue_op_status::success;
272 }
273
274 ///////////////////////////
275 template <class T, class Clock>
276 bool sync_timed_queue<T, Clock>::empty_or_time_not_reached(unique_lock<mutex>& lk)
277 {
278 if ( super::empty(lk) ) return true;
279 if ( time_not_reached(lk) ) return true;
280 return false;
281 }
282 template <class T, class Clock>
283 bool sync_timed_queue<T, Clock>::empty_or_time_not_reached(lock_guard<mutex>& lk)
284 {
285 if ( super::empty(lk) ) return true;
286 if ( time_not_reached(lk) ) return true;
287 return false;
288 }
289
290 ///////////////////////////
291 template <class T, class Clock>
292 T sync_timed_queue<T, Clock>::pull(unique_lock<mutex>&)
293 {
294#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
295 return boost::move(super::data_.pull().data);
296#else
297 return super::data_.pull().data;
298#endif
299 }
300
301 template <class T, class Clock>
302 T sync_timed_queue<T, Clock>::pull(lock_guard<mutex>&)
303 {
304#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
305 return boost::move(super::data_.pull().data);
306#else
307 return super::data_.pull().data;
308#endif
309 }
310 template <class T, class Clock>
311 T sync_timed_queue<T, Clock>::pull()
312 {
313 unique_lock<mutex> lk(super::mtx_);
314 super::wait_until_not_empty(lk);
315 return pull_when_time_reached(lk);
316 }
317
318 ///////////////////////////
319 template <class T, class Clock>
320 void sync_timed_queue<T, Clock>::pull(unique_lock<mutex>&, T& elem)
321 {
322#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
323 elem = boost::move(super::data_.pull().data);
324#else
325 elem = super::data_.pull().data;
326#endif
327 }
328
329 template <class T, class Clock>
330 void sync_timed_queue<T, Clock>::pull(lock_guard<mutex>&, T& elem)
331 {
332#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
333 elem = boost::move(super::data_.pull().data);
334#else
335 elem = super::data_.pull().data;
336#endif
337 }
338
339 template <class T, class Clock>
340 void sync_timed_queue<T, Clock>::pull(T& elem)
341 {
342 unique_lock<mutex> lk(super::mtx_);
343 super::wait_until_not_empty(lk);
344 elem = pull_when_time_reached(lk);
345 }
346
347 //////////////////////
348 template <class T, class Clock>
349 template <class Duration>
350 queue_op_status
351 sync_timed_queue<T, Clock>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
352 {
353 unique_lock<mutex> lk(super::mtx_);
354
355 if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
356 return queue_op_status::timeout;
357 return pull_when_time_reached_until(lk, tp, elem);
358 }
359
360 //////////////////////
361 template <class T, class Clock>
362 template <class Rep, class Period>
363 queue_op_status
364 sync_timed_queue<T, Clock>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
365 {
366 return pull_until(clock::now() + dura, elem);
367 }
368
369 ///////////////////////////
370 template <class T, class Clock>
371 queue_op_status sync_timed_queue<T, Clock>::try_pull(unique_lock<mutex>& lk, T& elem)
372 {
373 if ( super::empty(lk) )
374 {
375 if (super::closed(lk)) return queue_op_status::closed;
376 return queue_op_status::empty;
377 }
378 if ( time_not_reached(lk) )
379 {
380 if (super::closed(lk)) return queue_op_status::closed;
381 return queue_op_status::not_ready;
382 }
383
384 pull(lk, elem);
385 return queue_op_status::success;
386 }
387 template <class T, class Clock>
388 queue_op_status sync_timed_queue<T, Clock>::try_pull(lock_guard<mutex>& lk, T& elem)
389 {
390 if ( super::empty(lk) )
391 {
392 if (super::closed(lk)) return queue_op_status::closed;
393 return queue_op_status::empty;
394 }
395 if ( time_not_reached(lk) )
396 {
397 if (super::closed(lk)) return queue_op_status::closed;
398 return queue_op_status::not_ready;
399 }
400 pull(lk, elem);
401 return queue_op_status::success;
402 }
403
404 template <class T, class Clock>
405 queue_op_status sync_timed_queue<T, Clock>::try_pull(T& elem)
406 {
407 lock_guard<mutex> lk(super::mtx_);
408 return try_pull(lk, elem);
409 }
410
411 ///////////////////////////
412 template <class T, class Clock>
413 queue_op_status sync_timed_queue<T, Clock>::wait_pull(unique_lock<mutex>& lk, T& elem)
414 {
415 if (super::empty(lk))
416 {
417 if (super::closed(lk)) return queue_op_status::closed;
418 }
419 bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk);
420 if (has_been_closed) return queue_op_status::closed;
421 pull(lk, elem);
422 return queue_op_status::success;
423 }
424
425 template <class T, class Clock>
426 queue_op_status sync_timed_queue<T, Clock>::wait_pull(T& elem)
427 {
428 unique_lock<mutex> lk(super::mtx_);
429 return wait_pull(lk, elem);
430 }
431
432// ///////////////////////////
433// template <class T, class Clock>
434// queue_op_status sync_timed_queue<T, Clock>::wait_pull(unique_lock<mutex> &lk, T& elem)
435// {
436// if (super::empty(lk))
437// {
438// if (super::closed(lk)) return queue_op_status::closed;
439// }
440// bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
441// if (has_been_closed) return queue_op_status::closed;
442// pull(lk, elem);
443// return queue_op_status::success;
444// }
445// template <class T>
446// queue_op_status sync_timed_queue<T, Clock>::wait_pull(T& elem)
447// {
448// unique_lock<mutex> lk(super::mtx_);
449// return wait_pull(lk, elem);
450// }
451
452 ///////////////////////////
453 template <class T, class Clock>
454 queue_op_status sync_timed_queue<T, Clock>::nonblocking_pull(T& elem)
455 {
456 unique_lock<mutex> lk(super::mtx_, try_to_lock);
457 if (! lk.owns_lock()) return queue_op_status::busy;
458 return try_pull(lk, elem);
459 }
460
461} //end concurrent namespace
462
463using concurrent::sync_timed_queue;
464
465} //end boost namespace
466#include <boost/config/abi_suffix.hpp>
467
468#endif