]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/thread/concurrent_queues/sync_timed_queue.hpp
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / boost / boost / thread / concurrent_queues / sync_timed_queue.hpp
1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
10
11 #include <boost/thread/detail/config.hpp>
12
13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14 #include <boost/chrono/duration.hpp>
15 #include <boost/chrono/time_point.hpp>
16 #include <boost/chrono/system_clocks.hpp>
17 #include <boost/chrono/chrono_io.hpp>
18
19 #include <boost/config/abi_prefix.hpp>
20
21 namespace boost
22 {
23 namespace concurrent
24 {
25 namespace detail
26 {
27 // fixme: shouldn't the timepoint be configurable
28 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
29 struct scheduled_type
30 {
31 typedef T value_type;
32 typedef Clock clock;
33 typedef TimePoint time_point;
34 T data;
35 time_point time;
36
37 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
38
39 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
40 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
41
42 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
43 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
44 data = other.data;
45 time = other.time;
46 return *this;
47 }
48
49 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
50 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
51 data = boost::move(other.data);
52 time = other.time;
53 return *this;
54 }
55
56 bool operator <(const scheduled_type & other) const
57 {
58 return this->time > other.time;
59 }
60 }; //end struct
61
62 } //end detail namespace
63
64 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
65 class sync_timed_queue
66 : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
67 {
68 typedef detail::scheduled_type<T, Clock, TimePoint> stype;
69 typedef sync_priority_queue<stype> super;
70 public:
71 typedef T value_type;
72 typedef Clock clock;
73 typedef typename clock::duration duration;
74 typedef typename clock::time_point time_point;
75 typedef typename super::underlying_queue_type underlying_queue_type;
76 typedef typename super::size_type size_type;
77 typedef typename super::op_status op_status;
78
79 sync_timed_queue() : super() {};
80 ~sync_timed_queue() {}
81
82 using super::size;
83 using super::empty;
84 using super::full;
85 using super::close;
86 using super::closed;
87
88 T pull();
89 void pull(T& elem);
90
91 template <class WClock, class Duration>
92 queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem);
93 template <class Rep, class Period>
94 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
95
96 queue_op_status try_pull(T& elem);
97 queue_op_status wait_pull(T& elem);
98 queue_op_status nonblocking_pull(T& elem);
99
100 template <class Duration>
101 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
102 template <class Rep, class Period>
103 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
104
105 template <class Duration>
106 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
107 template <class Rep, class Period>
108 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
109
110 template <class Duration>
111 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
112 template <class Rep, class Period>
113 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
114
115 template <class Duration>
116 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
117 template <class Rep, class Period>
118 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
119
120 private:
121 inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
122 inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
123
124 bool wait_to_pull(unique_lock<mutex>&);
125 template <class WClock, class Duration>
126 queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp);
127
128 T pull(unique_lock<mutex>&);
129 T pull(lock_guard<mutex>&);
130
131 void pull(unique_lock<mutex>&, T& elem);
132 void pull(lock_guard<mutex>&, T& elem);
133
134 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
135 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
136
137 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
138
139 sync_timed_queue(const sync_timed_queue&);
140 sync_timed_queue& operator=(const sync_timed_queue&);
141 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
142 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
143 }; //end class
144
145
146 template <class T, class Clock, class TimePoint>
147 template <class Duration>
148 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
149 {
150 super::push(stype(elem,tp));
151 }
152
153 template <class T, class Clock, class TimePoint>
154 template <class Rep, class Period>
155 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
156 {
157 push(elem, clock::now() + dura);
158 }
159
160 template <class T, class Clock, class TimePoint>
161 template <class Duration>
162 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
163 {
164 super::push(stype(boost::move(elem),tp));
165 }
166
167 template <class T, class Clock, class TimePoint>
168 template <class Rep, class Period>
169 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
170 {
171 push(boost::move(elem), clock::now() + dura);
172 }
173
174
175
176 template <class T, class Clock, class TimePoint>
177 template <class Duration>
178 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
179 {
180 return super::try_push(stype(elem,tp));
181 }
182
183 template <class T, class Clock, class TimePoint>
184 template <class Rep, class Period>
185 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
186 {
187 return try_push(elem,clock::now() + dura);
188 }
189
190 template <class T, class Clock, class TimePoint>
191 template <class Duration>
192 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
193 {
194 return super::try_push(stype(boost::move(elem), tp));
195 }
196
197 template <class T, class Clock, class TimePoint>
198 template <class Rep, class Period>
199 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
200 {
201 return try_push(boost::move(elem), clock::now() + dura);
202 }
203
204 ///////////////////////////
205 template <class T, class Clock, class TimePoint>
206 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
207 {
208 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
209 }
210
211 template <class T, class Clock, class TimePoint>
212 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
213 {
214 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
215 }
216
217 ///////////////////////////
218 template <class T, class Clock, class TimePoint>
219 bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
220 {
221 for (;;)
222 {
223 if (not_empty_and_time_reached(lk)) return false; // success
224 if (super::closed(lk)) return true; // closed
225
226 super::wait_until_not_empty_or_closed(lk);
227
228 if (not_empty_and_time_reached(lk)) return false; // success
229 if (super::closed(lk)) return true; // closed
230
231 const time_point tp(super::data_.top().time);
232 super::wait_until_closed_until(lk, tp);
233 }
234 }
235
236 template <class T, class Clock, class TimePoint>
237 template <class WClock, class Duration>
238 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp)
239 {
240 for (;;)
241 {
242 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
243 if (super::closed(lk)) return queue_op_status::closed;
244 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
245
246 super::wait_until_not_empty_or_closed_until(lk, tp);
247
248 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
249 if (super::closed(lk)) return queue_op_status::closed;
250 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
251
252 const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
253 super::wait_until_closed_until(lk, tpmin);
254 }
255 }
256
257 ///////////////////////////
258 template <class T, class Clock, class TimePoint>
259 T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
260 {
261 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
262 return boost::move(super::data_.pull().data);
263 #else
264 return super::data_.pull().data;
265 #endif
266 }
267
268 template <class T, class Clock, class TimePoint>
269 T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
270 {
271 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
272 return boost::move(super::data_.pull().data);
273 #else
274 return super::data_.pull().data;
275 #endif
276 }
277 template <class T, class Clock, class TimePoint>
278 T sync_timed_queue<T, Clock, TimePoint>::pull()
279 {
280 unique_lock<mutex> lk(super::mtx_);
281 const bool has_been_closed = wait_to_pull(lk);
282 if (has_been_closed) super::throw_if_closed(lk);
283 return pull(lk);
284 }
285
286 ///////////////////////////
287 template <class T, class Clock, class TimePoint>
288 void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
289 {
290 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
291 elem = boost::move(super::data_.pull().data);
292 #else
293 elem = super::data_.pull().data;
294 #endif
295 }
296
297 template <class T, class Clock, class TimePoint>
298 void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
299 {
300 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
301 elem = boost::move(super::data_.pull().data);
302 #else
303 elem = super::data_.pull().data;
304 #endif
305 }
306
307 template <class T, class Clock, class TimePoint>
308 void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
309 {
310 unique_lock<mutex> lk(super::mtx_);
311 const bool has_been_closed = wait_to_pull(lk);
312 if (has_been_closed) super::throw_if_closed(lk);
313 pull(lk, elem);
314 }
315
316 //////////////////////
317 template <class T, class Clock, class TimePoint>
318 template <class WClock, class Duration>
319 queue_op_status
320 sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
321 {
322 unique_lock<mutex> lk(super::mtx_);
323 const queue_op_status rc = wait_to_pull_until(lk, tp);
324 if (rc == queue_op_status::success) pull(lk, elem);
325 return rc;
326 }
327
328 //////////////////////
329 template <class T, class Clock, class TimePoint>
330 template <class Rep, class Period>
331 queue_op_status
332 sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
333 {
334 return pull_until(chrono::steady_clock::now() + dura, elem);
335 }
336
337 ///////////////////////////
338 template <class T, class Clock, class TimePoint>
339 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
340 {
341 if (not_empty_and_time_reached(lk))
342 {
343 pull(lk, elem);
344 return queue_op_status::success;
345 }
346 if (super::closed(lk)) return queue_op_status::closed;
347 if (super::empty(lk)) return queue_op_status::empty;
348 return queue_op_status::not_ready;
349 }
350 template <class T, class Clock, class TimePoint>
351 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
352 {
353 if (not_empty_and_time_reached(lk))
354 {
355 pull(lk, elem);
356 return queue_op_status::success;
357 }
358 if (super::closed(lk)) return queue_op_status::closed;
359 if (super::empty(lk)) return queue_op_status::empty;
360 return queue_op_status::not_ready;
361 }
362
363 template <class T, class Clock, class TimePoint>
364 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
365 {
366 lock_guard<mutex> lk(super::mtx_);
367 return try_pull(lk, elem);
368 }
369
370 ///////////////////////////
371 template <class T, class Clock, class TimePoint>
372 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
373 {
374 const bool has_been_closed = wait_to_pull(lk);
375 if (has_been_closed) return queue_op_status::closed;
376 pull(lk, elem);
377 return queue_op_status::success;
378 }
379
380 template <class T, class Clock, class TimePoint>
381 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
382 {
383 unique_lock<mutex> lk(super::mtx_);
384 return wait_pull(lk, elem);
385 }
386
387 ///////////////////////////
388 template <class T, class Clock, class TimePoint>
389 queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
390 {
391 unique_lock<mutex> lk(super::mtx_, try_to_lock);
392 if (! lk.owns_lock()) return queue_op_status::busy;
393 return try_pull(lk, elem);
394 }
395
396 } //end concurrent namespace
397
398 using concurrent::sync_timed_queue;
399
400 } //end boost namespace
401 #include <boost/config/abi_suffix.hpp>
402
403 #endif