1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014 Vicente J. Botet Escriba
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
11 #include <boost/thread/detail/config.hpp>
13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14 #include <boost/chrono/duration.hpp>
15 #include <boost/chrono/time_point.hpp>
16 #include <boost/chrono/system_clocks.hpp>
17 #include <boost/chrono/chrono_io.hpp>
19 #include <boost/config/abi_prefix.hpp>
27 template <class T, class Clock = chrono::steady_clock>
32 typedef typename clock::time_point time_point;
36 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
38 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
39 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
41 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
42 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
48 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
49 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
50 data = boost::move(other.data);
55 bool time_not_reached() const
57 return time > clock::now();
60 bool operator <(const scheduled_type<T> other) const
62 return this->time > other.time;
66 } //end detail namespace
68 template <class T, class Clock = chrono::steady_clock>
69 class sync_timed_queue
70 : private sync_priority_queue<detail::scheduled_type<T, Clock> >
72 typedef detail::scheduled_type<T, Clock> stype;
73 typedef sync_priority_queue<stype> super;
77 typedef typename clock::duration duration;
78 typedef typename clock::time_point time_point;
79 typedef typename super::underlying_queue_type underlying_queue_type;
80 typedef typename super::size_type size_type;
81 typedef typename super::op_status op_status;
83 sync_timed_queue() : super() {};
84 ~sync_timed_queue() {}
95 template <class Duration>
96 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
97 template <class Rep, class Period>
98 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
100 queue_op_status try_pull(T& elem);
101 queue_op_status wait_pull(T& elem);
102 queue_op_status nonblocking_pull(T& elem);
104 template <class Duration>
105 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
106 template <class Rep, class Period>
107 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
109 template <class Duration>
110 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
111 template <class Rep, class Period>
112 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
114 template <class Duration>
115 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
116 template <class Rep, class Period>
117 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
119 template <class Duration>
120 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
121 template <class Rep, class Period>
122 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
125 T pull(unique_lock<mutex>&);
126 T pull(lock_guard<mutex>&);
128 void pull(unique_lock<mutex>&, T& elem);
129 void pull(lock_guard<mutex>&, T& elem);
131 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
132 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
134 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
136 bool wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>&);
137 T pull_when_time_reached(unique_lock<mutex>&);
138 template <class Duration>
139 queue_op_status pull_when_time_reached_until(unique_lock<mutex>&, chrono::time_point<clock,Duration> const& tp, T& elem);
140 bool time_not_reached(unique_lock<mutex>&);
141 bool time_not_reached(lock_guard<mutex>&);
142 bool empty_or_time_not_reached(unique_lock<mutex>&);
143 bool empty_or_time_not_reached(lock_guard<mutex>&);
145 sync_timed_queue(const sync_timed_queue&);
146 sync_timed_queue& operator=(const sync_timed_queue&);
147 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
148 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
152 template <class T, class Clock>
153 template <class Duration>
154 void sync_timed_queue<T, Clock>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
156 super::push(stype(elem,tp));
159 template <class T, class Clock>
160 template <class Rep, class Period>
161 void sync_timed_queue<T, Clock>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
163 push(elem, clock::now() + dura);
166 template <class T, class Clock>
167 template <class Duration>
168 void sync_timed_queue<T, Clock>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
170 super::push(stype(boost::move(elem),tp));
173 template <class T, class Clock>
174 template <class Rep, class Period>
175 void sync_timed_queue<T, Clock>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
177 push(boost::move(elem), clock::now() + dura);
182 template <class T, class Clock>
183 template <class Duration>
184 queue_op_status sync_timed_queue<T, Clock>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
186 return super::try_push(stype(elem,tp));
189 template <class T, class Clock>
190 template <class Rep, class Period>
191 queue_op_status sync_timed_queue<T, Clock>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
193 return try_push(elem,clock::now() + dura);
196 template <class T, class Clock>
197 template <class Duration>
198 queue_op_status sync_timed_queue<T, Clock>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
200 return super::try_push(stype(boost::move(elem), tp));
203 template <class T, class Clock>
204 template <class Rep, class Period>
205 queue_op_status sync_timed_queue<T, Clock>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
207 return try_push(boost::move(elem), clock::now() + dura);
210 ///////////////////////////
211 template <class T, class Clock>
212 bool sync_timed_queue<T, Clock>::time_not_reached(unique_lock<mutex>&)
214 return super::data_.top().time_not_reached();
217 template <class T, class Clock>
218 bool sync_timed_queue<T, Clock>::time_not_reached(lock_guard<mutex>&)
220 return super::data_.top().time_not_reached();
223 ///////////////////////////
224 template <class T, class Clock>
225 bool sync_timed_queue<T, Clock>::wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>& lk)
229 if (super::closed(lk)) return true;
230 while (! super::empty(lk)) {
231 if (! time_not_reached(lk)) return false;
232 time_point tp = super::data_.top().time;
233 super::not_empty_.wait_until(lk, tp);
234 if (super::closed(lk)) return true;
236 if (super::closed(lk)) return true;
237 super::not_empty_.wait(lk);
242 ///////////////////////////
243 template <class T, class Clock>
244 T sync_timed_queue<T, Clock>::pull_when_time_reached(unique_lock<mutex>& lk)
246 while (time_not_reached(lk))
248 super::throw_if_closed(lk);
249 time_point tp = super::data_.top().time;
250 super::not_empty_.wait_until(lk,tp);
251 super::wait_until_not_empty(lk);
256 template <class T, class Clock>
257 template <class Duration>
259 sync_timed_queue<T, Clock>::pull_when_time_reached_until(unique_lock<mutex>& lk, chrono::time_point<clock,Duration> const& tp, T& elem)
261 chrono::time_point<clock,Duration> tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time;
262 while (time_not_reached(lk))
264 super::throw_if_closed(lk);
265 if (cv_status::timeout == super::not_empty_.wait_until(lk, tpmin)) {
266 if (time_not_reached(lk)) return queue_op_status::not_ready;
267 return queue_op_status::timeout;
271 return queue_op_status::success;
274 ///////////////////////////
275 template <class T, class Clock>
276 bool sync_timed_queue<T, Clock>::empty_or_time_not_reached(unique_lock<mutex>& lk)
278 if ( super::empty(lk) ) return true;
279 if ( time_not_reached(lk) ) return true;
282 template <class T, class Clock>
283 bool sync_timed_queue<T, Clock>::empty_or_time_not_reached(lock_guard<mutex>& lk)
285 if ( super::empty(lk) ) return true;
286 if ( time_not_reached(lk) ) return true;
290 ///////////////////////////
291 template <class T, class Clock>
292 T sync_timed_queue<T, Clock>::pull(unique_lock<mutex>&)
294 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
295 return boost::move(super::data_.pull().data);
297 return super::data_.pull().data;
301 template <class T, class Clock>
302 T sync_timed_queue<T, Clock>::pull(lock_guard<mutex>&)
304 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
305 return boost::move(super::data_.pull().data);
307 return super::data_.pull().data;
310 template <class T, class Clock>
311 T sync_timed_queue<T, Clock>::pull()
313 unique_lock<mutex> lk(super::mtx_);
314 super::wait_until_not_empty(lk);
315 return pull_when_time_reached(lk);
318 ///////////////////////////
319 template <class T, class Clock>
320 void sync_timed_queue<T, Clock>::pull(unique_lock<mutex>&, T& elem)
322 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
323 elem = boost::move(super::data_.pull().data);
325 elem = super::data_.pull().data;
329 template <class T, class Clock>
330 void sync_timed_queue<T, Clock>::pull(lock_guard<mutex>&, T& elem)
332 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
333 elem = boost::move(super::data_.pull().data);
335 elem = super::data_.pull().data;
339 template <class T, class Clock>
340 void sync_timed_queue<T, Clock>::pull(T& elem)
342 unique_lock<mutex> lk(super::mtx_);
343 super::wait_until_not_empty(lk);
344 elem = pull_when_time_reached(lk);
347 //////////////////////
348 template <class T, class Clock>
349 template <class Duration>
351 sync_timed_queue<T, Clock>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
353 unique_lock<mutex> lk(super::mtx_);
355 if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
356 return queue_op_status::timeout;
357 return pull_when_time_reached_until(lk, tp, elem);
360 //////////////////////
361 template <class T, class Clock>
362 template <class Rep, class Period>
364 sync_timed_queue<T, Clock>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
366 return pull_until(clock::now() + dura, elem);
369 ///////////////////////////
370 template <class T, class Clock>
371 queue_op_status sync_timed_queue<T, Clock>::try_pull(unique_lock<mutex>& lk, T& elem)
373 if ( super::empty(lk) )
375 if (super::closed(lk)) return queue_op_status::closed;
376 return queue_op_status::empty;
378 if ( time_not_reached(lk) )
380 if (super::closed(lk)) return queue_op_status::closed;
381 return queue_op_status::not_ready;
385 return queue_op_status::success;
387 template <class T, class Clock>
388 queue_op_status sync_timed_queue<T, Clock>::try_pull(lock_guard<mutex>& lk, T& elem)
390 if ( super::empty(lk) )
392 if (super::closed(lk)) return queue_op_status::closed;
393 return queue_op_status::empty;
395 if ( time_not_reached(lk) )
397 if (super::closed(lk)) return queue_op_status::closed;
398 return queue_op_status::not_ready;
401 return queue_op_status::success;
404 template <class T, class Clock>
405 queue_op_status sync_timed_queue<T, Clock>::try_pull(T& elem)
407 lock_guard<mutex> lk(super::mtx_);
408 return try_pull(lk, elem);
411 ///////////////////////////
412 template <class T, class Clock>
413 queue_op_status sync_timed_queue<T, Clock>::wait_pull(unique_lock<mutex>& lk, T& elem)
415 if (super::empty(lk))
417 if (super::closed(lk)) return queue_op_status::closed;
419 bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk);
420 if (has_been_closed) return queue_op_status::closed;
422 return queue_op_status::success;
425 template <class T, class Clock>
426 queue_op_status sync_timed_queue<T, Clock>::wait_pull(T& elem)
428 unique_lock<mutex> lk(super::mtx_);
429 return wait_pull(lk, elem);
432 // ///////////////////////////
433 // template <class T, class Clock>
434 // queue_op_status sync_timed_queue<T, Clock>::wait_pull(unique_lock<mutex> &lk, T& elem)
436 // if (super::empty(lk))
438 // if (super::closed(lk)) return queue_op_status::closed;
440 // bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
441 // if (has_been_closed) return queue_op_status::closed;
443 // return queue_op_status::success;
445 // template <class T>
446 // queue_op_status sync_timed_queue<T, Clock>::wait_pull(T& elem)
448 // unique_lock<mutex> lk(super::mtx_);
449 // return wait_pull(lk, elem);
452 ///////////////////////////
453 template <class T, class Clock>
454 queue_op_status sync_timed_queue<T, Clock>::nonblocking_pull(T& elem)
456 unique_lock<mutex> lk(super::mtx_, try_to_lock);
457 if (! lk.owns_lock()) return queue_op_status::busy;
458 return try_pull(lk, elem);
461 } //end concurrent namespace
463 using concurrent::sync_timed_queue;
465 } //end boost namespace
466 #include <boost/config/abi_suffix.hpp>