]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/condvar.rs
Imported Upstream version 1.1.0+dfsg1
[rustc.git] / src / libstd / sync / condvar.rs
CommitLineData
1a4d82fc
JJ
1// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
11use prelude::v1::*;
12
85aaf69f 13use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
9346a6ac 14use sync::{mutex, MutexGuard, PoisonError};
1a4d82fc
JJ
15use sys_common::condvar as sys;
16use sys_common::mutex as sys_mutex;
9346a6ac
AL
17use sys_common::poison::{self, LockResult};
18use sys::time::SteadyTime;
1a4d82fc 19use time::Duration;
1a4d82fc
JJ
20
21/// A Condition Variable
22///
23/// Condition variables represent the ability to block a thread such that it
24/// consumes no CPU time while waiting for an event to occur. Condition
25/// variables are typically associated with a boolean predicate (a condition)
26/// and a mutex. The predicate is always verified inside of the mutex before
27/// determining that thread must block.
28///
29/// Functions in this module will block the current **thread** of execution and
30/// are bindings to system-provided condition variables where possible. Note
31/// that this module places one additional restriction over the system condition
32/// variables: each condvar can be used with precisely one mutex at runtime. Any
33/// attempt to use multiple mutexes on the same condition variable will result
34/// in a runtime panic. If this is not desired, then the unsafe primitives in
35/// `sys` do not have this restriction but may result in undefined behavior.
36///
c34b1796 37/// # Examples
1a4d82fc
JJ
38///
39/// ```
40/// use std::sync::{Arc, Mutex, Condvar};
85aaf69f 41/// use std::thread;
1a4d82fc
JJ
42///
43/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
44/// let pair2 = pair.clone();
45///
46/// // Inside of our lock, spawn a new thread, and then wait for it to start
85aaf69f 47/// thread::spawn(move|| {
1a4d82fc
JJ
48/// let &(ref lock, ref cvar) = &*pair2;
49/// let mut started = lock.lock().unwrap();
50/// *started = true;
51/// cvar.notify_one();
52/// });
53///
54/// // wait for the thread to start up
55/// let &(ref lock, ref cvar) = &*pair;
56/// let mut started = lock.lock().unwrap();
57/// while !*started {
58/// started = cvar.wait(started).unwrap();
59/// }
60/// ```
85aaf69f 61#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
62pub struct Condvar { inner: Box<StaticCondvar> }
63
1a4d82fc
JJ
64/// Statically allocated condition variables.
65///
66/// This structure is identical to `Condvar` except that it is suitable for use
67/// in static initializers for other structures.
68///
c34b1796 69/// # Examples
1a4d82fc
JJ
70///
71/// ```
d9579d0f 72/// # #![feature(static_condvar)]
1a4d82fc
JJ
73/// use std::sync::{StaticCondvar, CONDVAR_INIT};
74///
75/// static CVAR: StaticCondvar = CONDVAR_INIT;
76/// ```
d9579d0f 77#[unstable(feature = "static_condvar",
85aaf69f 78 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
79pub struct StaticCondvar {
80 inner: sys::Condvar,
85aaf69f 81 mutex: AtomicUsize,
1a4d82fc
JJ
82}
83
1a4d82fc 84/// Constant initializer for a statically allocated condition variable.
d9579d0f 85#[unstable(feature = "static_condvar",
85aaf69f 86 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
87pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
88 inner: sys::CONDVAR_INIT,
85aaf69f 89 mutex: ATOMIC_USIZE_INIT,
1a4d82fc
JJ
90};
91
92impl Condvar {
93 /// Creates a new condition variable which is ready to be waited on and
94 /// notified.
85aaf69f 95 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
96 pub fn new() -> Condvar {
97 Condvar {
98 inner: box StaticCondvar {
99 inner: unsafe { sys::Condvar::new() },
85aaf69f 100 mutex: AtomicUsize::new(0),
1a4d82fc
JJ
101 }
102 }
103 }
104
9346a6ac 105 /// Blocks the current thread until this condition variable receives a
1a4d82fc
JJ
106 /// notification.
107 ///
108 /// This function will atomically unlock the mutex specified (represented by
109 /// `mutex_guard`) and block the current thread. This means that any calls
110 /// to `notify_*()` which happen logically after the mutex is unlocked are
111 /// candidates to wake this thread up. When this function call returns, the
112 /// lock specified will have been re-acquired.
113 ///
114 /// Note that this function is susceptible to spurious wakeups. Condition
115 /// variables normally have a boolean predicate associated with them, and
116 /// the predicate must always be checked each time this function returns to
117 /// protect against spurious wakeups.
118 ///
119 /// # Failure
120 ///
121 /// This function will return an error if the mutex being waited on is
122 /// poisoned when this thread re-acquires the lock. For more information,
123 /// see information about poisoning on the Mutex type.
124 ///
125 /// # Panics
126 ///
127 /// This function will `panic!()` if it is used with more than one mutex
128 /// over time. Each condition variable is dynamically bound to exactly one
129 /// mutex to ensure defined behavior across platforms. If this functionality
130 /// is not desired, then unsafe primitives in `sys` are provided.
85aaf69f 131 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
132 pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>)
133 -> LockResult<MutexGuard<'a, T>> {
134 unsafe {
135 let me: &'static Condvar = &*(self as *const _);
136 me.inner.wait(guard)
137 }
138 }
139
9346a6ac 140 /// Waits on this condition variable for a notification, timing out after a
1a4d82fc
JJ
141 /// specified duration.
142 ///
c34b1796
AL
143 /// The semantics of this function are equivalent to `wait()`
144 /// except that the thread will be blocked for roughly no longer
145 /// than `ms` milliseconds. This method should not be used for
146 /// precise timing due to anomalies such as preemption or platform
147 /// differences that may not cause the maximum amount of time
148 /// waited to be precisely `ms`.
1a4d82fc 149 ///
c34b1796
AL
150 /// The returned boolean is `false` only if the timeout is known
151 /// to have elapsed.
1a4d82fc
JJ
152 ///
153 /// Like `wait`, the lock specified will be re-acquired when this function
154 /// returns, regardless of whether the timeout elapsed or not.
c34b1796
AL
155 #[stable(feature = "rust1", since = "1.0.0")]
156 pub fn wait_timeout_ms<'a, T>(&self, guard: MutexGuard<'a, T>, ms: u32)
157 -> LockResult<(MutexGuard<'a, T>, bool)> {
1a4d82fc
JJ
158 unsafe {
159 let me: &'static Condvar = &*(self as *const _);
c34b1796 160 me.inner.wait_timeout_ms(guard, ms)
1a4d82fc
JJ
161 }
162 }
163
d9579d0f
AL
164 /// Waits on this condition variable for a notification, timing out after a
165 /// specified duration.
166 ///
167 /// The semantics of this function are equivalent to `wait()` except that
168 /// the thread will be blocked for roughly no longer than `dur`. This
169 /// method should not be used for precise timing due to anomalies such as
170 /// preemption or platform differences that may not cause the maximum
171 /// amount of time waited to be precisely `dur`.
172 ///
173 /// The returned boolean is `false` only if the timeout is known
174 /// to have elapsed.
175 ///
176 /// Like `wait`, the lock specified will be re-acquired when this function
177 /// returns, regardless of whether the timeout elapsed or not.
178 #[unstable(feature = "wait_timeout", reason = "waiting for Duration")]
179 pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>,
180 dur: Duration)
181 -> LockResult<(MutexGuard<'a, T>, bool)> {
182 unsafe {
183 let me: &'static Condvar = &*(self as *const _);
184 me.inner.wait_timeout(guard, dur)
185 }
186 }
187
9346a6ac 188 /// Waits on this condition variable for a notification, timing out after a
85aaf69f
SL
189 /// specified duration.
190 ///
191 /// The semantics of this function are equivalent to `wait_timeout` except
192 /// that the implementation will repeatedly wait while the duration has not
193 /// passed and the provided function returns `false`.
c34b1796
AL
194 #[unstable(feature = "wait_timeout_with",
195 reason = "unsure if this API is broadly needed or what form it should take")]
85aaf69f
SL
196 pub fn wait_timeout_with<'a, T, F>(&self,
197 guard: MutexGuard<'a, T>,
198 dur: Duration,
199 f: F)
200 -> LockResult<(MutexGuard<'a, T>, bool)>
201 where F: FnMut(LockResult<&mut T>) -> bool {
202 unsafe {
203 let me: &'static Condvar = &*(self as *const _);
204 me.inner.wait_timeout_with(guard, dur, f)
205 }
206 }
207
9346a6ac 208 /// Wakes up one blocked thread on this condvar.
1a4d82fc
JJ
209 ///
210 /// If there is a blocked thread on this condition variable, then it will
211 /// be woken up from its call to `wait` or `wait_timeout`. Calls to
212 /// `notify_one` are not buffered in any way.
213 ///
85aaf69f
SL
214 /// To wake up all threads, see `notify_all()`.
215 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
216 pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
217
9346a6ac 218 /// Wakes up all blocked threads on this condvar.
1a4d82fc
JJ
219 ///
220 /// This method will ensure that any current waiters on the condition
221 /// variable are awoken. Calls to `notify_all()` are not buffered in any
222 /// way.
223 ///
224 /// To wake up only one thread, see `notify_one()`.
85aaf69f 225 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
226 pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } }
227}
228
85aaf69f 229#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
230impl Drop for Condvar {
231 fn drop(&mut self) {
232 unsafe { self.inner.inner.destroy() }
233 }
234}
235
236impl StaticCondvar {
9346a6ac 237 /// Blocks the current thread until this condition variable receives a
1a4d82fc
JJ
238 /// notification.
239 ///
240 /// See `Condvar::wait`.
d9579d0f 241 #[unstable(feature = "static_condvar",
85aaf69f 242 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
243 pub fn wait<'a, T>(&'static self, guard: MutexGuard<'a, T>)
244 -> LockResult<MutexGuard<'a, T>> {
245 let poisoned = unsafe {
246 let lock = mutex::guard_lock(&guard);
247 self.verify(lock);
248 self.inner.wait(lock);
249 mutex::guard_poison(&guard).get()
250 };
251 if poisoned {
85aaf69f 252 Err(PoisonError::new(guard))
1a4d82fc
JJ
253 } else {
254 Ok(guard)
255 }
256 }
257
9346a6ac 258 /// Waits on this condition variable for a notification, timing out after a
1a4d82fc
JJ
259 /// specified duration.
260 ///
261 /// See `Condvar::wait_timeout`.
d9579d0f 262 #[unstable(feature = "static_condvar",
85aaf69f 263 reason = "may be merged with Condvar in the future")]
c34b1796
AL
264 pub fn wait_timeout_ms<'a, T>(&'static self, guard: MutexGuard<'a, T>, ms: u32)
265 -> LockResult<(MutexGuard<'a, T>, bool)> {
d9579d0f
AL
266 self.wait_timeout(guard, Duration::from_millis(ms as u64))
267 }
268
269 /// Waits on this condition variable for a notification, timing out after a
270 /// specified duration.
271 ///
272 /// See `Condvar::wait_timeout`.
273 #[unstable(feature = "static_condvar",
274 reason = "may be merged with Condvar in the future")]
275 pub fn wait_timeout<'a, T>(&'static self,
276 guard: MutexGuard<'a, T>,
277 timeout: Duration)
278 -> LockResult<(MutexGuard<'a, T>, bool)> {
1a4d82fc
JJ
279 let (poisoned, success) = unsafe {
280 let lock = mutex::guard_lock(&guard);
281 self.verify(lock);
d9579d0f 282 let success = self.inner.wait_timeout(lock, timeout);
1a4d82fc
JJ
283 (mutex::guard_poison(&guard).get(), success)
284 };
285 if poisoned {
85aaf69f 286 Err(PoisonError::new((guard, success)))
1a4d82fc
JJ
287 } else {
288 Ok((guard, success))
289 }
290 }
291
9346a6ac 292 /// Waits on this condition variable for a notification, timing out after a
85aaf69f
SL
293 /// specified duration.
294 ///
295 /// The implementation will repeatedly wait while the duration has not
296 /// passed and the function returns `false`.
297 ///
298 /// See `Condvar::wait_timeout_with`.
d9579d0f 299 #[unstable(feature = "static_condvar",
85aaf69f
SL
300 reason = "may be merged with Condvar in the future")]
301 pub fn wait_timeout_with<'a, T, F>(&'static self,
302 guard: MutexGuard<'a, T>,
303 dur: Duration,
304 mut f: F)
305 -> LockResult<(MutexGuard<'a, T>, bool)>
306 where F: FnMut(LockResult<&mut T>) -> bool {
d9579d0f
AL
307 // This could be made more efficient by pushing the implementation into
308 // sys::condvar
85aaf69f
SL
309 let start = SteadyTime::now();
310 let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
311 while !f(guard_result
312 .as_mut()
313 .map(|g| &mut **g)
314 .map_err(|e| PoisonError::new(&mut **e.get_mut()))) {
315 let now = SteadyTime::now();
316 let consumed = &now - &start;
317 let guard = guard_result.unwrap_or_else(|e| e.into_inner());
d9579d0f
AL
318 let (new_guard_result, no_timeout) = if consumed > dur {
319 (Ok(guard), false)
320 } else {
321 match self.wait_timeout(guard, dur - consumed) {
322 Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
323 Err(err) => {
324 let (new_guard, no_timeout) = err.into_inner();
325 (Err(PoisonError::new(new_guard)), no_timeout)
326 }
85aaf69f
SL
327 }
328 };
329 guard_result = new_guard_result;
330 if !no_timeout {
331 let result = f(guard_result
332 .as_mut()
333 .map(|g| &mut **g)
334 .map_err(|e| PoisonError::new(&mut **e.get_mut())));
335 return poison::map_result(guard_result, |g| (g, result));
336 }
337 }
338
339 poison::map_result(guard_result, |g| (g, true))
340 }
341
9346a6ac 342 /// Wakes up one blocked thread on this condvar.
1a4d82fc
JJ
343 ///
344 /// See `Condvar::notify_one`.
d9579d0f 345 #[unstable(feature = "static_condvar",
85aaf69f 346 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
347 pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } }
348
9346a6ac 349 /// Wakes up all blocked threads on this condvar.
1a4d82fc
JJ
350 ///
351 /// See `Condvar::notify_all`.
d9579d0f 352 #[unstable(feature = "static_condvar",
85aaf69f 353 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
354 pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } }
355
9346a6ac 356 /// Deallocates all resources associated with this static condvar.
1a4d82fc
JJ
357 ///
358 /// This method is unsafe to call as there is no guarantee that there are no
359 /// active users of the condvar, and this also doesn't prevent any future
360 /// users of the condvar. This method is required to be called to not leak
361 /// memory on all platforms.
d9579d0f 362 #[unstable(feature = "static_condvar",
85aaf69f 363 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
364 pub unsafe fn destroy(&'static self) {
365 self.inner.destroy()
366 }
367
368 fn verify(&self, mutex: &sys_mutex::Mutex) {
c34b1796 369 let addr = mutex as *const _ as usize;
1a4d82fc
JJ
370 match self.mutex.compare_and_swap(0, addr, Ordering::SeqCst) {
371 // If we got out 0, then we have successfully bound the mutex to
372 // this cvar.
373 0 => {}
374
375 // If we get out a value that's the same as `addr`, then someone
376 // already beat us to the punch.
377 n if n == addr => {}
378
379 // Anything else and we're using more than one mutex on this cvar,
380 // which is currently disallowed.
381 _ => panic!("attempted to use a condition variable with two \
382 mutexes"),
383 }
384 }
385}
386
#[cfg(test)]
mod tests {
    use prelude::v1::*;

