]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/thread/concurrent_queues/sync_timed_queue.hpp
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / boost / boost / thread / concurrent_queues / sync_timed_queue.hpp
1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
10
11 #include <boost/thread/detail/config.hpp>
12
13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14 #include <boost/chrono/duration.hpp>
15 #include <boost/chrono/time_point.hpp>
16 #include <boost/chrono/system_clocks.hpp>
17 #include <boost/chrono/chrono_io.hpp>
18
19 #include <algorithm> // std::min
20
21 #include <boost/config/abi_prefix.hpp>
22
23 namespace boost
24 {
25 namespace concurrent
26 {
27 namespace detail
28 {
// NOTE: the time point type is configurable through the TimePoint template parameter (it defaults to Clock::time_point).
30 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
31 struct scheduled_type
32 {
33 typedef T value_type;
34 typedef Clock clock;
35 typedef TimePoint time_point;
36 T data;
37 time_point time;
38
39 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
40
41 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
42 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
43
44 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
45 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
46 data = other.data;
47 time = other.time;
48 return *this;
49 }
50
51 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
52 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
53 data = boost::move(other.data);
54 time = other.time;
55 return *this;
56 }
57
58 bool operator <(const scheduled_type & other) const
59 {
60 return this->time > other.time;
61 }
62 }; //end struct
63
64 template <class Duration>
65 chrono::time_point<chrono::steady_clock,Duration>
66 limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
67 {
68 // Clock == chrono::steady_clock
69 return tp;
70 }
71
72 template <class Clock, class Duration>
73 chrono::time_point<Clock,Duration>
74 limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
75 {
76 // Clock != chrono::steady_clock
77 // The system time may jump while wait_until() is waiting. To compensate for this and time out near
78 // the correct time, we limit how long wait_until() can wait before going around the loop again.
79 const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
80 return (std::min)(tp, tpmax);
81 }
82
83 template <class Duration>
84 chrono::steady_clock::time_point
85 convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
86 {
87 // Clock == chrono::steady_clock
88 return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
89 }
90
91 template <class Clock, class Duration>
92 chrono::steady_clock::time_point
93 convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
94 {
95 // Clock != chrono::steady_clock
96 // The system time may jump while wait_until() is waiting. To compensate for this and time out near
97 // the correct time, we limit how long wait_until() can wait before going around the loop again.
98 const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
99 const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
100 return chrono::steady_clock::now() + (std::min)(dura, duramax);
101 }
102
103 } //end detail namespace
104
105 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
106 class sync_timed_queue
107 : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
108 {
109 typedef detail::scheduled_type<T, Clock, TimePoint> stype;
110 typedef sync_priority_queue<stype> super;
111 public:
112 typedef T value_type;
113 typedef Clock clock;
114 typedef typename clock::duration duration;
115 typedef typename clock::time_point time_point;
116 typedef typename super::underlying_queue_type underlying_queue_type;
117 typedef typename super::size_type size_type;
118 typedef typename super::op_status op_status;
119
120 sync_timed_queue() : super() {};
121 ~sync_timed_queue() {}
122
123 using super::size;
124 using super::empty;
125 using super::full;
126 using super::close;
127 using super::closed;
128
129 T pull();
130 void pull(T& elem);
131
132 template <class Duration>
133 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
134 template <class Rep, class Period>
135 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
136
137 queue_op_status try_pull(T& elem);
138 queue_op_status wait_pull(T& elem);
139 queue_op_status nonblocking_pull(T& elem);
140
141 template <class Duration>
142 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
143 template <class Rep, class Period>
144 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
145
146 template <class Duration>
147 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
148 template <class Rep, class Period>
149 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
150
151 template <class Duration>
152 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
153 template <class Rep, class Period>
154 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
155
156 template <class Duration>
157 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
158 template <class Rep, class Period>
159 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
160
161 private:
162 inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
163 inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
164
165 bool wait_to_pull(unique_lock<mutex>&);
166 queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
167 template <class Rep, class Period>
168 queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
169
170 T pull(unique_lock<mutex>&);
171 T pull(lock_guard<mutex>&);
172
173 void pull(unique_lock<mutex>&, T& elem);
174 void pull(lock_guard<mutex>&, T& elem);
175
176 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
177 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
178
179 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
180
181 sync_timed_queue(const sync_timed_queue&);
182 sync_timed_queue& operator=(const sync_timed_queue&);
183 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
184 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
185 }; //end class
186
187
188 template <class T, class Clock, class TimePoint>
189 template <class Duration>
190 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
191 {
192 super::push(stype(elem,tp));
193 }
194
195 template <class T, class Clock, class TimePoint>
196 template <class Rep, class Period>
197 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
198 {
199 push(elem, clock::now() + dura);
200 }
201
202 template <class T, class Clock, class TimePoint>
203 template <class Duration>
204 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
205 {
206 super::push(stype(boost::move(elem),tp));
207 }
208
209 template <class T, class Clock, class TimePoint>
210 template <class Rep, class Period>
211 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
212 {
213 push(boost::move(elem), clock::now() + dura);
214 }
215
216
217
218 template <class T, class Clock, class TimePoint>
219 template <class Duration>
220 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
221 {
222 return super::try_push(stype(elem,tp));
223 }
224
225 template <class T, class Clock, class TimePoint>
226 template <class Rep, class Period>
227 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
228 {
229 return try_push(elem,clock::now() + dura);
230 }
231
232 template <class T, class Clock, class TimePoint>
233 template <class Duration>
234 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
235 {
236 return super::try_push(stype(boost::move(elem), tp));
237 }
238
239 template <class T, class Clock, class TimePoint>
240 template <class Rep, class Period>
241 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
242 {
243 return try_push(boost::move(elem), clock::now() + dura);
244 }
245
246 ///////////////////////////
247 template <class T, class Clock, class TimePoint>
248 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
249 {
250 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
251 }
252
253 template <class T, class Clock, class TimePoint>
254 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
255 {
256 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
257 }
258
259 ///////////////////////////
260 template <class T, class Clock, class TimePoint>
261 bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
262 {
263 for (;;)
264 {
265 if (not_empty_and_time_reached(lk)) return false; // success
266 if (super::closed(lk)) return true; // closed
267
268 super::wait_until_not_empty_or_closed(lk);
269
270 if (not_empty_and_time_reached(lk)) return false; // success
271 if (super::closed(lk)) return true; // closed
272
273 const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
274 super::cond_.wait_until(lk, tpmin);
275 }
276 }
277
278 template <class T, class Clock, class TimePoint>
279 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
280 {
281 for (;;)
282 {
283 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
284 if (super::closed(lk)) return queue_op_status::closed;
285 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
286
287 super::wait_until_not_empty_or_closed_until(lk, tp);
288
289 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
290 if (super::closed(lk)) return queue_op_status::closed;
291 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
292
293 const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
294 super::cond_.wait_until(lk, tpmin);
295 }
296 }
297
298 template <class T, class Clock, class TimePoint>
299 template <class Rep, class Period>
300 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
301 {
302 const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
303 for (;;)
304 {
305 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
306 if (super::closed(lk)) return queue_op_status::closed;
307 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
308
309 super::wait_until_not_empty_or_closed_until(lk, tp);
310
311 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
312 if (super::closed(lk)) return queue_op_status::closed;
313 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
314
315 const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
316 super::cond_.wait_until(lk, tpmin);
317 }
318 }
319
320 ///////////////////////////
321 template <class T, class Clock, class TimePoint>
322 T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
323 {
324 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
325 return boost::move(super::data_.pull().data);
326 #else
327 return super::data_.pull().data;
328 #endif
329 }
330
331 template <class T, class Clock, class TimePoint>
332 T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
333 {
334 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
335 return boost::move(super::data_.pull().data);
336 #else
337 return super::data_.pull().data;
338 #endif
339 }
340 template <class T, class Clock, class TimePoint>
341 T sync_timed_queue<T, Clock, TimePoint>::pull()
342 {
343 unique_lock<mutex> lk(super::mtx_);
344 const bool has_been_closed = wait_to_pull(lk);
345 if (has_been_closed) super::throw_if_closed(lk);
346 return pull(lk);
347 }
348
349 ///////////////////////////
350 template <class T, class Clock, class TimePoint>
351 void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
352 {
353 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
354 elem = boost::move(super::data_.pull().data);
355 #else
356 elem = super::data_.pull().data;
357 #endif
358 }
359
360 template <class T, class Clock, class TimePoint>
361 void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
362 {
363 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
364 elem = boost::move(super::data_.pull().data);
365 #else
366 elem = super::data_.pull().data;
367 #endif
368 }
369
370 template <class T, class Clock, class TimePoint>
371 void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
372 {
373 unique_lock<mutex> lk(super::mtx_);
374 const bool has_been_closed = wait_to_pull(lk);
375 if (has_been_closed) super::throw_if_closed(lk);
376 pull(lk, elem);
377 }
378
379 //////////////////////
380 template <class T, class Clock, class TimePoint>
381 template <class Duration>
382 queue_op_status
383 sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
384 {
385 unique_lock<mutex> lk(super::mtx_);
386 const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
387 if (rc == queue_op_status::success) pull(lk, elem);
388 return rc;
389 }
390
391 //////////////////////
392 template <class T, class Clock, class TimePoint>
393 template <class Rep, class Period>
394 queue_op_status
395 sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
396 {
397 unique_lock<mutex> lk(super::mtx_);
398 const queue_op_status rc = wait_to_pull_for(lk, dura);
399 if (rc == queue_op_status::success) pull(lk, elem);
400 return rc;
401 }
402
403 ///////////////////////////
404 template <class T, class Clock, class TimePoint>
405 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
406 {
407 if (not_empty_and_time_reached(lk))
408 {
409 pull(lk, elem);
410 return queue_op_status::success;
411 }
412 if (super::closed(lk)) return queue_op_status::closed;
413 if (super::empty(lk)) return queue_op_status::empty;
414 return queue_op_status::not_ready;
415 }
416 template <class T, class Clock, class TimePoint>
417 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
418 {
419 if (not_empty_and_time_reached(lk))
420 {
421 pull(lk, elem);
422 return queue_op_status::success;
423 }
424 if (super::closed(lk)) return queue_op_status::closed;
425 if (super::empty(lk)) return queue_op_status::empty;
426 return queue_op_status::not_ready;
427 }
428
429 template <class T, class Clock, class TimePoint>
430 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
431 {
432 lock_guard<mutex> lk(super::mtx_);
433 return try_pull(lk, elem);
434 }
435
436 ///////////////////////////
437 template <class T, class Clock, class TimePoint>
438 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
439 {
440 const bool has_been_closed = wait_to_pull(lk);
441 if (has_been_closed) return queue_op_status::closed;
442 pull(lk, elem);
443 return queue_op_status::success;
444 }
445
446 template <class T, class Clock, class TimePoint>
447 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
448 {
449 unique_lock<mutex> lk(super::mtx_);
450 return wait_pull(lk, elem);
451 }
452
453 ///////////////////////////
454 template <class T, class Clock, class TimePoint>
455 queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
456 {
457 unique_lock<mutex> lk(super::mtx_, try_to_lock);
458 if (! lk.owns_lock()) return queue_op_status::busy;
459 return try_pull(lk, elem);
460 }
461
462 } //end concurrent namespace
463
464 using concurrent::sync_timed_queue;
465
466 } //end boost namespace
467 #include <boost/config/abi_suffix.hpp>
468
469 #endif