1 // Copyright 2016 Amanieu d'Antras
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
8 use std
::sync
::atomic
::{AtomicPtr, Ordering}
;
9 use std
::time
::{Duration, Instant}
;
11 use parking_lot_core
::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN}
;
12 use mutex
::{guard_lock, MutexGuard}
;
13 use raw_mutex
::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL}
;
16 /// A type indicating whether a timed wait on a condition variable returned
17 /// due to a time out or not.
18 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
19 pub struct WaitTimeoutResult(bool
);
21 impl WaitTimeoutResult
{
22 /// Returns whether the wait was known to have timed out.
24 pub fn timed_out(&self) -> bool
{
29 /// A Condition Variable
31 /// Condition variables represent the ability to block a thread such that it
32 /// consumes no CPU time while waiting for an event to occur. Condition
33 /// variables are typically associated with a boolean predicate (a condition)
34 /// and a mutex. The predicate is always verified inside of the mutex before
35 /// determining that thread must block.
37 /// Note that this module places one additional restriction over the system
38 /// condition variables: each condvar can be used with only one mutex at a
39 /// time. Any attempt to use multiple mutexes on the same condition variable
40 /// simultaneously will result in a runtime panic. However it is possible to
41 /// switch to a different mutex if there are no threads currently waiting on
42 /// the condition variable.
44 /// # Differences from the standard library `Condvar`
46 /// - No spurious wakeups: A wait will only return a non-timeout result if it
47 /// was woken up by `notify_one` or `notify_all`.
48 /// - `Condvar::notify_all` will only wake up a single thread, the rest are
49 /// requeued to wait for the `Mutex` to be unlocked by the thread that was
51 /// - Only requires 1 word of space, whereas the standard library boxes the
52 /// `Condvar` due to platform limitations.
53 /// - Can be statically constructed (requires the `const_fn` nightly feature).
54 /// - Does not require any drop glue when dropped.
55 /// - Inline fast path for the uncontended case.
60 /// use parking_lot::{Mutex, Condvar};
61 /// use std::sync::Arc;
64 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
65 /// let pair2 = pair.clone();
67 /// // Inside of our lock, spawn a new thread, and then wait for it to start
68 /// thread::spawn(move|| {
69 /// let &(ref lock, ref cvar) = &*pair2;
70 /// let mut started = lock.lock();
72 /// cvar.notify_one();
75 /// // wait for the thread to start up
76 /// let &(ref lock, ref cvar) = &*pair;
77 /// let mut started = lock.lock();
79 /// cvar.wait(&mut started);
83 state
: AtomicPtr
<RawMutex
>,
87 /// Creates a new condition variable which is ready to be waited on and
89 #[cfg(feature = "nightly")]
91 pub const fn new() -> Condvar
{
93 state
: AtomicPtr
::new(ptr
::null_mut()),
97 /// Creates a new condition variable which is ready to be waited on and
99 #[cfg(not(feature = "nightly"))]
101 pub fn new() -> Condvar
{
103 state
: AtomicPtr
::new(ptr
::null_mut()),
107 /// Wakes up one blocked thread on this condvar.
109 /// If there is a blocked thread on this condition variable, then it will
110 /// be woken up from its call to `wait` or `wait_timeout`. Calls to
111 /// `notify_one` are not buffered in any way.
113 /// To wake up all threads, see `notify_all()`.
115 pub fn notify_one(&self) {
116 // Nothing to do if there are no waiting threads
117 if self.state
.load(Ordering
::Relaxed
).is_null() {
121 self.notify_one_slow();
126 fn notify_one_slow(&self) {
129 let addr
= self as *const _
as usize;
130 let callback
= |result
: UnparkResult
| {
131 // Clear our state if there are no more waiting threads
132 if !result
.have_more_threads
{
133 self.state
.store(ptr
::null_mut(), Ordering
::Relaxed
);
137 parking_lot_core
::unpark_one(addr
, callback
);
141 /// Wakes up all blocked threads on this condvar.
143 /// This method will ensure that any current waiters on the condition
144 /// variable are awoken. Calls to `notify_all()` are not buffered in any
147 /// To wake up only one thread, see `notify_one()`.
149 pub fn notify_all(&self) {
150 // Nothing to do if there are no waiting threads
151 let state
= self.state
.load(Ordering
::Relaxed
);
156 self.notify_all_slow(state
);
161 fn notify_all_slow(&self, mutex
: *mut RawMutex
) {
163 // Unpark one thread and requeue the rest onto the mutex
164 let from
= self as *const _
as usize;
165 let to
= mutex
as usize;
167 // Make sure that our atomic state still points to the same
168 // mutex. If not then it means that all threads on the current
169 // mutex were woken up and a new waiting thread switched to a
170 // different mutex. In that case we can get away with doing
172 if self.state
.load(Ordering
::Relaxed
) != mutex
{
173 return RequeueOp
::Abort
;
176 // Clear our state since we are going to unpark or requeue all
178 self.state
.store(ptr
::null_mut(), Ordering
::Relaxed
);
180 // Unpark one thread if the mutex is unlocked, otherwise just
181 // requeue everything to the mutex. This is safe to do here
182 // since unlocking the mutex when the parked bit is set requires
183 // locking the queue. There is the possibility of a race if the
184 // mutex gets locked after we check, but that doesn't matter in
186 if (*mutex
).mark_parked_if_locked() {
187 RequeueOp
::RequeueAll
189 RequeueOp
::UnparkOneRequeueRest
192 let callback
= |op
, result
: UnparkResult
| {
193 // If we requeued threads to the mutex, mark it as having
194 // parked threads. The RequeueAll case is already handled above.
195 if op
== RequeueOp
::UnparkOneRequeueRest
&& result
.have_more_threads
{
196 (*mutex
).mark_parked();
200 parking_lot_core
::unpark_requeue(from
, to
, validate
, callback
);
204 /// Blocks the current thread until this condition variable receives a
207 /// This function will atomically unlock the mutex specified (represented by
208 /// `mutex_guard`) and block the current thread. This means that any calls
209 /// to `notify_*()` which happen logically after the mutex is unlocked are
210 /// candidates to wake this thread up. When this function call returns, the
211 /// lock specified will have been re-acquired.
215 /// This function will panic if another thread is waiting on the `Condvar`
216 /// with a different `Mutex` object.
218 pub fn wait
<T
: ?Sized
>(&self, mutex_guard
: &mut MutexGuard
<T
>) {
219 self.wait_until_internal(guard_lock(mutex_guard
), None
);
222 /// Waits on this condition variable for a notification, timing out after
223 /// the specified time instant.
225 /// The semantics of this function are equivalent to `wait()` except that
226 /// the thread will be blocked roughly until `timeout` is reached. This
227 /// method should not be used for precise timing due to anomalies such as
228 /// preemption or platform differences that may not cause the maximum
229 /// amount of time waited to be precisely `timeout`.
231 /// Note that the best effort is made to ensure that the time waited is
232 /// measured with a monotonic clock, and not affected by the changes made to
235 /// The returned `WaitTimeoutResult` value indicates if the timeout is
236 /// known to have elapsed.
238 /// Like `wait`, the lock specified will be re-acquired when this function
239 /// returns, regardless of whether the timeout elapsed or not.
243 /// This function will panic if another thread is waiting on the `Condvar`
244 /// with a different `Mutex` object.
246 pub fn wait_until
<T
: ?Sized
>(
248 mutex_guard
: &mut MutexGuard
<T
>,
250 ) -> WaitTimeoutResult
{
251 self.wait_until_internal(guard_lock(mutex_guard
), Some(timeout
))
254 // This is a non-generic function to reduce the monomorphization cost of
255 // using `wait_until`.
256 fn wait_until_internal(&self, mutex
: &RawMutex
, timeout
: Option
<Instant
>) -> WaitTimeoutResult
{
259 let mut bad_mutex
= false;
260 let mut requeued
= false;
262 let addr
= self as *const _
as usize;
263 let lock_addr
= mutex
as *const _
as *mut _
;
265 // Ensure we don't use two different mutexes with the same
266 // Condvar at the same time. This is done while locked to
267 // avoid races with notify_one
268 let state
= self.state
.load(Ordering
::Relaxed
);
270 self.state
.store(lock_addr
, Ordering
::Relaxed
);
271 } else if state
!= lock_addr
{
277 let before_sleep
= || {
278 // Unlock the mutex before sleeping...
281 let timed_out
= |k
, was_last_thread
| {
282 // If we were requeued to a mutex, then we did not time out.
283 // We'll just park ourselves on the mutex again when we try
285 requeued
= k
!= addr
;
287 // If we were the last thread on the queue then we need to
288 // clear our state. This is normally done by the
289 // notify_{one,all} functions when not timing out.
290 if !requeued
&& was_last_thread
{
291 self.state
.store(ptr
::null_mut(), Ordering
::Relaxed
);
294 result
= parking_lot_core
::park(
304 // Panic if we tried to use multiple mutexes with a Condvar. Note
305 // that at this point the MutexGuard is still locked. It will be
306 // unlocked by the unwinding logic.
308 panic
!("attempted to use a condition variable with more than one mutex");
311 // ... and re-lock it once we are done sleeping
312 if result
== ParkResult
::Unparked(TOKEN_HANDOFF
) {
313 deadlock
::acquire_resource(mutex
as *const _
as usize);
318 WaitTimeoutResult(!(result
.is_unparked() || requeued
))
322 /// Waits on this condition variable for a notification, timing out after a
323 /// specified duration.
325 /// The semantics of this function are equivalent to `wait()` except that
326 /// the thread will be blocked for roughly no longer than `timeout`. This
327 /// method should not be used for precise timing due to anomalies such as
328 /// preemption or platform differences that may not cause the maximum
329 /// amount of time waited to be precisely `timeout`.
331 /// Note that the best effort is made to ensure that the time waited is
332 /// measured with a monotonic clock, and not affected by the changes made to
335 /// The returned `WaitTimeoutResult` value indicates if the timeout is
336 /// known to have elapsed.
338 /// Like `wait`, the lock specified will be re-acquired when this function
339 /// returns, regardless of whether the timeout elapsed or not.
341 pub fn wait_for
<T
: ?Sized
>(
343 guard
: &mut MutexGuard
<T
>,
345 ) -> WaitTimeoutResult
{
346 self.wait_until(guard
, Instant
::now() + timeout
)
350 impl Default
for Condvar
{
352 fn default() -> Condvar
{
357 impl fmt
::Debug
for Condvar
{
358 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
359 f
.pad("Condvar { .. }")
365 use std
::sync
::mpsc
::channel
;
368 use std
::time
::{Duration, Instant}
;
369 use {Condvar, Mutex}
;
373 let c
= Condvar
::new();
380 let m
= Arc
::new(Mutex
::new(()));
382 let c
= Arc
::new(Condvar
::new());
385 let mut g
= m
.lock();
386 let _t
= thread
::spawn(move || {
397 let data
= Arc
::new((Mutex
::new(0), Condvar
::new()));
398 let (tx
, rx
) = channel();
400 let data
= data
.clone();
402 thread
::spawn(move || {
403 let &(ref lock
, ref cond
) = &*data
;
404 let mut cnt
= lock
.lock();
407 tx
.send(()).unwrap();
412 tx
.send(()).unwrap();
417 let &(ref lock
, ref cond
) = &*data
;
419 let mut cnt
= lock
.lock();
431 let m
= Arc
::new(Mutex
::new(()));
433 let c
= Arc
::new(Condvar
::new());
436 let mut g
= m
.lock();
437 let no_timeout
= c
.wait_for(&mut g
, Duration
::from_millis(1));
438 assert
!(no_timeout
.timed_out());
439 let _t
= thread
::spawn(move || {
443 let timeout_res
= c
.wait_for(&mut g
, Duration
::from_millis(u32::max_value() as u64));
444 assert
!(!timeout_res
.timed_out());
450 let m
= Arc
::new(Mutex
::new(()));
452 let c
= Arc
::new(Condvar
::new());
455 let mut g
= m
.lock();
456 let no_timeout
= c
.wait_until(&mut g
, Instant
::now() + Duration
::from_millis(1));
457 assert
!(no_timeout
.timed_out());
458 let _t
= thread
::spawn(move || {
462 let timeout_res
= c
.wait_until(
464 Instant
::now() + Duration
::from_millis(u32::max_value() as u64),
466 assert
!(!timeout_res
.timed_out());
473 let m
= Arc
::new(Mutex
::new(()));
475 let m3
= Arc
::new(Mutex
::new(()));
476 let c
= Arc
::new(Condvar
::new());
479 // Make sure we don't leave the child thread dangling
480 struct PanicGuard
<'a
>(&'a Condvar
);
481 impl<'a
> Drop
for PanicGuard
<'a
> {
487 let (tx
, rx
) = channel();
489 let _t
= thread
::spawn(move || {
490 let mut g
= m2
.lock();
491 tx
.send(()).unwrap();
497 let _guard
= PanicGuard(&*c
);
498 let _
= c
.wait(&mut m3
.lock());
502 fn two_mutexes_disjoint() {
503 let m
= Arc
::new(Mutex
::new(()));
505 let m3
= Arc
::new(Mutex
::new(()));
506 let c
= Arc
::new(Condvar
::new());
509 let mut g
= m
.lock();
510 let _t
= thread
::spawn(move || {
517 let _
= c
.wait_for(&mut m3
.lock(), Duration
::from_millis(1));
521 fn test_debug_condvar() {
522 let c
= Condvar
::new();
523 assert_eq
!(format
!("{:?}", c
), "Condvar { .. }");