    use super::{StaticCondvar, CONDVAR_INIT};
    use sync::mpsc::channel;
    use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
    use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
    use thread;
    use time::Duration;
    use u32;

    // Notifying a condvar nobody waits on must be harmless.
    #[test]
    fn smoke() {
        let cvar = Condvar::new();
        cvar.notify_one();
        cvar.notify_all();
    }

    // Same as `smoke`, for the statically initialized flavor.
    #[test]
    fn static_smoke() {
        static CVAR: StaticCondvar = CONDVAR_INIT;
        CVAR.notify_one();
        CVAR.notify_all();
        unsafe { CVAR.destroy(); }
    }

    // A waiter is released by a `notify_one` issued from another thread.
    #[test]
    fn notify_one() {
        static CVAR: StaticCondvar = CONDVAR_INIT;
        static LOCK: StaticMutex = MUTEX_INIT;

        let held = LOCK.lock().unwrap();
        let _notifier = thread::spawn(move|| {
            // Can only proceed once the main thread is parked in `wait`.
            let _held = LOCK.lock().unwrap();
            CVAR.notify_one();
        });
        let held = CVAR.wait(held).unwrap();
        drop(held);
        unsafe { CVAR.destroy(); LOCK.destroy(); }
    }

    // `notify_all` releases every one of N waiting threads.
    #[test]
    fn notify_all() {
        const N: usize = 10;

        let shared = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let shared = shared.clone();
            let tx = tx.clone();
            thread::spawn(move|| {
                let (ref lock, ref cond) = *shared;
                let mut count = lock.lock().unwrap();
                *count += 1;
                // The last thread to check in tells the main thread that
                // everybody has arrived.
                if *count == N {
                    tx.send(()).unwrap();
                }
                while *count != 0 {
                    count = cond.wait(count).unwrap();
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let (ref lock, ref cond) = *shared;
        rx.recv().unwrap();
        let mut count = lock.lock().unwrap();
        *count = 0;
        cond.notify_all();
        drop(count);

        // Every worker reports back after it has been woken.
        for _ in 0..N {
            rx.recv().unwrap();
        }
    }

    #[test]
    fn wait_timeout_ms() {
        static CVAR: StaticCondvar = CONDVAR_INIT;
        static LOCK: StaticMutex = MUTEX_INIT;

        let held = LOCK.lock().unwrap();
        // The timeout flag of this first wait is deliberately not asserted:
        // spurious wakeups mean it is not necessarily `false`.
        let (held, _no_timeout) = CVAR.wait_timeout_ms(held, 1).unwrap();
        let _notifier = thread::spawn(move || {
            let _held = LOCK.lock().unwrap();
            CVAR.notify_one();
        });
        let (held, no_timeout) = CVAR.wait_timeout_ms(held, u32::MAX).unwrap();
        assert!(no_timeout);
        drop(held);
        unsafe { CVAR.destroy(); LOCK.destroy(); }
    }

    #[test]
    fn wait_timeout_with() {
        static CVAR: StaticCondvar = CONDVAR_INIT;
        static LOCK: StaticMutex = MUTEX_INIT;
        static STEP: AtomicUsize = ATOMIC_USIZE_INIT;

        // A predicate that is never satisfied must end in a timeout.
        let held = LOCK.lock().unwrap();
        let never = |_| false;
        let (held, success) = CVAR.wait_timeout_with(held, Duration::new(0, 1000), never)
                                  .unwrap();
        assert!(!success);

        let (tx, rx) = channel();
        let _notifier = thread::spawn(move || {
            // Each time the predicate runs it sends a message; answer by
            // publishing the next step number and notifying the waiter.
            for step in 1..4 {
                rx.recv().unwrap();
                let held = LOCK.lock().unwrap();
                STEP.store(step, Ordering::SeqCst);
                CVAR.notify_one();
                drop(held);
            }
        });

        let mut state = 0;
        let day = 24 * 60 * 60;
        let (_held, success) = CVAR.wait_timeout_with(held, Duration::new(day, 0), |_| {
            assert_eq!(state, STEP.load(Ordering::SeqCst));
            tx.send(()).unwrap();
            state += 1;
            // Keep waiting through steps 1 and 2, then declare success.
            state != 1 && state != 2
        }).unwrap();
        assert!(success);
    }

    // Waiting with a second mutex on an already bound condvar must panic.
    #[test]
    #[should_panic]
    fn two_mutexes() {
        static FIRST: StaticMutex = MUTEX_INIT;
        static SECOND: StaticMutex = MUTEX_INIT;
        static CVAR: StaticCondvar = CONDVAR_INIT;

        let mut held = FIRST.lock().unwrap();
        let _notifier = thread::spawn(move|| {
            let _held = FIRST.lock().unwrap();
            CVAR.notify_one();
        });
        held = CVAR.wait(held).unwrap();
        drop(held);

        let _ = CVAR.wait(SECOND.lock().unwrap()).unwrap();
    }
}