1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
11 #include <boost/thread/detail/config.hpp>
13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14 #include <boost/chrono/duration.hpp>
15 #include <boost/chrono/time_point.hpp>
16 #include <boost/chrono/system_clocks.hpp>
17 #include <boost/chrono/chrono_io.hpp>
19 #include <boost/config/abi_prefix.hpp>
// Queue element used by the timed queue: the constructors below pair a payload
// (`data`) with the time point (`time`) that orders it.
// NOTE(review): the struct header and the `data`/`time` member declarations are
// not shown next to these lines; this summary is derived from the initialisers.
27 // fixme: shouldn't the timepoint be configurable
28 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
// The time point type is taken from the TimePoint template parameter.
33 typedef TimePoint time_point;
37 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
// Builds an element from a copy of the payload and its time point.
39 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
// Builds an element by moving the payload in; the time point is copied.
40 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
// Copy construction copies both the payload and the time point.
42 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
// Copy assignment.
43 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
// Move construction moves the payload; the time point is copied.
49 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
// Move assignment: the payload is moved out of `other`.
50 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
51 data = boost::move(other.data);
// Ordering is deliberately inverted: an element compares "less" when its time
// point is LATER than the other's.
// NOTE(review): presumably this makes a largest-first priority queue surface the
// earliest time point first - confirm against sync_priority_queue.
56 bool operator <(const scheduled_type & other) const
58 return this->time > other.time;
62 } //end detail namespace
// Synchronized queue whose elements are tagged with a time point.
// It is implemented on top of sync_priority_queue (private inheritance) holding
// detail::scheduled_type<T, Clock, TimePoint> elements.
64 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
65 class sync_timed_queue
66 : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
// Shorthands for the stored element type and the base class.
68 typedef detail::scheduled_type<T, Clock, TimePoint> stype;
69 typedef sync_priority_queue<stype> super;
// NOTE(review): `time_point` is taken from `clock` here, while the stored element
// type is parameterised on TimePoint - confirm the two always agree.
73 typedef typename clock::duration duration;
74 typedef typename clock::time_point time_point;
// Types re-exported from the base queue.
75 typedef typename super::underlying_queue_type underlying_queue_type;
76 typedef typename super::size_type size_type;
77 typedef typename super::op_status op_status;
// NOTE(review): the ';' after the constructor body is redundant.
79 sync_timed_queue() : super() {};
80 ~sync_timed_queue() {}
// Timed pulls: wait up to an absolute deadline (pull_until) or a relative
// duration (pull_for); the result is reported as a queue_op_status and the
// element, if any, is written to `elem`.
91 template <class WClock, class Duration>
92 queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem);
93 template <class Rep, class Period>
94 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
// Status-returning pulls: try_pull locks and checks once, wait_pull waits,
// nonblocking_pull gives up with `busy` if the mutex cannot be taken immediately.
96 queue_op_status try_pull(T& elem);
97 queue_op_status wait_pull(T& elem);
98 queue_op_status nonblocking_pull(T& elem);
// push: enqueue a copy of `elem`, scheduled either at an absolute time point
// or at clock::now() + dura.
100 template <class Duration>
101 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
102 template <class Rep, class Period>
103 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
// push: same as above, but the element is moved in.
105 template <class Duration>
106 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
107 template <class Rep, class Period>
108 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
// try_push: like push, but forwards to the base queue's try_push and returns
// its status.
110 template <class Duration>
111 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
112 template <class Rep, class Period>
113 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
115 template <class Duration>
116 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
117 template <class Rep, class Period>
118 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
// Helpers that take an already-constructed lock object by reference.
// True when the queue is non-empty and the top element's time has been reached.
121 inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
122 inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
// Wait until an element is due; wait_to_pull returns true if the queue was closed.
124 bool wait_to_pull(unique_lock<mutex>&);
125 template <class WClock, class Duration>
126 queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp);
// Remove the top element and return its payload.
128 T pull(unique_lock<mutex>&);
129 T pull(lock_guard<mutex>&);
// Remove the top element and assign its payload to `elem`.
131 void pull(unique_lock<mutex>&, T& elem);
132 void pull(lock_guard<mutex>&, T& elem);
134 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
135 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
137 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
// Copy and move operations are declared but no definitions appear alongside.
// NOTE(review): presumably private and unimplemented to forbid copying/moving -
// confirm the access specifier.
139 sync_timed_queue(const sync_timed_queue&);
140 sync_timed_queue& operator=(const sync_timed_queue&);
141 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
142 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
// Enqueues a copy of `elem` scheduled at the absolute time point `tp`.
// The pair is wrapped in an `stype` temporary and handed to the base queue's push.
146 template <class T, class Clock, class TimePoint>
147 template <class Duration>
148 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
150 super::push(stype(elem,tp));
// Enqueues a copy of `elem` scheduled `dura` after the current time.
// The deadline is computed as clock::now() + dura at the moment of the call and
// passed to the time-point overload of push.
153 template <class T, class Clock, class TimePoint>
154 template <class Rep, class Period>
155 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
157 push(elem, clock::now() + dura);
// Enqueues `elem` by move, scheduled at the absolute time point `tp`.
// The payload is moved into an `stype` temporary that is handed to the base
// queue's push.
160 template <class T, class Clock, class TimePoint>
161 template <class Duration>
162 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
164 super::push(stype(boost::move(elem),tp));
// Enqueues `elem` by move, scheduled `dura` after the current time.
// The deadline is computed as clock::now() + dura at the moment of the call and
// passed, together with the moved element, to the time-point overload of push.
167 template <class T, class Clock, class TimePoint>
168 template <class Rep, class Period>
169 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
171 push(boost::move(elem), clock::now() + dura);
// Attempts to enqueue a copy of `elem` scheduled at the absolute time point `tp`.
// Returns the status reported by the base queue's try_push.
176 template <class T, class Clock, class TimePoint>
177 template <class Duration>
178 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
180 return super::try_push(stype(elem,tp));
// Attempts to enqueue a copy of `elem` scheduled `dura` after the current time.
// The deadline is clock::now() + dura, evaluated at the moment of the call; the
// result is that of the time-point overload of try_push.
183 template <class T, class Clock, class TimePoint>
184 template <class Rep, class Period>
185 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
187 return try_push(elem,clock::now() + dura);
// Attempts to enqueue `elem` by move, scheduled at the absolute time point `tp`.
// Returns the status reported by the base queue's try_push.
190 template <class T, class Clock, class TimePoint>
191 template <class Duration>
192 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
194 return super::try_push(stype(boost::move(elem), tp));
// Attempts to enqueue `elem` by move, scheduled `dura` after the current time.
// The deadline is clock::now() + dura, evaluated at the moment of the call; the
// result is that of the time-point overload of try_push.
197 template <class T, class Clock, class TimePoint>
198 template <class Rep, class Period>
199 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
201 return try_push(boost::move(elem), clock::now() + dura);
204 ///////////////////////////
// Returns true when the queue holds at least one element and the current clock
// time is at or past the time point of the top element.
// The emptiness test runs first, so top() is only read on a non-empty queue.
// `lk` is forwarded to the base class's empty(lk).
205 template <class T, class Clock, class TimePoint>
206 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
208 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
// lock_guard flavour of the check: true when the queue holds at least one
// element and the current clock time is at or past the top element's time point.
// The emptiness test runs first, so top() is only read on a non-empty queue.
211 template <class T, class Clock, class TimePoint>
212 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
214 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
217 ///////////////////////////
// Waits, using the caller's lock `lk`, until the top element is due or the
// queue is closed.
// Returns false when an element is ready to be pulled and true when the queue
// has been closed.
// NOTE(review): the repeated checks suggest the steps below run inside a retry
// loop; the loop construct itself is not shown with these lines - confirm.
218 template <class T, class Clock, class TimePoint>
219 bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
// A due element takes precedence over the closed state.
223 if (not_empty_and_time_reached(lk)) return false; // success
224 if (super::closed(lk)) return true; // closed
// Nothing is due yet: first wait for the queue to hold something (or be closed).
226 super::wait_until_not_empty_or_closed(lk);
// Re-check after the wait.
228 if (not_empty_and_time_reached(lk)) return false; // success
229 if (super::closed(lk)) return true; // closed
// Elements exist but the earliest is not due: wait until its time point
// (copied first, as the queue may change during the wait).
231 const time_point tp(super::data_.top().time);
232 super::wait_until_closed_until(lk, tp);
// Deadline-bounded version of wait_to_pull: waits, using the caller's lock `lk`,
// until the top element is due, the queue is closed, or `tp` has passed.
// Returns:
//   success   - the top element is due;
//   closed    - the queue is closed and no element is due;
//   timeout   - `tp` has passed and the queue is empty;
//   not_ready - `tp` has passed and the queue holds only elements that are not due.
// `tp` is compared with clock::now() and with queued time points, so it has to
// be expressed on the queue's clock.
// NOTE(review): the repeated checks suggest the steps below run inside a retry
// loop; the loop construct itself is not shown with these lines - confirm.
236 template <class T, class Clock, class TimePoint>
237 template <class WClock, class Duration>
238 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp)
242 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
243 if (super::closed(lk)) return queue_op_status::closed;
244 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
// Wait for the queue to hold something, be closed, or for the deadline.
246 super::wait_until_not_empty_or_closed_until(lk, tp);
// Re-check after the wait.
248 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
249 if (super::closed(lk)) return queue_op_status::closed;
250 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
// Elements exist but none is due: wait until the earlier of the caller's
// deadline and the top element's time point.
252 const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
253 super::wait_until_closed_until(lk, tpmin);
257 ///////////////////////////
// Removes the top element from the underlying container and returns its payload.
// The unique_lock parameter is unnamed and not used by the visible statements.
// NOTE(review): presumably it only documents that the caller already holds the
// lock, and the caller has already checked that an element is due - confirm.
258 template <class T, class Clock, class TimePoint>
259 T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
// With rvalue references available the payload is moved out of the pulled element.
261 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
262 return boost::move(super::data_.pull().data);
// Alternative form that returns the payload without boost::move.
264 return super::data_.pull().data;
// lock_guard flavour: removes the top element from the underlying container and
// returns its payload. The lock parameter is unnamed and not used by the
// visible statements.
268 template <class T, class Clock, class TimePoint>
269 T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
// With rvalue references available the payload is moved out of the pulled element.
271 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
272 return boost::move(super::data_.pull().data);
// Alternative form that returns the payload without boost::move.
274 return super::data_.pull().data;
// Blocking pull that returns the payload by value.
// Locks the queue mutex, waits via wait_to_pull until an element is due, and
// calls super::throw_if_closed(lk) if the wait reported that the queue was closed.
// NOTE(review): the statement that returns the pulled payload is not shown with
// these lines - confirm.
277 template <class T, class Clock, class TimePoint>
278 T sync_timed_queue<T, Clock, TimePoint>::pull()
280 unique_lock<mutex> lk(super::mtx_);
281 const bool has_been_closed = wait_to_pull(lk);
282 if (has_been_closed) super::throw_if_closed(lk);
286 ///////////////////////////
// Removes the top element from the underlying container and assigns its payload
// to `elem`. The unique_lock parameter is unnamed and not used by the visible
// statements.
// NOTE(review): presumably the caller already holds the lock and has checked
// that an element is due - confirm.
287 template <class T, class Clock, class TimePoint>
288 void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
// With rvalue references available the payload is move-assigned.
290 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
291 elem = boost::move(super::data_.pull().data);
// Alternative form that assigns the payload without boost::move.
293 elem = super::data_.pull().data;
// lock_guard flavour: removes the top element from the underlying container and
// assigns its payload to `elem`. The lock parameter is unnamed and not used by
// the visible statements.
297 template <class T, class Clock, class TimePoint>
298 void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
// With rvalue references available the payload is move-assigned.
300 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
301 elem = boost::move(super::data_.pull().data);
// Alternative form that assigns the payload without boost::move.
303 elem = super::data_.pull().data;
// Blocking pull that delivers the payload through `elem`.
// Locks the queue mutex, waits via wait_to_pull until an element is due, and
// calls super::throw_if_closed(lk) if the wait reported that the queue was closed.
// NOTE(review): the statement that transfers the element into `elem` is not
// shown with these lines - confirm.
307 template <class T, class Clock, class TimePoint>
308 void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
310 unique_lock<mutex> lk(super::mtx_);
311 const bool has_been_closed = wait_to_pull(lk);
312 if (has_been_closed) super::throw_if_closed(lk);
316 //////////////////////
// Waits until the absolute deadline `tp` for a due element.
// Locks the queue mutex and delegates the waiting to wait_to_pull_until; only
// when that reports success is the top element pulled into `elem`.
// NOTE(review): the return type line and the final return are not shown with
// these lines; the class declaration gives queue_op_status, presumably `rc`.
317 template <class T, class Clock, class TimePoint>
318 template <class WClock, class Duration>
320 sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
322 unique_lock<mutex> lk(super::mtx_);
323 const queue_op_status rc = wait_to_pull_until(lk, tp);
// `elem` is left untouched unless the wait succeeded.
324 if (rc == queue_op_status::success) pull(lk, elem);
328 //////////////////////
329 template <class T, class Clock, class TimePoint>
330 template <class Rep, class Period>
332 sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
334 return pull_until(chrono::steady_clock::now() + dura, elem);
337 ///////////////////////////
// Single, non-waiting attempt to pull, using the caller's lock `lk`.
// Returns:
//   success   - the top element is due (checked first, so a due element is
//               reported even if the queue has been closed);
//   closed    - no element is due and the queue is closed;
//   empty     - the queue is open and holds no elements;
//   not_ready - the queue holds elements, but the top one is not due yet.
// NOTE(review): the statement that transfers the element into `elem` on success
// is not shown with these lines - confirm.
338 template <class T, class Clock, class TimePoint>
339 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
341 if (not_empty_and_time_reached(lk))
344 return queue_op_status::success;
346 if (super::closed(lk)) return queue_op_status::closed;
347 if (super::empty(lk)) return queue_op_status::empty;
348 return queue_op_status::not_ready;
// lock_guard flavour of the single, non-waiting pull attempt.
// Returns:
//   success   - the top element is due (checked first, so a due element is
//               reported even if the queue has been closed);
//   closed    - no element is due and the queue is closed;
//   empty     - the queue is open and holds no elements;
//   not_ready - the queue holds elements, but the top one is not due yet.
// NOTE(review): the statement that transfers the element into `elem` on success
// is not shown with these lines - confirm.
350 template <class T, class Clock, class TimePoint>
351 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
353 if (not_empty_and_time_reached(lk))
356 return queue_op_status::success;
358 if (super::closed(lk)) return queue_op_status::closed;
359 if (super::empty(lk)) return queue_op_status::empty;
360 return queue_op_status::not_ready;
// Public non-waiting pull: takes the queue mutex with a lock_guard (blocking
// until the mutex is acquired) and returns the result of the lock-taking
// try_pull overload.
363 template <class T, class Clock, class TimePoint>
364 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
366 lock_guard<mutex> lk(super::mtx_);
367 return try_pull(lk, elem);
370 ///////////////////////////
// Waiting pull that reports closure through the return value, using the
// caller's lock `lk`.
// Returns closed when wait_to_pull reports that the queue was closed, and
// success otherwise.
// NOTE(review): the statement that transfers the element into `elem` before the
// success return is not shown with these lines - confirm.
371 template <class T, class Clock, class TimePoint>
372 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
374 const bool has_been_closed = wait_to_pull(lk);
375 if (has_been_closed) return queue_op_status::closed;
377 return queue_op_status::success;
// Public waiting pull: locks the queue mutex and returns the result of the
// lock-taking wait_pull overload.
380 template <class T, class Clock, class TimePoint>
381 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
383 unique_lock<mutex> lk(super::mtx_);
384 return wait_pull(lk, elem);
387 ///////////////////////////
// Pull attempt that does not wait for the mutex either.
// The mutex is acquired with try_to_lock; if it is not obtained the function
// returns busy without touching the queue. Otherwise it returns the result of
// the lock-taking try_pull overload.
388 template <class T, class Clock, class TimePoint>
389 queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
391 unique_lock<mutex> lk(super::mtx_, try_to_lock);
392 if (! lk.owns_lock()) return queue_op_status::busy;
393 return try_pull(lk, elem);
396 } //end concurrent namespace
398 using concurrent::sync_timed_queue;
400 } //end boost namespace
401 #include <boost/config/abi_suffix.hpp>