]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/boost/thread/concurrent_queues/sync_timed_queue.hpp
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / boost / boost / thread / concurrent_queues / sync_timed_queue.hpp
CommitLineData
7c673cae 1// Copyright (C) 2014 Ian Forbed
b32b8144 2// Copyright (C) 2014-2017 Vicente J. Botet Escriba
7c673cae
FG
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9#define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
10
11#include <boost/thread/detail/config.hpp>
12
13#include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14#include <boost/chrono/duration.hpp>
15#include <boost/chrono/time_point.hpp>
16#include <boost/chrono/system_clocks.hpp>
17#include <boost/chrono/chrono_io.hpp>
18
92f5a8d4
TL
19#include <algorithm> // std::min
20
7c673cae
FG
21#include <boost/config/abi_prefix.hpp>
22
23namespace boost
24{
25namespace concurrent
26{
27namespace detail
28{
b32b8144
FG
29 // fixme: shouldn't the timepoint be configurable
30 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
7c673cae
FG
31 struct scheduled_type
32 {
33 typedef T value_type;
34 typedef Clock clock;
b32b8144 35 typedef TimePoint time_point;
7c673cae
FG
36 T data;
37 time_point time;
38
39 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
40
41 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
42 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
43
44 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
45 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
46 data = other.data;
47 time = other.time;
48 return *this;
49 }
50
51 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
52 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
53 data = boost::move(other.data);
54 time = other.time;
55 return *this;
56 }
57
b32b8144 58 bool operator <(const scheduled_type & other) const
7c673cae
FG
59 {
60 return this->time > other.time;
61 }
62 }; //end struct
63
92f5a8d4
TL
64 template <class Duration>
65 chrono::time_point<chrono::steady_clock,Duration>
66 limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
67 {
68 // Clock == chrono::steady_clock
69 return tp;
70 }
71
72 template <class Clock, class Duration>
73 chrono::time_point<Clock,Duration>
74 limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
75 {
76 // Clock != chrono::steady_clock
77 // The system time may jump while wait_until() is waiting. To compensate for this and time out near
78 // the correct time, we limit how long wait_until() can wait before going around the loop again.
79 const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
80 return (std::min)(tp, tpmax);
81 }
82
83 template <class Duration>
84 chrono::steady_clock::time_point
85 convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
86 {
87 // Clock == chrono::steady_clock
88 return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
89 }
90
91 template <class Clock, class Duration>
92 chrono::steady_clock::time_point
93 convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
94 {
95 // Clock != chrono::steady_clock
96 // The system time may jump while wait_until() is waiting. To compensate for this and time out near
97 // the correct time, we limit how long wait_until() can wait before going around the loop again.
98 const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
99 const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
100 return chrono::steady_clock::now() + (std::min)(dura, duramax);
101 }
102
7c673cae
FG
103} //end detail namespace
104
b32b8144 105 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
7c673cae 106 class sync_timed_queue
b32b8144 107 : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
7c673cae 108 {
b32b8144 109 typedef detail::scheduled_type<T, Clock, TimePoint> stype;
7c673cae
FG
110 typedef sync_priority_queue<stype> super;
111 public:
112 typedef T value_type;
113 typedef Clock clock;
114 typedef typename clock::duration duration;
115 typedef typename clock::time_point time_point;
116 typedef typename super::underlying_queue_type underlying_queue_type;
117 typedef typename super::size_type size_type;
118 typedef typename super::op_status op_status;
119
120 sync_timed_queue() : super() {};
121 ~sync_timed_queue() {}
122
123 using super::size;
124 using super::empty;
125 using super::full;
126 using super::close;
127 using super::closed;
128
129 T pull();
130 void pull(T& elem);
131
92f5a8d4
TL
132 template <class Duration>
133 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
7c673cae
FG
134 template <class Rep, class Period>
135 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
136
137 queue_op_status try_pull(T& elem);
138 queue_op_status wait_pull(T& elem);
139 queue_op_status nonblocking_pull(T& elem);
140
141 template <class Duration>
142 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
143 template <class Rep, class Period>
144 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
145
146 template <class Duration>
147 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
148 template <class Rep, class Period>
149 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
150
151 template <class Duration>
152 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
153 template <class Rep, class Period>
154 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
155
156 template <class Duration>
157 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
158 template <class Rep, class Period>
159 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
160
161 private:
11fdf7f2
TL
162 inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
163 inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
164
165 bool wait_to_pull(unique_lock<mutex>&);
92f5a8d4
TL
166 queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
167 template <class Rep, class Period>
168 queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
11fdf7f2 169
7c673cae
FG
170 T pull(unique_lock<mutex>&);
171 T pull(lock_guard<mutex>&);
172
173 void pull(unique_lock<mutex>&, T& elem);
174 void pull(lock_guard<mutex>&, T& elem);
175
176 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
177 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
178
179 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
180
7c673cae
FG
181 sync_timed_queue(const sync_timed_queue&);
182 sync_timed_queue& operator=(const sync_timed_queue&);
183 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
184 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
185 }; //end class
186
187
b32b8144 188 template <class T, class Clock, class TimePoint>
7c673cae 189 template <class Duration>
b32b8144 190 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
7c673cae
FG
191 {
192 super::push(stype(elem,tp));
193 }
194
b32b8144 195 template <class T, class Clock, class TimePoint>
7c673cae 196 template <class Rep, class Period>
b32b8144 197 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
7c673cae
FG
198 {
199 push(elem, clock::now() + dura);
200 }
201
b32b8144 202 template <class T, class Clock, class TimePoint>
7c673cae 203 template <class Duration>
b32b8144 204 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
7c673cae
FG
205 {
206 super::push(stype(boost::move(elem),tp));
207 }
208
b32b8144 209 template <class T, class Clock, class TimePoint>
7c673cae 210 template <class Rep, class Period>
b32b8144 211 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
7c673cae
FG
212 {
213 push(boost::move(elem), clock::now() + dura);
214 }
215
216
217
b32b8144 218 template <class T, class Clock, class TimePoint>
7c673cae 219 template <class Duration>
b32b8144 220 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
7c673cae
FG
221 {
222 return super::try_push(stype(elem,tp));
223 }
224
b32b8144 225 template <class T, class Clock, class TimePoint>
7c673cae 226 template <class Rep, class Period>
b32b8144 227 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
7c673cae
FG
228 {
229 return try_push(elem,clock::now() + dura);
230 }
231
b32b8144 232 template <class T, class Clock, class TimePoint>
7c673cae 233 template <class Duration>
b32b8144 234 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
7c673cae
FG
235 {
236 return super::try_push(stype(boost::move(elem), tp));
237 }
238
b32b8144 239 template <class T, class Clock, class TimePoint>
7c673cae 240 template <class Rep, class Period>
b32b8144 241 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
7c673cae
FG
242 {
243 return try_push(boost::move(elem), clock::now() + dura);
244 }
245
246 ///////////////////////////
b32b8144 247 template <class T, class Clock, class TimePoint>
11fdf7f2 248 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
7c673cae 249 {
11fdf7f2 250 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
7c673cae
FG
251 }
252
b32b8144 253 template <class T, class Clock, class TimePoint>
11fdf7f2 254 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
7c673cae 255 {
11fdf7f2 256 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
7c673cae
FG
257 }
258
259 ///////////////////////////
b32b8144 260 template <class T, class Clock, class TimePoint>
11fdf7f2 261 bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
7c673cae
FG
262 {
263 for (;;)
264 {
11fdf7f2
TL
265 if (not_empty_and_time_reached(lk)) return false; // success
266 if (super::closed(lk)) return true; // closed
7c673cae 267
11fdf7f2
TL
268 super::wait_until_not_empty_or_closed(lk);
269
270 if (not_empty_and_time_reached(lk)) return false; // success
271 if (super::closed(lk)) return true; // closed
272
92f5a8d4
TL
273 const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
274 super::cond_.wait_until(lk, tpmin);
7c673cae 275 }
7c673cae
FG
276 }
277
b32b8144 278 template <class T, class Clock, class TimePoint>
92f5a8d4 279 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
7c673cae 280 {
11fdf7f2 281 for (;;)
7c673cae 282 {
11fdf7f2
TL
283 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
284 if (super::closed(lk)) return queue_op_status::closed;
285 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
7c673cae 286
11fdf7f2
TL
287 super::wait_until_not_empty_or_closed_until(lk, tp);
288
289 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
290 if (super::closed(lk)) return queue_op_status::closed;
291 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
292
92f5a8d4
TL
293 const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
294 super::cond_.wait_until(lk, tpmin);
295 }
296 }
297
298 template <class T, class Clock, class TimePoint>
299 template <class Rep, class Period>
300 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
301 {
302 const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
303 for (;;)
304 {
305 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
306 if (super::closed(lk)) return queue_op_status::closed;
307 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
308
309 super::wait_until_not_empty_or_closed_until(lk, tp);
310
311 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
312 if (super::closed(lk)) return queue_op_status::closed;
313 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
314
315 const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
316 super::cond_.wait_until(lk, tpmin);
11fdf7f2 317 }
7c673cae
FG
318 }
319
320 ///////////////////////////
b32b8144
FG
321 template <class T, class Clock, class TimePoint>
322 T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
7c673cae
FG
323 {
324#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
325 return boost::move(super::data_.pull().data);
326#else
327 return super::data_.pull().data;
328#endif
329 }
330
b32b8144
FG
331 template <class T, class Clock, class TimePoint>
332 T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
7c673cae
FG
333 {
334#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
335 return boost::move(super::data_.pull().data);
336#else
337 return super::data_.pull().data;
338#endif
339 }
b32b8144
FG
340 template <class T, class Clock, class TimePoint>
341 T sync_timed_queue<T, Clock, TimePoint>::pull()
7c673cae
FG
342 {
343 unique_lock<mutex> lk(super::mtx_);
11fdf7f2
TL
344 const bool has_been_closed = wait_to_pull(lk);
345 if (has_been_closed) super::throw_if_closed(lk);
346 return pull(lk);
7c673cae
FG
347 }
348
349 ///////////////////////////
b32b8144
FG
350 template <class T, class Clock, class TimePoint>
351 void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
7c673cae
FG
352 {
353#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
354 elem = boost::move(super::data_.pull().data);
355#else
356 elem = super::data_.pull().data;
357#endif
358 }
359
b32b8144
FG
360 template <class T, class Clock, class TimePoint>
361 void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
7c673cae
FG
362 {
363#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
364 elem = boost::move(super::data_.pull().data);
365#else
366 elem = super::data_.pull().data;
367#endif
368 }
369
b32b8144
FG
370 template <class T, class Clock, class TimePoint>
371 void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
7c673cae
FG
372 {
373 unique_lock<mutex> lk(super::mtx_);
11fdf7f2
TL
374 const bool has_been_closed = wait_to_pull(lk);
375 if (has_been_closed) super::throw_if_closed(lk);
376 pull(lk, elem);
7c673cae
FG
377 }
378
379 //////////////////////
b32b8144 380 template <class T, class Clock, class TimePoint>
92f5a8d4 381 template <class Duration>
7c673cae 382 queue_op_status
92f5a8d4 383 sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
7c673cae
FG
384 {
385 unique_lock<mutex> lk(super::mtx_);
92f5a8d4 386 const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
11fdf7f2
TL
387 if (rc == queue_op_status::success) pull(lk, elem);
388 return rc;
7c673cae
FG
389 }
390
391 //////////////////////
b32b8144 392 template <class T, class Clock, class TimePoint>
7c673cae
FG
393 template <class Rep, class Period>
394 queue_op_status
b32b8144 395 sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
7c673cae 396 {
92f5a8d4
TL
397 unique_lock<mutex> lk(super::mtx_);
398 const queue_op_status rc = wait_to_pull_for(lk, dura);
399 if (rc == queue_op_status::success) pull(lk, elem);
400 return rc;
7c673cae
FG
401 }
402
403 ///////////////////////////
b32b8144
FG
404 template <class T, class Clock, class TimePoint>
405 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
7c673cae 406 {
11fdf7f2 407 if (not_empty_and_time_reached(lk))
7c673cae 408 {
11fdf7f2
TL
409 pull(lk, elem);
410 return queue_op_status::success;
7c673cae 411 }
11fdf7f2
TL
412 if (super::closed(lk)) return queue_op_status::closed;
413 if (super::empty(lk)) return queue_op_status::empty;
414 return queue_op_status::not_ready;
7c673cae 415 }
b32b8144
FG
416 template <class T, class Clock, class TimePoint>
417 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
7c673cae 418 {
11fdf7f2 419 if (not_empty_and_time_reached(lk))
7c673cae 420 {
11fdf7f2
TL
421 pull(lk, elem);
422 return queue_op_status::success;
7c673cae 423 }
11fdf7f2
TL
424 if (super::closed(lk)) return queue_op_status::closed;
425 if (super::empty(lk)) return queue_op_status::empty;
426 return queue_op_status::not_ready;
7c673cae
FG
427 }
428
b32b8144
FG
429 template <class T, class Clock, class TimePoint>
430 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
7c673cae
FG
431 {
432 lock_guard<mutex> lk(super::mtx_);
433 return try_pull(lk, elem);
434 }
435
436 ///////////////////////////
b32b8144
FG
437 template <class T, class Clock, class TimePoint>
438 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
7c673cae 439 {
11fdf7f2 440 const bool has_been_closed = wait_to_pull(lk);
7c673cae
FG
441 if (has_been_closed) return queue_op_status::closed;
442 pull(lk, elem);
443 return queue_op_status::success;
444 }
445
b32b8144
FG
446 template <class T, class Clock, class TimePoint>
447 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
7c673cae
FG
448 {
449 unique_lock<mutex> lk(super::mtx_);
450 return wait_pull(lk, elem);
451 }
452
7c673cae 453 ///////////////////////////
b32b8144
FG
454 template <class T, class Clock, class TimePoint>
455 queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
7c673cae
FG
456 {
457 unique_lock<mutex> lk(super::mtx_, try_to_lock);
458 if (! lk.owns_lock()) return queue_op_status::busy;
459 return try_pull(lk, elem);
460 }
461
462} //end concurrent namespace
463
464using concurrent::sync_timed_queue;
465
466} //end boost namespace
467#include <boost/config/abi_suffix.hpp>
468
469#endif