]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/condvar.rs
Imported Upstream version 1.0.0~beta.3
[rustc.git] / src / libstd / sync / condvar.rs
CommitLineData
1a4d82fc
JJ
1// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
11use prelude::v1::*;
12
85aaf69f 13use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
9346a6ac 14use sync::{mutex, MutexGuard, PoisonError};
1a4d82fc
JJ
15use sys_common::condvar as sys;
16use sys_common::mutex as sys_mutex;
9346a6ac
AL
17use sys_common::poison::{self, LockResult};
18use sys::time::SteadyTime;
1a4d82fc 19use time::Duration;
1a4d82fc
JJ
20
21/// A Condition Variable
22///
23/// Condition variables represent the ability to block a thread such that it
24/// consumes no CPU time while waiting for an event to occur. Condition
25/// variables are typically associated with a boolean predicate (a condition)
26/// and a mutex. The predicate is always verified inside of the mutex before
27/// determining that thread must block.
28///
29/// Functions in this module will block the current **thread** of execution and
30/// are bindings to system-provided condition variables where possible. Note
31/// that this module places one additional restriction over the system condition
32/// variables: each condvar can be used with precisely one mutex at runtime. Any
33/// attempt to use multiple mutexes on the same condition variable will result
34/// in a runtime panic. If this is not desired, then the unsafe primitives in
35/// `sys` do not have this restriction but may result in undefined behavior.
36///
c34b1796 37/// # Examples
1a4d82fc
JJ
38///
39/// ```
40/// use std::sync::{Arc, Mutex, Condvar};
85aaf69f 41/// use std::thread;
1a4d82fc
JJ
42///
43/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
44/// let pair2 = pair.clone();
45///
46/// // Inside of our lock, spawn a new thread, and then wait for it to start
85aaf69f 47/// thread::spawn(move|| {
1a4d82fc
JJ
48/// let &(ref lock, ref cvar) = &*pair2;
49/// let mut started = lock.lock().unwrap();
50/// *started = true;
51/// cvar.notify_one();
52/// });
53///
54/// // wait for the thread to start up
55/// let &(ref lock, ref cvar) = &*pair;
56/// let mut started = lock.lock().unwrap();
57/// while !*started {
58/// started = cvar.wait(started).unwrap();
59/// }
60/// ```
85aaf69f 61#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
62pub struct Condvar { inner: Box<StaticCondvar> }
63
1a4d82fc
JJ
64/// Statically allocated condition variables.
65///
66/// This structure is identical to `Condvar` except that it is suitable for use
67/// in static initializers for other structures.
68///
c34b1796 69/// # Examples
1a4d82fc
JJ
70///
71/// ```
c34b1796 72/// # #![feature(std_misc)]
1a4d82fc
JJ
73/// use std::sync::{StaticCondvar, CONDVAR_INIT};
74///
75/// static CVAR: StaticCondvar = CONDVAR_INIT;
76/// ```
85aaf69f
SL
77#[unstable(feature = "std_misc",
78 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
79pub struct StaticCondvar {
80 inner: sys::Condvar,
85aaf69f 81 mutex: AtomicUsize,
1a4d82fc
JJ
82}
83
1a4d82fc 84/// Constant initializer for a statically allocated condition variable.
85aaf69f
SL
85#[unstable(feature = "std_misc",
86 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
87pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
88 inner: sys::CONDVAR_INIT,
85aaf69f 89 mutex: ATOMIC_USIZE_INIT,
1a4d82fc
JJ
90};
91
92impl Condvar {
93 /// Creates a new condition variable which is ready to be waited on and
94 /// notified.
85aaf69f 95 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
96 pub fn new() -> Condvar {
97 Condvar {
98 inner: box StaticCondvar {
99 inner: unsafe { sys::Condvar::new() },
85aaf69f 100 mutex: AtomicUsize::new(0),
1a4d82fc
JJ
101 }
102 }
103 }
104
9346a6ac 105 /// Blocks the current thread until this condition variable receives a
1a4d82fc
JJ
106 /// notification.
107 ///
108 /// This function will atomically unlock the mutex specified (represented by
109 /// `mutex_guard`) and block the current thread. This means that any calls
110 /// to `notify_*()` which happen logically after the mutex is unlocked are
111 /// candidates to wake this thread up. When this function call returns, the
112 /// lock specified will have been re-acquired.
113 ///
114 /// Note that this function is susceptible to spurious wakeups. Condition
115 /// variables normally have a boolean predicate associated with them, and
116 /// the predicate must always be checked each time this function returns to
117 /// protect against spurious wakeups.
118 ///
119 /// # Failure
120 ///
121 /// This function will return an error if the mutex being waited on is
122 /// poisoned when this thread re-acquires the lock. For more information,
123 /// see information about poisoning on the Mutex type.
124 ///
125 /// # Panics
126 ///
127 /// This function will `panic!()` if it is used with more than one mutex
128 /// over time. Each condition variable is dynamically bound to exactly one
129 /// mutex to ensure defined behavior across platforms. If this functionality
130 /// is not desired, then unsafe primitives in `sys` are provided.
85aaf69f 131 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
132 pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>)
133 -> LockResult<MutexGuard<'a, T>> {
134 unsafe {
135 let me: &'static Condvar = &*(self as *const _);
136 me.inner.wait(guard)
137 }
138 }
139
9346a6ac 140 /// Waits on this condition variable for a notification, timing out after a
1a4d82fc
JJ
141 /// specified duration.
142 ///
c34b1796
AL
143 /// The semantics of this function are equivalent to `wait()`
144 /// except that the thread will be blocked for roughly no longer
145 /// than `ms` milliseconds. This method should not be used for
146 /// precise timing due to anomalies such as preemption or platform
147 /// differences that may not cause the maximum amount of time
148 /// waited to be precisely `ms`.
1a4d82fc 149 ///
c34b1796
AL
150 /// The returned boolean is `false` only if the timeout is known
151 /// to have elapsed.
1a4d82fc
JJ
152 ///
153 /// Like `wait`, the lock specified will be re-acquired when this function
154 /// returns, regardless of whether the timeout elapsed or not.
c34b1796
AL
155 #[stable(feature = "rust1", since = "1.0.0")]
156 pub fn wait_timeout_ms<'a, T>(&self, guard: MutexGuard<'a, T>, ms: u32)
157 -> LockResult<(MutexGuard<'a, T>, bool)> {
1a4d82fc
JJ
158 unsafe {
159 let me: &'static Condvar = &*(self as *const _);
c34b1796 160 me.inner.wait_timeout_ms(guard, ms)
1a4d82fc
JJ
161 }
162 }
163
9346a6ac 164 /// Waits on this condition variable for a notification, timing out after a
85aaf69f
SL
165 /// specified duration.
166 ///
167 /// The semantics of this function are equivalent to `wait_timeout` except
168 /// that the implementation will repeatedly wait while the duration has not
169 /// passed and the provided function returns `false`.
c34b1796
AL
170 #[unstable(feature = "wait_timeout_with",
171 reason = "unsure if this API is broadly needed or what form it should take")]
85aaf69f
SL
172 pub fn wait_timeout_with<'a, T, F>(&self,
173 guard: MutexGuard<'a, T>,
174 dur: Duration,
175 f: F)
176 -> LockResult<(MutexGuard<'a, T>, bool)>
177 where F: FnMut(LockResult<&mut T>) -> bool {
178 unsafe {
179 let me: &'static Condvar = &*(self as *const _);
180 me.inner.wait_timeout_with(guard, dur, f)
181 }
182 }
183
9346a6ac 184 /// Wakes up one blocked thread on this condvar.
1a4d82fc
JJ
185 ///
186 /// If there is a blocked thread on this condition variable, then it will
187 /// be woken up from its call to `wait` or `wait_timeout`. Calls to
188 /// `notify_one` are not buffered in any way.
189 ///
85aaf69f
SL
190 /// To wake up all threads, see `notify_all()`.
191 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
192 pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
193
9346a6ac 194 /// Wakes up all blocked threads on this condvar.
1a4d82fc
JJ
195 ///
196 /// This method will ensure that any current waiters on the condition
197 /// variable are awoken. Calls to `notify_all()` are not buffered in any
198 /// way.
199 ///
200 /// To wake up only one thread, see `notify_one()`.
85aaf69f 201 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
202 pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } }
203}
204
85aaf69f 205#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
206impl Drop for Condvar {
207 fn drop(&mut self) {
208 unsafe { self.inner.inner.destroy() }
209 }
210}
211
212impl StaticCondvar {
9346a6ac 213 /// Blocks the current thread until this condition variable receives a
1a4d82fc
JJ
214 /// notification.
215 ///
216 /// See `Condvar::wait`.
85aaf69f
SL
217 #[unstable(feature = "std_misc",
218 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
219 pub fn wait<'a, T>(&'static self, guard: MutexGuard<'a, T>)
220 -> LockResult<MutexGuard<'a, T>> {
221 let poisoned = unsafe {
222 let lock = mutex::guard_lock(&guard);
223 self.verify(lock);
224 self.inner.wait(lock);
225 mutex::guard_poison(&guard).get()
226 };
227 if poisoned {
85aaf69f 228 Err(PoisonError::new(guard))
1a4d82fc
JJ
229 } else {
230 Ok(guard)
231 }
232 }
233
9346a6ac 234 /// Waits on this condition variable for a notification, timing out after a
1a4d82fc
JJ
235 /// specified duration.
236 ///
237 /// See `Condvar::wait_timeout`.
85aaf69f
SL
238 #[unstable(feature = "std_misc",
239 reason = "may be merged with Condvar in the future")]
c34b1796
AL
240 pub fn wait_timeout_ms<'a, T>(&'static self, guard: MutexGuard<'a, T>, ms: u32)
241 -> LockResult<(MutexGuard<'a, T>, bool)> {
1a4d82fc
JJ
242 let (poisoned, success) = unsafe {
243 let lock = mutex::guard_lock(&guard);
244 self.verify(lock);
c34b1796 245 let success = self.inner.wait_timeout(lock, Duration::milliseconds(ms as i64));
1a4d82fc
JJ
246 (mutex::guard_poison(&guard).get(), success)
247 };
248 if poisoned {
85aaf69f 249 Err(PoisonError::new((guard, success)))
1a4d82fc
JJ
250 } else {
251 Ok((guard, success))
252 }
253 }
254
9346a6ac 255 /// Waits on this condition variable for a notification, timing out after a
85aaf69f
SL
256 /// specified duration.
257 ///
258 /// The implementation will repeatedly wait while the duration has not
259 /// passed and the function returns `false`.
260 ///
261 /// See `Condvar::wait_timeout_with`.
262 #[unstable(feature = "std_misc",
263 reason = "may be merged with Condvar in the future")]
264 pub fn wait_timeout_with<'a, T, F>(&'static self,
265 guard: MutexGuard<'a, T>,
266 dur: Duration,
267 mut f: F)
268 -> LockResult<(MutexGuard<'a, T>, bool)>
269 where F: FnMut(LockResult<&mut T>) -> bool {
270 // This could be made more efficient by pushing the implementation into sys::condvar
271 let start = SteadyTime::now();
272 let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
273 while !f(guard_result
274 .as_mut()
275 .map(|g| &mut **g)
276 .map_err(|e| PoisonError::new(&mut **e.get_mut()))) {
277 let now = SteadyTime::now();
278 let consumed = &now - &start;
279 let guard = guard_result.unwrap_or_else(|e| e.into_inner());
c34b1796
AL
280 let res = self.wait_timeout_ms(guard, (dur - consumed).num_milliseconds() as u32);
281 let (new_guard_result, no_timeout) = match res {
85aaf69f
SL
282 Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
283 Err(err) => {
284 let (new_guard, no_timeout) = err.into_inner();
285 (Err(PoisonError::new(new_guard)), no_timeout)
286 }
287 };
288 guard_result = new_guard_result;
289 if !no_timeout {
290 let result = f(guard_result
291 .as_mut()
292 .map(|g| &mut **g)
293 .map_err(|e| PoisonError::new(&mut **e.get_mut())));
294 return poison::map_result(guard_result, |g| (g, result));
295 }
296 }
297
298 poison::map_result(guard_result, |g| (g, true))
299 }
300
9346a6ac 301 /// Wakes up one blocked thread on this condvar.
1a4d82fc
JJ
302 ///
303 /// See `Condvar::notify_one`.
85aaf69f
SL
304 #[unstable(feature = "std_misc",
305 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
306 pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } }
307
9346a6ac 308 /// Wakes up all blocked threads on this condvar.
1a4d82fc
JJ
309 ///
310 /// See `Condvar::notify_all`.
85aaf69f
SL
311 #[unstable(feature = "std_misc",
312 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
313 pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } }
314
9346a6ac 315 /// Deallocates all resources associated with this static condvar.
1a4d82fc
JJ
316 ///
317 /// This method is unsafe to call as there is no guarantee that there are no
318 /// active users of the condvar, and this also doesn't prevent any future
319 /// users of the condvar. This method is required to be called to not leak
320 /// memory on all platforms.
85aaf69f
SL
321 #[unstable(feature = "std_misc",
322 reason = "may be merged with Condvar in the future")]
1a4d82fc
JJ
323 pub unsafe fn destroy(&'static self) {
324 self.inner.destroy()
325 }
326
327 fn verify(&self, mutex: &sys_mutex::Mutex) {
c34b1796 328 let addr = mutex as *const _ as usize;
1a4d82fc
JJ
329 match self.mutex.compare_and_swap(0, addr, Ordering::SeqCst) {
330 // If we got out 0, then we have successfully bound the mutex to
331 // this cvar.
332 0 => {}
333
334 // If we get out a value that's the same as `addr`, then someone
335 // already beat us to the punch.
336 n if n == addr => {}
337
338 // Anything else and we're using more than one mutex on this cvar,
339 // which is currently disallowed.
340 _ => panic!("attempted to use a condition variable with two \
341 mutexes"),
342 }
343 }
344}
345
346#[cfg(test)]
347mod tests {
348 use prelude::v1::*;
349
350 use super::{StaticCondvar, CONDVAR_INIT};
351 use sync::mpsc::channel;
352 use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
85aaf69f
SL
353 use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
354 use thread;
1a4d82fc 355 use time::Duration;
c34b1796 356 use u32;
1a4d82fc
JJ
357
358 #[test]
359 fn smoke() {
360 let c = Condvar::new();
361 c.notify_one();
362 c.notify_all();
363 }
364
365 #[test]
366 fn static_smoke() {
367 static C: StaticCondvar = CONDVAR_INIT;
368 C.notify_one();
369 C.notify_all();
370 unsafe { C.destroy(); }
371 }
372
373 #[test]
374 fn notify_one() {
375 static C: StaticCondvar = CONDVAR_INIT;
376 static M: StaticMutex = MUTEX_INIT;
377
378 let g = M.lock().unwrap();
85aaf69f 379 let _t = thread::spawn(move|| {
1a4d82fc
JJ
380 let _g = M.lock().unwrap();
381 C.notify_one();
382 });
383 let g = C.wait(g).unwrap();
384 drop(g);
385 unsafe { C.destroy(); M.destroy(); }
386 }
387
388 #[test]
389 fn notify_all() {
c34b1796 390 const N: usize = 10;
1a4d82fc
JJ
391
392 let data = Arc::new((Mutex::new(0), Condvar::new()));
393 let (tx, rx) = channel();
85aaf69f 394 for _ in 0..N {
1a4d82fc
JJ
395 let data = data.clone();
396 let tx = tx.clone();
85aaf69f 397 thread::spawn(move|| {
1a4d82fc
JJ
398 let &(ref lock, ref cond) = &*data;
399 let mut cnt = lock.lock().unwrap();
400 *cnt += 1;
401 if *cnt == N {
402 tx.send(()).unwrap();
403 }
404 while *cnt != 0 {
405 cnt = cond.wait(cnt).unwrap();
406 }
407 tx.send(()).unwrap();
408 });
409 }
410 drop(tx);
411
412 let &(ref lock, ref cond) = &*data;
413 rx.recv().unwrap();
414 let mut cnt = lock.lock().unwrap();
415 *cnt = 0;
416 cond.notify_all();
417 drop(cnt);
418
85aaf69f 419 for _ in 0..N {
1a4d82fc
JJ
420 rx.recv().unwrap();
421 }
422 }
423
424 #[test]
c34b1796 425 fn wait_timeout_ms() {
1a4d82fc
JJ
426 static C: StaticCondvar = CONDVAR_INIT;
427 static M: StaticMutex = MUTEX_INIT;
428
429 let g = M.lock().unwrap();
c34b1796 430 let (g, _no_timeout) = C.wait_timeout_ms(g, 1).unwrap();
85aaf69f
SL
431 // spurious wakeups mean this isn't necessarily true
432 // assert!(!no_timeout);
433 let _t = thread::spawn(move || {
1a4d82fc
JJ
434 let _g = M.lock().unwrap();
435 C.notify_one();
436 });
c34b1796 437 let (g, no_timeout) = C.wait_timeout_ms(g, u32::MAX).unwrap();
85aaf69f 438 assert!(no_timeout);
1a4d82fc
JJ
439 drop(g);
440 unsafe { C.destroy(); M.destroy(); }
441 }
442
85aaf69f
SL
443 #[test]
444 fn wait_timeout_with() {
445 static C: StaticCondvar = CONDVAR_INIT;
446 static M: StaticMutex = MUTEX_INIT;
447 static S: AtomicUsize = ATOMIC_USIZE_INIT;
448
449 let g = M.lock().unwrap();
450 let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
451 assert!(!success);
452
453 let (tx, rx) = channel();
454 let _t = thread::spawn(move || {
455 rx.recv().unwrap();
456 let g = M.lock().unwrap();
457 S.store(1, Ordering::SeqCst);
458 C.notify_one();
459 drop(g);
460
461 rx.recv().unwrap();
462 let g = M.lock().unwrap();
463 S.store(2, Ordering::SeqCst);
464 C.notify_one();
465 drop(g);
466
467 rx.recv().unwrap();
468 let _g = M.lock().unwrap();
469 S.store(3, Ordering::SeqCst);
470 C.notify_one();
471 });
472
473 let mut state = 0;
474 let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
475 assert_eq!(state, S.load(Ordering::SeqCst));
476 tx.send(()).unwrap();
477 state += 1;
478 match state {
479 1|2 => false,
480 _ => true,
481 }
482 }).unwrap();
483 assert!(success);
484 }
485
1a4d82fc 486 #[test]
c34b1796 487 #[should_panic]
1a4d82fc
JJ
488 fn two_mutexes() {
489 static M1: StaticMutex = MUTEX_INIT;
490 static M2: StaticMutex = MUTEX_INIT;
491 static C: StaticCondvar = CONDVAR_INIT;
492
493 let mut g = M1.lock().unwrap();
85aaf69f 494 let _t = thread::spawn(move|| {
1a4d82fc
JJ
495 let _g = M1.lock().unwrap();
496 C.notify_one();
497 });
498 g = C.wait(g).unwrap();
499 drop(g);
500
501 let _ = C.wait(M2.lock().unwrap()).unwrap();
502 }
503}