1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
11 #include <boost/thread/detail/config.hpp>
13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14 #include <boost/chrono/duration.hpp>
15 #include <boost/chrono/time_point.hpp>
16 #include <boost/chrono/system_clocks.hpp>
17 #include <boost/chrono/chrono_io.hpp>
19 #include <algorithm> // std::min
21 #include <boost/config/abi_prefix.hpp>
29 // fixme: shouldn't the timepoint be configurable
30 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
35 typedef TimePoint time_point;
39 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
41 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
42 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
44 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
45 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
51 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
52 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
53 data = boost::move(other.data);
58 bool operator <(const scheduled_type & other) const
60 return this->time > other.time;
64 template <class Duration>
65 chrono::time_point<chrono::steady_clock,Duration>
66 limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
68 // Clock == chrono::steady_clock
72 template <class Clock, class Duration>
73 chrono::time_point<Clock,Duration>
74 limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
76 // Clock != chrono::steady_clock
77 // The system time may jump while wait_until() is waiting. To compensate for this and time out near
78 // the correct time, we limit how long wait_until() can wait before going around the loop again.
79 const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
80 return (std::min)(tp, tpmax);
83 template <class Duration>
84 chrono::steady_clock::time_point
85 convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
87 // Clock == chrono::steady_clock
88 return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
91 template <class Clock, class Duration>
92 chrono::steady_clock::time_point
93 convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
95 // Clock != chrono::steady_clock
96 // The system time may jump while wait_until() is waiting. To compensate for this and time out near
97 // the correct time, we limit how long wait_until() can wait before going around the loop again.
98 const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
99 const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
100 return chrono::steady_clock::now() + (std::min)(dura, duramax);
103 } //end detail namespace
105 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
106 class sync_timed_queue
107 : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
109 typedef detail::scheduled_type<T, Clock, TimePoint> stype;
110 typedef sync_priority_queue<stype> super;
112 typedef T value_type;
114 typedef typename clock::duration duration;
115 typedef typename clock::time_point time_point;
116 typedef typename super::underlying_queue_type underlying_queue_type;
117 typedef typename super::size_type size_type;
118 typedef typename super::op_status op_status;
120 sync_timed_queue() : super() {};
121 ~sync_timed_queue() {}
132 template <class Duration>
133 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
134 template <class Rep, class Period>
135 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
137 queue_op_status try_pull(T& elem);
138 queue_op_status wait_pull(T& elem);
139 queue_op_status nonblocking_pull(T& elem);
141 template <class Duration>
142 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
143 template <class Rep, class Period>
144 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
146 template <class Duration>
147 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
148 template <class Rep, class Period>
149 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
151 template <class Duration>
152 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
153 template <class Rep, class Period>
154 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
156 template <class Duration>
157 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
158 template <class Rep, class Period>
159 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
162 inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
163 inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
165 bool wait_to_pull(unique_lock<mutex>&);
166 queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
167 template <class Rep, class Period>
168 queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
170 T pull(unique_lock<mutex>&);
171 T pull(lock_guard<mutex>&);
173 void pull(unique_lock<mutex>&, T& elem);
174 void pull(lock_guard<mutex>&, T& elem);
176 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
177 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
179 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
181 sync_timed_queue(const sync_timed_queue&);
182 sync_timed_queue& operator=(const sync_timed_queue&);
183 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
184 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
188 template <class T, class Clock, class TimePoint>
189 template <class Duration>
190 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
192 super::push(stype(elem,tp));
195 template <class T, class Clock, class TimePoint>
196 template <class Rep, class Period>
197 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
199 push(elem, clock::now() + dura);
202 template <class T, class Clock, class TimePoint>
203 template <class Duration>
204 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
206 super::push(stype(boost::move(elem),tp));
209 template <class T, class Clock, class TimePoint>
210 template <class Rep, class Period>
211 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
213 push(boost::move(elem), clock::now() + dura);
218 template <class T, class Clock, class TimePoint>
219 template <class Duration>
220 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
222 return super::try_push(stype(elem,tp));
225 template <class T, class Clock, class TimePoint>
226 template <class Rep, class Period>
227 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
229 return try_push(elem,clock::now() + dura);
232 template <class T, class Clock, class TimePoint>
233 template <class Duration>
234 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
236 return super::try_push(stype(boost::move(elem), tp));
239 template <class T, class Clock, class TimePoint>
240 template <class Rep, class Period>
241 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
243 return try_push(boost::move(elem), clock::now() + dura);
246 ///////////////////////////
247 template <class T, class Clock, class TimePoint>
248 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
250 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
253 template <class T, class Clock, class TimePoint>
254 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
256 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
259 ///////////////////////////
260 template <class T, class Clock, class TimePoint>
261 bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
265 if (not_empty_and_time_reached(lk)) return false; // success
266 if (super::closed(lk)) return true; // closed
268 super::wait_until_not_empty_or_closed(lk);
270 if (not_empty_and_time_reached(lk)) return false; // success
271 if (super::closed(lk)) return true; // closed
273 const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
274 super::cond_.wait_until(lk, tpmin);
278 template <class T, class Clock, class TimePoint>
279 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
283 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
284 if (super::closed(lk)) return queue_op_status::closed;
285 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
287 super::wait_until_not_empty_or_closed_until(lk, tp);
289 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
290 if (super::closed(lk)) return queue_op_status::closed;
291 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
293 const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
294 super::cond_.wait_until(lk, tpmin);
298 template <class T, class Clock, class TimePoint>
299 template <class Rep, class Period>
300 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
302 const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
305 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
306 if (super::closed(lk)) return queue_op_status::closed;
307 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
309 super::wait_until_not_empty_or_closed_until(lk, tp);
311 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
312 if (super::closed(lk)) return queue_op_status::closed;
313 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
315 const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
316 super::cond_.wait_until(lk, tpmin);
320 ///////////////////////////
321 template <class T, class Clock, class TimePoint>
322 T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
324 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
325 return boost::move(super::data_.pull().data);
327 return super::data_.pull().data;
331 template <class T, class Clock, class TimePoint>
332 T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
334 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
335 return boost::move(super::data_.pull().data);
337 return super::data_.pull().data;
340 template <class T, class Clock, class TimePoint>
341 T sync_timed_queue<T, Clock, TimePoint>::pull()
343 unique_lock<mutex> lk(super::mtx_);
344 const bool has_been_closed = wait_to_pull(lk);
345 if (has_been_closed) super::throw_if_closed(lk);
349 ///////////////////////////
350 template <class T, class Clock, class TimePoint>
351 void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
353 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
354 elem = boost::move(super::data_.pull().data);
356 elem = super::data_.pull().data;
360 template <class T, class Clock, class TimePoint>
361 void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
363 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
364 elem = boost::move(super::data_.pull().data);
366 elem = super::data_.pull().data;
370 template <class T, class Clock, class TimePoint>
371 void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
373 unique_lock<mutex> lk(super::mtx_);
374 const bool has_been_closed = wait_to_pull(lk);
375 if (has_been_closed) super::throw_if_closed(lk);
379 //////////////////////
380 template <class T, class Clock, class TimePoint>
381 template <class Duration>
383 sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
385 unique_lock<mutex> lk(super::mtx_);
386 const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
387 if (rc == queue_op_status::success) pull(lk, elem);
391 //////////////////////
392 template <class T, class Clock, class TimePoint>
393 template <class Rep, class Period>
395 sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
397 unique_lock<mutex> lk(super::mtx_);
398 const queue_op_status rc = wait_to_pull_for(lk, dura);
399 if (rc == queue_op_status::success) pull(lk, elem);
403 ///////////////////////////
404 template <class T, class Clock, class TimePoint>
405 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
407 if (not_empty_and_time_reached(lk))
410 return queue_op_status::success;
412 if (super::closed(lk)) return queue_op_status::closed;
413 if (super::empty(lk)) return queue_op_status::empty;
414 return queue_op_status::not_ready;
416 template <class T, class Clock, class TimePoint>
417 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
419 if (not_empty_and_time_reached(lk))
422 return queue_op_status::success;
424 if (super::closed(lk)) return queue_op_status::closed;
425 if (super::empty(lk)) return queue_op_status::empty;
426 return queue_op_status::not_ready;
429 template <class T, class Clock, class TimePoint>
430 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
432 lock_guard<mutex> lk(super::mtx_);
433 return try_pull(lk, elem);
436 ///////////////////////////
437 template <class T, class Clock, class TimePoint>
438 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
440 const bool has_been_closed = wait_to_pull(lk);
441 if (has_been_closed) return queue_op_status::closed;
443 return queue_op_status::success;
446 template <class T, class Clock, class TimePoint>
447 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
449 unique_lock<mutex> lk(super::mtx_);
450 return wait_pull(lk, elem);
453 ///////////////////////////
454 template <class T, class Clock, class TimePoint>
455 queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
457 unique_lock<mutex> lk(super::mtx_, try_to_lock);
458 if (! lk.owns_lock()) return queue_op_status::busy;
459 return try_pull(lk, elem);
462 } //end concurrent namespace
464 using concurrent::sync_timed_queue;
466 } //end boost namespace
467 #include <boost/config/abi_suffix.hpp>