]> git.proxmox.com Git - rustc.git/blob - src/vendor/parking_lot/src/condvar.rs
New upstream version 1.27.1+dfsg1
[rustc.git] / src / vendor / parking_lot / src / condvar.rs
1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7
8 use std::sync::atomic::{AtomicPtr, Ordering};
9 use std::time::{Duration, Instant};
10 use std::{ptr, fmt};
11 use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
12 use mutex::{guard_lock, MutexGuard};
13 use raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
14 use deadlock;
15
/// Outcome of a timed wait on a condition variable.
///
/// The wrapped flag records whether the wait ended because the timeout
/// elapsed rather than because the thread was notified.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns `true` if the wait is known to have ended because it timed
    /// out, and `false` otherwise.
    #[inline]
    pub fn timed_out(&self) -> bool {
        let WaitTimeoutResult(elapsed) = *self;
        elapsed
    }
}
28
29 /// A Condition Variable
30 ///
31 /// Condition variables represent the ability to block a thread such that it
32 /// consumes no CPU time while waiting for an event to occur. Condition
33 /// variables are typically associated with a boolean predicate (a condition)
34 /// and a mutex. The predicate is always verified inside of the mutex before
35 /// determining that thread must block.
36 ///
37 /// Note that this module places one additional restriction over the system
38 /// condition variables: each condvar can be used with only one mutex at a
39 /// time. Any attempt to use multiple mutexes on the same condition variable
40 /// simultaneously will result in a runtime panic. However it is possible to
41 /// switch to a different mutex if there are no threads currently waiting on
42 /// the condition variable.
43 ///
44 /// # Differences from the standard library `Condvar`
45 ///
46 /// - No spurious wakeups: A wait will only return a non-timeout result if it
47 /// was woken up by `notify_one` or `notify_all`.
48 /// - `Condvar::notify_all` will only wake up a single thread, the rest are
49 /// requeued to wait for the `Mutex` to be unlocked by the thread that was
50 /// woken up.
51 /// - Only requires 1 word of space, whereas the standard library boxes the
52 /// `Condvar` due to platform limitations.
53 /// - Can be statically constructed (requires the `const_fn` nightly feature).
54 /// - Does not require any drop glue when dropped.
55 /// - Inline fast path for the uncontended case.
56 ///
57 /// # Examples
58 ///
59 /// ```
60 /// use parking_lot::{Mutex, Condvar};
61 /// use std::sync::Arc;
62 /// use std::thread;
63 ///
64 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
65 /// let pair2 = pair.clone();
66 ///
67 /// // Inside of our lock, spawn a new thread, and then wait for it to start
68 /// thread::spawn(move|| {
69 /// let &(ref lock, ref cvar) = &*pair2;
70 /// let mut started = lock.lock();
71 /// *started = true;
72 /// cvar.notify_one();
73 /// });
74 ///
75 /// // wait for the thread to start up
76 /// let &(ref lock, ref cvar) = &*pair;
77 /// let mut started = lock.lock();
78 /// while !*started {
79 /// cvar.wait(&mut started);
80 /// }
81 /// ```
82 pub struct Condvar {
83 state: AtomicPtr<RawMutex>,
84 }
85
86 impl Condvar {
87 /// Creates a new condition variable which is ready to be waited on and
88 /// notified.
89 #[cfg(feature = "nightly")]
90 #[inline]
91 pub const fn new() -> Condvar {
92 Condvar {
93 state: AtomicPtr::new(ptr::null_mut()),
94 }
95 }
96
97 /// Creates a new condition variable which is ready to be waited on and
98 /// notified.
99 #[cfg(not(feature = "nightly"))]
100 #[inline]
101 pub fn new() -> Condvar {
102 Condvar {
103 state: AtomicPtr::new(ptr::null_mut()),
104 }
105 }
106
107 /// Wakes up one blocked thread on this condvar.
108 ///
109 /// If there is a blocked thread on this condition variable, then it will
110 /// be woken up from its call to `wait` or `wait_timeout`. Calls to
111 /// `notify_one` are not buffered in any way.
112 ///
113 /// To wake up all threads, see `notify_all()`.
114 #[inline]
115 pub fn notify_one(&self) {
116 // Nothing to do if there are no waiting threads
117 if self.state.load(Ordering::Relaxed).is_null() {
118 return;
119 }
120
121 self.notify_one_slow();
122 }
123
124 #[cold]
125 #[inline(never)]
126 fn notify_one_slow(&self) {
127 unsafe {
128 // Unpark one thread
129 let addr = self as *const _ as usize;
130 let callback = |result: UnparkResult| {
131 // Clear our state if there are no more waiting threads
132 if !result.have_more_threads {
133 self.state.store(ptr::null_mut(), Ordering::Relaxed);
134 }
135 TOKEN_NORMAL
136 };
137 parking_lot_core::unpark_one(addr, callback);
138 }
139 }
140
141 /// Wakes up all blocked threads on this condvar.
142 ///
143 /// This method will ensure that any current waiters on the condition
144 /// variable are awoken. Calls to `notify_all()` are not buffered in any
145 /// way.
146 ///
147 /// To wake up only one thread, see `notify_one()`.
148 #[inline]
149 pub fn notify_all(&self) {
150 // Nothing to do if there are no waiting threads
151 let state = self.state.load(Ordering::Relaxed);
152 if state.is_null() {
153 return;
154 }
155
156 self.notify_all_slow(state);
157 }
158
159 #[cold]
160 #[inline(never)]
161 fn notify_all_slow(&self, mutex: *mut RawMutex) {
162 unsafe {
163 // Unpark one thread and requeue the rest onto the mutex
164 let from = self as *const _ as usize;
165 let to = mutex as usize;
166 let validate = || {
167 // Make sure that our atomic state still points to the same
168 // mutex. If not then it means that all threads on the current
169 // mutex were woken up and a new waiting thread switched to a
170 // different mutex. In that case we can get away with doing
171 // nothing.
172 if self.state.load(Ordering::Relaxed) != mutex {
173 return RequeueOp::Abort;
174 }
175
176 // Clear our state since we are going to unpark or requeue all
177 // threads.
178 self.state.store(ptr::null_mut(), Ordering::Relaxed);
179
180 // Unpark one thread if the mutex is unlocked, otherwise just
181 // requeue everything to the mutex. This is safe to do here
182 // since unlocking the mutex when the parked bit is set requires
183 // locking the queue. There is the possibility of a race if the
184 // mutex gets locked after we check, but that doesn't matter in
185 // this case.
186 if (*mutex).mark_parked_if_locked() {
187 RequeueOp::RequeueAll
188 } else {
189 RequeueOp::UnparkOneRequeueRest
190 }
191 };
192 let callback = |op, result: UnparkResult| {
193 // If we requeued threads to the mutex, mark it as having
194 // parked threads. The RequeueAll case is already handled above.
195 if op == RequeueOp::UnparkOneRequeueRest && result.have_more_threads {
196 (*mutex).mark_parked();
197 }
198 TOKEN_NORMAL
199 };
200 parking_lot_core::unpark_requeue(from, to, validate, callback);
201 }
202 }
203
204 /// Blocks the current thread until this condition variable receives a
205 /// notification.
206 ///
207 /// This function will atomically unlock the mutex specified (represented by
208 /// `mutex_guard`) and block the current thread. This means that any calls
209 /// to `notify_*()` which happen logically after the mutex is unlocked are
210 /// candidates to wake this thread up. When this function call returns, the
211 /// lock specified will have been re-acquired.
212 ///
213 /// # Panics
214 ///
215 /// This function will panic if another thread is waiting on the `Condvar`
216 /// with a different `Mutex` object.
217 #[inline]
218 pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<T>) {
219 self.wait_until_internal(guard_lock(mutex_guard), None);
220 }
221
222 /// Waits on this condition variable for a notification, timing out after
223 /// the specified time instant.
224 ///
225 /// The semantics of this function are equivalent to `wait()` except that
226 /// the thread will be blocked roughly until `timeout` is reached. This
227 /// method should not be used for precise timing due to anomalies such as
228 /// preemption or platform differences that may not cause the maximum
229 /// amount of time waited to be precisely `timeout`.
230 ///
231 /// Note that the best effort is made to ensure that the time waited is
232 /// measured with a monotonic clock, and not affected by the changes made to
233 /// the system time.
234 ///
235 /// The returned `WaitTimeoutResult` value indicates if the timeout is
236 /// known to have elapsed.
237 ///
238 /// Like `wait`, the lock specified will be re-acquired when this function
239 /// returns, regardless of whether the timeout elapsed or not.
240 ///
241 /// # Panics
242 ///
243 /// This function will panic if another thread is waiting on the `Condvar`
244 /// with a different `Mutex` object.
245 #[inline]
246 pub fn wait_until<T: ?Sized>(
247 &self,
248 mutex_guard: &mut MutexGuard<T>,
249 timeout: Instant,
250 ) -> WaitTimeoutResult {
251 self.wait_until_internal(guard_lock(mutex_guard), Some(timeout))
252 }
253
254 // This is a non-generic function to reduce the monomorphization cost of
255 // using `wait_until`.
256 fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
257 unsafe {
258 let result;
259 let mut bad_mutex = false;
260 let mut requeued = false;
261 {
262 let addr = self as *const _ as usize;
263 let lock_addr = mutex as *const _ as *mut _;
264 let validate = || {
265 // Ensure we don't use two different mutexes with the same
266 // Condvar at the same time. This is done while locked to
267 // avoid races with notify_one
268 let state = self.state.load(Ordering::Relaxed);
269 if state.is_null() {
270 self.state.store(lock_addr, Ordering::Relaxed);
271 } else if state != lock_addr {
272 bad_mutex = true;
273 return false;
274 }
275 true
276 };
277 let before_sleep = || {
278 // Unlock the mutex before sleeping...
279 mutex.unlock(false);
280 };
281 let timed_out = |k, was_last_thread| {
282 // If we were requeued to a mutex, then we did not time out.
283 // We'll just park ourselves on the mutex again when we try
284 // to lock it later.
285 requeued = k != addr;
286
287 // If we were the last thread on the queue then we need to
288 // clear our state. This is normally done by the
289 // notify_{one,all} functions when not timing out.
290 if !requeued && was_last_thread {
291 self.state.store(ptr::null_mut(), Ordering::Relaxed);
292 }
293 };
294 result = parking_lot_core::park(
295 addr,
296 validate,
297 before_sleep,
298 timed_out,
299 DEFAULT_PARK_TOKEN,
300 timeout,
301 );
302 }
303
304 // Panic if we tried to use multiple mutexes with a Condvar. Note
305 // that at this point the MutexGuard is still locked. It will be
306 // unlocked by the unwinding logic.
307 if bad_mutex {
308 panic!("attempted to use a condition variable with more than one mutex");
309 }
310
311 // ... and re-lock it once we are done sleeping
312 if result == ParkResult::Unparked(TOKEN_HANDOFF) {
313 deadlock::acquire_resource(mutex as *const _ as usize);
314 } else {
315 mutex.lock();
316 }
317
318 WaitTimeoutResult(!(result.is_unparked() || requeued))
319 }
320 }
321
322 /// Waits on this condition variable for a notification, timing out after a
323 /// specified duration.
324 ///
325 /// The semantics of this function are equivalent to `wait()` except that
326 /// the thread will be blocked for roughly no longer than `timeout`. This
327 /// method should not be used for precise timing due to anomalies such as
328 /// preemption or platform differences that may not cause the maximum
329 /// amount of time waited to be precisely `timeout`.
330 ///
331 /// Note that the best effort is made to ensure that the time waited is
332 /// measured with a monotonic clock, and not affected by the changes made to
333 /// the system time.
334 ///
335 /// The returned `WaitTimeoutResult` value indicates if the timeout is
336 /// known to have elapsed.
337 ///
338 /// Like `wait`, the lock specified will be re-acquired when this function
339 /// returns, regardless of whether the timeout elapsed or not.
340 #[inline]
341 pub fn wait_for<T: ?Sized>(
342 &self,
343 guard: &mut MutexGuard<T>,
344 timeout: Duration,
345 ) -> WaitTimeoutResult {
346 self.wait_until(guard, Instant::now() + timeout)
347 }
348 }
349
350 impl Default for Condvar {
351 #[inline]
352 fn default() -> Condvar {
353 Condvar::new()
354 }
355 }
356
357 impl fmt::Debug for Condvar {
358 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
359 f.pad("Condvar { .. }")
360 }
361 }
362
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};
    use {Condvar, Mutex};

    /// Notifying a condvar that nobody waits on must be a harmless no-op.
    #[test]
    fn smoke() {
        let condvar = Condvar::new();
        condvar.notify_one();
        condvar.notify_all();
    }

    /// A single waiter is released by `notify_one` from another thread.
    #[test]
    fn notify_one() {
        let mutex = Arc::new(Mutex::new(()));
        let mutex2 = mutex.clone();
        let condvar = Arc::new(Condvar::new());
        let condvar2 = condvar.clone();

        // The lock is taken before spawning, so the notifier can only
        // acquire it once `wait` has released it.
        let mut guard = mutex.lock();
        let _notifier = thread::spawn(move || {
            let _guard = mutex2.lock();
            condvar2.notify_one();
        });
        condvar.wait(&mut guard);
    }

    /// `notify_all` releases every one of `N` waiting threads.
    #[test]
    fn notify_all() {
        const N: usize = 10;

        let shared = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let shared = shared.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let (ref lock, ref cond) = *shared;
                let mut count = lock.lock();
                *count += 1;
                // The last thread to check in tells the main thread that
                // everybody has arrived.
                if *count == N {
                    tx.send(()).unwrap();
                }
                while *count != 0 {
                    cond.wait(&mut count);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let (ref lock, ref cond) = *shared;
        rx.recv().unwrap();
        let mut count = lock.lock();
        *count = 0;
        cond.notify_all();
        drop(count);

        // One message per worker confirms that each of them woke up.
        for _ in 0..N {
            rx.recv().unwrap();
        }
    }

    /// `wait_for` times out when nobody notifies and reports a
    /// notification otherwise.
    #[test]
    fn wait_for() {
        let mutex = Arc::new(Mutex::new(()));
        let mutex2 = mutex.clone();
        let condvar = Arc::new(Condvar::new());
        let condvar2 = condvar.clone();

        let mut guard = mutex.lock();

        // No notifier exists yet, so the short wait has to expire.
        let expired = condvar.wait_for(&mut guard, Duration::from_millis(1));
        assert!(expired.timed_out());

        let _notifier = thread::spawn(move || {
            let _guard = mutex2.lock();
            condvar2.notify_one();
        });
        let long_wait = Duration::from_millis(u64::from(u32::max_value()));
        let notified = condvar.wait_for(&mut guard, long_wait);
        assert!(!notified.timed_out());
        drop(guard);
    }

    /// `wait_until` times out when nobody notifies and reports a
    /// notification otherwise.
    #[test]
    fn wait_until() {
        let mutex = Arc::new(Mutex::new(()));
        let mutex2 = mutex.clone();
        let condvar = Arc::new(Condvar::new());
        let condvar2 = condvar.clone();

        let mut guard = mutex.lock();

        // No notifier exists yet, so the near deadline has to pass.
        let near_deadline = Instant::now() + Duration::from_millis(1);
        let expired = condvar.wait_until(&mut guard, near_deadline);
        assert!(expired.timed_out());

        let _notifier = thread::spawn(move || {
            let _guard = mutex2.lock();
            condvar2.notify_one();
        });
        let far_deadline = Instant::now() + Duration::from_millis(u64::from(u32::max_value()));
        let notified = condvar.wait_until(&mut guard, far_deadline);
        assert!(!notified.timed_out());
        drop(guard);
    }

    /// Waiting with a second mutex while another thread still waits with
    /// the first one must panic.
    #[test]
    #[should_panic]
    fn two_mutexes() {
        let first = Arc::new(Mutex::new(()));
        let first2 = first.clone();
        let second = Arc::new(Mutex::new(()));
        let condvar = Arc::new(Condvar::new());
        let condvar2 = condvar.clone();

        // Wakes the child thread during unwinding so that it is not left
        // dangling on the condvar.
        struct PanicGuard<'a>(&'a Condvar);
        impl<'a> Drop for PanicGuard<'a> {
            fn drop(&mut self) {
                self.0.notify_one();
            }
        }

        let (tx, rx) = channel();
        let guard = first.lock();
        let _waiter = thread::spawn(move || {
            let mut guard = first2.lock();
            tx.send(()).unwrap();
            condvar2.wait(&mut guard);
        });
        drop(guard);
        rx.recv().unwrap();
        // The child sends while holding the first mutex and only releases
        // it inside `wait`, so acquiring it here happens after that.
        let _first_guard = first.lock();
        let _wake_on_unwind = PanicGuard(&*condvar);
        let _ = condvar.wait(&mut second.lock());
    }

    /// Once all waiters are gone, the condvar may be used with another mutex.
    #[test]
    fn two_mutexes_disjoint() {
        let first = Arc::new(Mutex::new(()));
        let first2 = first.clone();
        let second = Arc::new(Mutex::new(()));
        let condvar = Arc::new(Condvar::new());
        let condvar2 = condvar.clone();

        let mut guard = first.lock();
        let _notifier = thread::spawn(move || {
            let _guard = first2.lock();
            condvar2.notify_one();
        });
        condvar.wait(&mut guard);
        drop(guard);

        // Nobody is waiting any more, so using a different mutex is fine.
        let _ = condvar.wait_for(&mut second.lock(), Duration::from_millis(1));
    }

    /// The `Debug` output is a fixed placeholder string.
    #[test]
    fn test_debug_condvar() {
        let condvar = Condvar::new();
        assert_eq!(format!("{:?}", condvar), "Condvar { .. }");
    }
}