]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/thread/concurrent_queues/sync_timed_queue.hpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / boost / thread / concurrent_queues / sync_timed_queue.hpp
1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
10
11 #include <boost/thread/detail/config.hpp>
12
13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14 #include <boost/chrono/duration.hpp>
15 #include <boost/chrono/time_point.hpp>
16 #include <boost/chrono/system_clocks.hpp>
17 #include <boost/chrono/chrono_io.hpp>
18
19 #include <boost/config/abi_prefix.hpp>
20
21 namespace boost
22 {
23 namespace concurrent
24 {
25 namespace detail
26 {
27 // fixme: shouldn't the timepoint be configurable
28 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
29 struct scheduled_type
30 {
31 typedef T value_type;
32 typedef Clock clock;
33 typedef TimePoint time_point;
34 T data;
35 time_point time;
36
37 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
38
39 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
40 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
41
42 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
43 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
44 data = other.data;
45 time = other.time;
46 return *this;
47 }
48
49 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
50 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
51 data = boost::move(other.data);
52 time = other.time;
53 return *this;
54 }
55
56 bool time_not_reached() const
57 {
58 return time > clock::now();
59 }
60
61 bool operator <(const scheduled_type & other) const
62 {
63 return this->time > other.time;
64 }
65 }; //end struct
66
67 } //end detail namespace
68
69 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
70 class sync_timed_queue
71 : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
72 {
73 typedef detail::scheduled_type<T, Clock, TimePoint> stype;
74 typedef sync_priority_queue<stype> super;
75 public:
76 typedef T value_type;
77 typedef Clock clock;
78 typedef typename clock::duration duration;
79 typedef typename clock::time_point time_point;
80 typedef typename super::underlying_queue_type underlying_queue_type;
81 typedef typename super::size_type size_type;
82 typedef typename super::op_status op_status;
83
84 sync_timed_queue() : super() {};
85 ~sync_timed_queue() {}
86
87 using super::size;
88 using super::empty;
89 using super::full;
90 using super::close;
91 using super::closed;
92
93 T pull();
94 void pull(T& elem);
95
96 template <class WClock, class Duration>
97 queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem);
98 template <class Rep, class Period>
99 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
100
101 queue_op_status try_pull(T& elem);
102 queue_op_status wait_pull(T& elem);
103 queue_op_status nonblocking_pull(T& elem);
104
105 template <class Duration>
106 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
107 template <class Rep, class Period>
108 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
109
110 template <class Duration>
111 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
112 template <class Rep, class Period>
113 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
114
115 template <class Duration>
116 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
117 template <class Rep, class Period>
118 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
119
120 template <class Duration>
121 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
122 template <class Rep, class Period>
123 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
124
125 private:
126 T pull(unique_lock<mutex>&);
127 T pull(lock_guard<mutex>&);
128
129 void pull(unique_lock<mutex>&, T& elem);
130 void pull(lock_guard<mutex>&, T& elem);
131
132 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
133 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
134
135 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
136
137 bool wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>&);
138 T pull_when_time_reached(unique_lock<mutex>&);
139 template <class WClock, class Duration>
140 queue_op_status pull_when_time_reached_until(unique_lock<mutex>&, chrono::time_point<WClock,Duration> const& tp, T& elem);
141 bool time_not_reached(unique_lock<mutex>&);
142 bool time_not_reached(lock_guard<mutex>&);
143 bool empty_or_time_not_reached(unique_lock<mutex>&);
144 bool empty_or_time_not_reached(lock_guard<mutex>&);
145
146 sync_timed_queue(const sync_timed_queue&);
147 sync_timed_queue& operator=(const sync_timed_queue&);
148 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
149 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
150 }; //end class
151
152
153 template <class T, class Clock, class TimePoint>
154 template <class Duration>
155 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
156 {
157 super::push(stype(elem,tp));
158 }
159
160 template <class T, class Clock, class TimePoint>
161 template <class Rep, class Period>
162 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
163 {
164 push(elem, clock::now() + dura);
165 }
166
167 template <class T, class Clock, class TimePoint>
168 template <class Duration>
169 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
170 {
171 super::push(stype(boost::move(elem),tp));
172 }
173
174 template <class T, class Clock, class TimePoint>
175 template <class Rep, class Period>
176 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
177 {
178 push(boost::move(elem), clock::now() + dura);
179 }
180
181
182
183 template <class T, class Clock, class TimePoint>
184 template <class Duration>
185 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
186 {
187 return super::try_push(stype(elem,tp));
188 }
189
190 template <class T, class Clock, class TimePoint>
191 template <class Rep, class Period>
192 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
193 {
194 return try_push(elem,clock::now() + dura);
195 }
196
197 template <class T, class Clock, class TimePoint>
198 template <class Duration>
199 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
200 {
201 return super::try_push(stype(boost::move(elem), tp));
202 }
203
204 template <class T, class Clock, class TimePoint>
205 template <class Rep, class Period>
206 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
207 {
208 return try_push(boost::move(elem), clock::now() + dura);
209 }
210
211 ///////////////////////////
212 template <class T, class Clock, class TimePoint>
213 bool sync_timed_queue<T, Clock, TimePoint>::time_not_reached(unique_lock<mutex>&)
214 {
215 return super::data_.top().time_not_reached();
216 }
217
218 template <class T, class Clock, class TimePoint>
219 bool sync_timed_queue<T, Clock, TimePoint>::time_not_reached(lock_guard<mutex>&)
220 {
221 return super::data_.top().time_not_reached();
222 }
223
224 ///////////////////////////
225 template <class T, class Clock, class TimePoint>
226 bool sync_timed_queue<T, Clock, TimePoint>::wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>& lk)
227 {
228 for (;;)
229 {
230 if (super::closed(lk)) return true;
231 while (! super::empty(lk)) {
232 if (! time_not_reached(lk)) return false;
233 time_point tp = super::data_.top().time;
234 super::not_empty_.wait_until(lk, tp);
235 if (super::closed(lk)) return true;
236 }
237 if (super::closed(lk)) return true;
238 super::not_empty_.wait(lk);
239 }
240 //return false;
241 }
242
243 ///////////////////////////
244 template <class T, class Clock, class TimePoint>
245 T sync_timed_queue<T, Clock, TimePoint>::pull_when_time_reached(unique_lock<mutex>& lk)
246 {
247 while (time_not_reached(lk))
248 {
249 super::throw_if_closed(lk);
250 time_point tp = super::data_.top().time;
251 super::not_empty_.wait_until(lk,tp);
252 super::wait_until_not_empty(lk);
253 }
254 return pull(lk);
255 }
256
257 template <class T, class Clock, class TimePoint>
258 template <class WClock, class Duration>
259 queue_op_status
260 sync_timed_queue<T, Clock, TimePoint>::pull_when_time_reached_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp, T& elem)
261 {
262 chrono::time_point<WClock, Duration> tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time;
263 while (time_not_reached(lk))
264 {
265 super::throw_if_closed(lk);
266 if (cv_status::timeout == super::not_empty_.wait_until(lk, tpmin)) {
267 if (time_not_reached(lk)) return queue_op_status::not_ready;
268 return queue_op_status::timeout;
269 }
270 }
271 pull(lk, elem);
272 return queue_op_status::success;
273 }
274
275 ///////////////////////////
276 template <class T, class Clock, class TimePoint>
277 bool sync_timed_queue<T, Clock, TimePoint>::empty_or_time_not_reached(unique_lock<mutex>& lk)
278 {
279 if ( super::empty(lk) ) return true;
280 if ( time_not_reached(lk) ) return true;
281 return false;
282 }
283 template <class T, class Clock, class TimePoint>
284 bool sync_timed_queue<T, Clock, TimePoint>::empty_or_time_not_reached(lock_guard<mutex>& lk)
285 {
286 if ( super::empty(lk) ) return true;
287 if ( time_not_reached(lk) ) return true;
288 return false;
289 }
290
291 ///////////////////////////
292 template <class T, class Clock, class TimePoint>
293 T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
294 {
295 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
296 return boost::move(super::data_.pull().data);
297 #else
298 return super::data_.pull().data;
299 #endif
300 }
301
302 template <class T, class Clock, class TimePoint>
303 T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
304 {
305 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
306 return boost::move(super::data_.pull().data);
307 #else
308 return super::data_.pull().data;
309 #endif
310 }
311 template <class T, class Clock, class TimePoint>
312 T sync_timed_queue<T, Clock, TimePoint>::pull()
313 {
314 unique_lock<mutex> lk(super::mtx_);
315 super::wait_until_not_empty(lk);
316 return pull_when_time_reached(lk);
317 }
318
319 ///////////////////////////
320 template <class T, class Clock, class TimePoint>
321 void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
322 {
323 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
324 elem = boost::move(super::data_.pull().data);
325 #else
326 elem = super::data_.pull().data;
327 #endif
328 }
329
330 template <class T, class Clock, class TimePoint>
331 void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
332 {
333 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
334 elem = boost::move(super::data_.pull().data);
335 #else
336 elem = super::data_.pull().data;
337 #endif
338 }
339
340 template <class T, class Clock, class TimePoint>
341 void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
342 {
343 unique_lock<mutex> lk(super::mtx_);
344 super::wait_until_not_empty(lk);
345 elem = pull_when_time_reached(lk);
346 }
347
348 //////////////////////
349 template <class T, class Clock, class TimePoint>
350 template <class WClock, class Duration>
351 queue_op_status
352 sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
353 {
354 unique_lock<mutex> lk(super::mtx_);
355
356 if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
357 return queue_op_status::timeout;
358 return pull_when_time_reached_until(lk, tp, elem);
359 }
360
361 //////////////////////
362 template <class T, class Clock, class TimePoint>
363 template <class Rep, class Period>
364 queue_op_status
365 sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
366 {
367 return pull_until(chrono::steady_clock::now() + dura, elem);
368 }
369
370 ///////////////////////////
371 template <class T, class Clock, class TimePoint>
372 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
373 {
374 if ( super::empty(lk) )
375 {
376 if (super::closed(lk)) return queue_op_status::closed;
377 return queue_op_status::empty;
378 }
379 if ( time_not_reached(lk) )
380 {
381 if (super::closed(lk)) return queue_op_status::closed;
382 return queue_op_status::not_ready;
383 }
384
385 pull(lk, elem);
386 return queue_op_status::success;
387 }
388 template <class T, class Clock, class TimePoint>
389 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
390 {
391 if ( super::empty(lk) )
392 {
393 if (super::closed(lk)) return queue_op_status::closed;
394 return queue_op_status::empty;
395 }
396 if ( time_not_reached(lk) )
397 {
398 if (super::closed(lk)) return queue_op_status::closed;
399 return queue_op_status::not_ready;
400 }
401 pull(lk, elem);
402 return queue_op_status::success;
403 }
404
405 template <class T, class Clock, class TimePoint>
406 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
407 {
408 lock_guard<mutex> lk(super::mtx_);
409 return try_pull(lk, elem);
410 }
411
412 ///////////////////////////
413 template <class T, class Clock, class TimePoint>
414 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
415 {
416 if (super::empty(lk))
417 {
418 if (super::closed(lk)) return queue_op_status::closed;
419 }
420 bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk);
421 if (has_been_closed) return queue_op_status::closed;
422 pull(lk, elem);
423 return queue_op_status::success;
424 }
425
426 template <class T, class Clock, class TimePoint>
427 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
428 {
429 unique_lock<mutex> lk(super::mtx_);
430 return wait_pull(lk, elem);
431 }
432
433 // ///////////////////////////
434 // template <class T, class Clock, class TimePoint>
435 // queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex> &lk, T& elem)
436 // {
437 // if (super::empty(lk))
438 // {
439 // if (super::closed(lk)) return queue_op_status::closed;
440 // }
441 // bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
442 // if (has_been_closed) return queue_op_status::closed;
443 // pull(lk, elem);
444 // return queue_op_status::success;
445 // }
446 // template <class T, class Clock, class TimePoint>
447 // queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
448 // {
449 // unique_lock<mutex> lk(super::mtx_);
450 // return wait_pull(lk, elem);
451 // }
452
453 ///////////////////////////
454 template <class T, class Clock, class TimePoint>
455 queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
456 {
457 unique_lock<mutex> lk(super::mtx_, try_to_lock);
458 if (! lk.owns_lock()) return queue_op_status::busy;
459 return try_pull(lk, elem);
460 }
461
462 } //end concurrent namespace
463
464 using concurrent::sync_timed_queue;
465
466 } //end boost namespace
467 #include <boost/config/abi_suffix.hpp>
468
469 #endif