// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::mutex::MutexGuard;
use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::{deadlock, util};
use core::{
    fmt, ptr,
    sync::atomic::{AtomicPtr, Ordering},
};
use instant::Instant;
use lock_api::RawMutex as RawMutex_;
use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
use std::time::Duration;

/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns whether the wait was known to have timed out.
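    ///
    /// # Examples
    ///
    /// A minimal sketch (the short timeout is an arbitrary choice): with no
    /// notifier, the wait can only return by timing out.
    ///
    /// ```
    /// use parking_lot::{Condvar, Mutex};
    /// use std::time::Duration;
    ///
    /// let mutex = Mutex::new(());
    /// let condvar = Condvar::new();
    ///
    /// let mut guard = mutex.lock();
    /// // Nobody notifies us, so this wait can only end by timing out.
    /// let result = condvar.wait_for(&mut guard, Duration::from_millis(10));
    /// assert!(result.timed_out());
    /// ```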
    #[inline]
    pub fn timed_out(self) -> bool {
        self.0
    }
}

/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that a thread must block.
///
/// Note that this module places one additional restriction over the system
/// condition variables: each condvar can be used with only one mutex at a
/// time. Any attempt to use multiple mutexes on the same condition variable
/// simultaneously will result in a runtime panic. However, it is possible to
/// switch to a different mutex if there are no threads currently waiting on
/// the condition variable.
47///
48/// # Differences from the standard library `Condvar`
49///
50/// - No spurious wakeups: A wait will only return a non-timeout result if it
51/// was woken up by `notify_one` or `notify_all`.
52/// - `Condvar::notify_all` will only wake up a single thread, the rest are
53/// requeued to wait for the `Mutex` to be unlocked by the thread that was
54/// woken up.
55/// - Only requires 1 word of space, whereas the standard library boxes the
56/// `Condvar` due to platform limitations.
57/// - Can be statically constructed (requires the `const_fn` nightly feature).
58/// - Does not require any drop glue when dropped.
59/// - Inline fast path for the uncontended case.
60///
61/// # Examples
62///
63/// ```
64/// use parking_lot::{Mutex, Condvar};
65/// use std::sync::Arc;
66/// use std::thread;
67///
68/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
69/// let pair2 = pair.clone();
70///
71/// // Inside of our lock, spawn a new thread, and then wait for it to start
72/// thread::spawn(move|| {
73/// let &(ref lock, ref cvar) = &*pair2;
74/// let mut started = lock.lock();
75/// *started = true;
76/// cvar.notify_one();
77/// });
78///
79/// // wait for the thread to start up
80/// let &(ref lock, ref cvar) = &*pair;
81/// let mut started = lock.lock();
82/// if !*started {
83/// cvar.wait(&mut started);
84/// }
85/// // Note that we used an if instead of a while loop above. This is only
86/// // possible because parking_lot's Condvar will never spuriously wake up.
87/// // This means that wait() will only return after notify_one or notify_all is
88/// // called.
89/// ```
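///
/// Since `new` is declared as a `const fn` in this version, a `Condvar` can
/// also live in a `static` (a minimal sketch):
///
/// ```
/// use parking_lot::Condvar;
///
/// static STARTED: Condvar = Condvar::new();
/// ```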
pub struct Condvar {
    state: AtomicPtr<RawMutex>,
}

impl Condvar {
    /// Creates a new condition variable which is ready to be waited on and
    /// notified.
    #[inline]
    pub const fn new() -> Condvar {
        Condvar {
            state: AtomicPtr::new(ptr::null_mut()),
        }
    }

    /// Wakes up one blocked thread on this condvar.
    ///
    /// Returns whether a thread was woken up.
    ///
    /// If there is a blocked thread on this condition variable, then it will
    /// be woken up from its call to `wait` or `wait_timeout`. Calls to
    /// `notify_one` are not buffered in any way.
    ///
    /// To wake up all threads, see `notify_all()`.
    ///
    /// # Examples
    ///
    /// ```
    /// use parking_lot::Condvar;
    ///
    /// let condvar = Condvar::new();
    ///
    /// // do something with condvar, share it with other threads
    ///
    /// if !condvar.notify_one() {
    ///     println!("Nobody was listening for this.");
    /// }
    /// ```
    #[inline]
    pub fn notify_one(&self) -> bool {
        // Nothing to do if there are no waiting threads
        let state = self.state.load(Ordering::Relaxed);
        if state.is_null() {
            return false;
        }

        self.notify_one_slow(state)
    }

    #[cold]
    fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
        unsafe {
            // Unpark one thread, or requeue it onto the mutex if the mutex is
            // still locked
            let from = self as *const _ as usize;
            let to = mutex as usize;
            let validate = || {
                // Make sure that our atomic state still points to the same
                // mutex. If not then it means that all threads on the current
                // mutex were woken up and a new waiting thread switched to a
                // different mutex. In that case we can get away with doing
                // nothing.
                if self.state.load(Ordering::Relaxed) != mutex {
                    return RequeueOp::Abort;
                }

                // Unpark one thread if the mutex is unlocked, otherwise just
                // requeue everything to the mutex. This is safe to do here
                // since unlocking the mutex when the parked bit is set requires
                // locking the queue. There is the possibility of a race if the
                // mutex gets locked after we check, but that doesn't matter in
                // this case.
                if (*mutex).mark_parked_if_locked() {
                    RequeueOp::RequeueOne
                } else {
                    RequeueOp::UnparkOne
                }
            };
            let callback = |_op, result: UnparkResult| {
                // Clear our state if there are no more waiting threads
                if !result.have_more_threads {
                    self.state.store(ptr::null_mut(), Ordering::Relaxed);
                }
                TOKEN_NORMAL
            };
            let res = parking_lot_core::unpark_requeue(from, to, validate, callback);

            res.unparked_threads + res.requeued_threads != 0
        }
    }

    /// Wakes up all blocked threads on this condvar.
    ///
    /// Returns the number of threads woken up.
    ///
    /// This method will ensure that any current waiters on the condition
    /// variable are awoken. Calls to `notify_all()` are not buffered in any
    /// way.
    ///
    /// To wake up only one thread, see `notify_one()`.
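    ///
    /// # Examples
    ///
    /// A minimal sketch mirroring the `notify_one` example above:
    ///
    /// ```
    /// use parking_lot::Condvar;
    ///
    /// let condvar = Condvar::new();
    ///
    /// // No threads are waiting yet, so nobody is woken up.
    /// assert_eq!(condvar.notify_all(), 0);
    /// ```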
    #[inline]
    pub fn notify_all(&self) -> usize {
        // Nothing to do if there are no waiting threads
        let state = self.state.load(Ordering::Relaxed);
        if state.is_null() {
            return 0;
        }

        self.notify_all_slow(state)
    }

    #[cold]
    fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
        unsafe {
            // Unpark one thread and requeue the rest onto the mutex
            let from = self as *const _ as usize;
            let to = mutex as usize;
            let validate = || {
                // Make sure that our atomic state still points to the same
                // mutex. If not then it means that all threads on the current
                // mutex were woken up and a new waiting thread switched to a
                // different mutex. In that case we can get away with doing
                // nothing.
                if self.state.load(Ordering::Relaxed) != mutex {
                    return RequeueOp::Abort;
                }

                // Clear our state since we are going to unpark or requeue all
                // threads.
                self.state.store(ptr::null_mut(), Ordering::Relaxed);

                // Unpark one thread if the mutex is unlocked, otherwise just
                // requeue everything to the mutex. This is safe to do here
                // since unlocking the mutex when the parked bit is set requires
                // locking the queue. There is the possibility of a race if the
                // mutex gets locked after we check, but that doesn't matter in
                // this case.
                if (*mutex).mark_parked_if_locked() {
                    RequeueOp::RequeueAll
                } else {
                    RequeueOp::UnparkOneRequeueRest
                }
            };
            let callback = |op, result: UnparkResult| {
                // If we requeued threads to the mutex, mark it as having
                // parked threads. The RequeueAll case is already handled above.
                if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
                    (*mutex).mark_parked();
                }
                TOKEN_NORMAL
            };
            let res = parking_lot_core::unpark_requeue(from, to, validate, callback);

            res.unparked_threads + res.requeued_threads
        }
    }

    /// Blocks the current thread until this condition variable receives a
    /// notification.
    ///
    /// This function will atomically unlock the mutex specified (represented by
    /// `mutex_guard`) and block the current thread. This means that any calls
    /// to `notify_*()` which happen logically after the mutex is unlocked are
    /// candidates to wake this thread up. When this function call returns, the
    /// lock specified will have been re-acquired.
    ///
    /// # Panics
    ///
    /// This function will panic if another thread is waiting on the `Condvar`
    /// with a different `Mutex` object.
    #[inline]
    pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
    }

    /// Waits on this condition variable for a notification, timing out after
    /// the specified time instant.
    ///
    /// The semantics of this function are equivalent to `wait()` except that
    /// the thread will be blocked roughly until `timeout` is reached. This
    /// method should not be used for precise timing due to anomalies such as
    /// preemption or platform differences that may not cause the maximum
    /// amount of time waited to be precisely `timeout`.
    ///
    /// Note that a best effort is made to ensure that the time waited is
    /// measured with a monotonic clock, and not affected by changes made to
    /// the system time.
    ///
    /// The returned `WaitTimeoutResult` value indicates if the timeout is
    /// known to have elapsed.
    ///
    /// Like `wait`, the lock specified will be re-acquired when this function
    /// returns, regardless of whether the timeout elapsed or not.
    ///
    /// # Panics
    ///
    /// This function will panic if another thread is waiting on the `Condvar`
    /// with a different `Mutex` object.
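    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming no other thread ever notifies us. On native
    /// targets the `Instant` used here is the standard-library type (the
    /// `instant` crate re-exports it there).
    ///
    /// ```
    /// use parking_lot::{Condvar, Mutex};
    /// use std::time::{Duration, Instant};
    ///
    /// let mutex = Mutex::new(());
    /// let condvar = Condvar::new();
    ///
    /// let mut guard = mutex.lock();
    /// let deadline = Instant::now() + Duration::from_millis(10);
    /// // With no notifier, the wait ends only once the deadline passes.
    /// let result = condvar.wait_until(&mut guard, deadline);
    /// assert!(result.timed_out());
    /// ```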
    #[inline]
    pub fn wait_until<T: ?Sized>(
        &self,
        mutex_guard: &mut MutexGuard<'_, T>,
        timeout: Instant,
    ) -> WaitTimeoutResult {
        self.wait_until_internal(
            unsafe { MutexGuard::mutex(mutex_guard).raw() },
            Some(timeout),
        )
    }

    // This is a non-generic function to reduce the monomorphization cost of
    // using `wait_until`.
    fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
        unsafe {
            let result;
            let mut bad_mutex = false;
            let mut requeued = false;
            {
                let addr = self as *const _ as usize;
                let lock_addr = mutex as *const _ as *mut _;
                let validate = || {
                    // Ensure we don't use two different mutexes with the same
                    // Condvar at the same time. This is done while locked to
                    // avoid races with notify_one
                    let state = self.state.load(Ordering::Relaxed);
                    if state.is_null() {
                        self.state.store(lock_addr, Ordering::Relaxed);
                    } else if state != lock_addr {
                        bad_mutex = true;
                        return false;
                    }
                    true
                };
                let before_sleep = || {
                    // Unlock the mutex before sleeping...
                    mutex.unlock();
                };
                let timed_out = |k, was_last_thread| {
                    // If we were requeued to a mutex, then we did not time out.
                    // We'll just park ourselves on the mutex again when we try
                    // to lock it later.
                    requeued = k != addr;

                    // If we were the last thread on the queue then we need to
                    // clear our state. This is normally done by the
                    // notify_{one,all} functions when not timing out.
                    if !requeued && was_last_thread {
                        self.state.store(ptr::null_mut(), Ordering::Relaxed);
                    }
                };
                result = parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    timeout,
                );
            }

            // Panic if we tried to use multiple mutexes with a Condvar. Note
            // that at this point the MutexGuard is still locked. It will be
            // unlocked by the unwinding logic.
            if bad_mutex {
                panic!("attempted to use a condition variable with more than one mutex");
            }

            // ... and re-lock it once we are done sleeping
            if result == ParkResult::Unparked(TOKEN_HANDOFF) {
                deadlock::acquire_resource(mutex as *const _ as usize);
            } else {
                mutex.lock();
            }

            WaitTimeoutResult(!(result.is_unparked() || requeued))
        }
    }

    /// Waits on this condition variable for a notification, timing out after a
    /// specified duration.
    ///
    /// The semantics of this function are equivalent to `wait()` except that
    /// the thread will be blocked for roughly no longer than `timeout`. This
    /// method should not be used for precise timing due to anomalies such as
    /// preemption or platform differences that may not cause the maximum
    /// amount of time waited to be precisely `timeout`.
    ///
    /// Note that a best effort is made to ensure that the time waited is
    /// measured with a monotonic clock, and not affected by changes made to
    /// the system time.
    ///
    /// The returned `WaitTimeoutResult` value indicates if the timeout is
    /// known to have elapsed.
    ///
    /// Like `wait`, the lock specified will be re-acquired when this function
    /// returns, regardless of whether the timeout elapsed or not.
    ///
    /// # Panics
    ///
    /// Panics if the given `timeout` is so large that it can't be added to the
    /// current time. This panic is not possible if the crate is built with the
    /// `nightly` feature; in that case a too-large `timeout` is equivalent to
    /// just calling `wait`.
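    ///
    /// # Examples
    ///
    /// A minimal sketch with a separate notifier thread, so the wait returns
    /// well before the (generous) timeout elapses:
    ///
    /// ```
    /// use parking_lot::{Condvar, Mutex};
    /// use std::sync::Arc;
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
    /// let pair2 = pair.clone();
    ///
    /// thread::spawn(move || {
    ///     let &(ref lock, ref cvar) = &*pair2;
    ///     *lock.lock() = true;
    ///     cvar.notify_one();
    /// });
    ///
    /// let &(ref lock, ref cvar) = &*pair;
    /// let mut ready = lock.lock();
    /// while !*ready {
    ///     if cvar.wait_for(&mut ready, Duration::from_secs(10)).timed_out() {
    ///         panic!("notifier thread never signalled us");
    ///     }
    /// }
    /// ```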
    #[inline]
    pub fn wait_for<T: ?Sized>(
        &self,
        mutex_guard: &mut MutexGuard<'_, T>,
        timeout: Duration,
    ) -> WaitTimeoutResult {
        let deadline = util::to_deadline(timeout);
        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
    }
}

impl Default for Condvar {
    #[inline]
    fn default() -> Condvar {
        Condvar::new()
    }
}

impl fmt::Debug for Condvar {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Condvar { .. }")
    }
}

#[cfg(test)]
mod tests {
    use crate::{Condvar, Mutex, MutexGuard};
    use instant::Instant;
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn smoke() {
        let c = Condvar::new();
        c.notify_one();
        c.notify_all();
    }

    #[test]
    fn notify_one() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        c.wait(&mut g);
    }

    #[test]
    fn notify_all() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let &(ref lock, ref cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let &(ref lock, ref cond) = &*data;
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        cond.notify_all();
        drop(cnt);

        for _ in 0..N {
            rx.recv().unwrap();
        }
    }

    #[test]
    fn notify_one_return_true() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            assert!(c2.notify_one());
        });
        c.wait(&mut g);
    }

    #[test]
    fn notify_one_return_false() {
        let m = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());

        let _t = thread::spawn(move || {
            let _g = m.lock();
            assert!(!c.notify_one());
        });
    }

    #[test]
    fn notify_all_return() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let &(ref lock, ref cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let &(ref lock, ref cond) = &*data;
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        assert_eq!(cond.notify_all(), N);
        drop(cnt);

        for _ in 0..N {
            rx.recv().unwrap();
        }

        assert_eq!(cond.notify_all(), 0);
    }

    #[test]
    fn wait_for() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        // With a 1ms timeout and no notifier, this wait must time out.
        let timeout_res = c.wait_for(&mut g, Duration::from_millis(1));
        assert!(timeout_res.timed_out());

        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        // Non-nightly panics on too-large timeouts. Nightly treats them as an
        // indefinite wait.
        let very_long_timeout = if cfg!(feature = "nightly") {
            Duration::from_secs(u64::max_value())
        } else {
            Duration::from_millis(u32::max_value() as u64)
        };

        let timeout_res = c.wait_for(&mut g, very_long_timeout);
        assert!(!timeout_res.timed_out());

        drop(g);
    }

    #[test]
    fn wait_until() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        // With a deadline 1ms away and no notifier, this wait must time out.
        let timeout_res = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
        assert!(timeout_res.timed_out());
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        let timeout_res = c.wait_until(
            &mut g,
            Instant::now() + Duration::from_millis(u32::max_value() as u64),
        );
        assert!(!timeout_res.timed_out());
        drop(g);
    }

    #[test]
    #[should_panic]
    fn two_mutexes() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        // Make sure we don't leave the child thread dangling
        struct PanicGuard<'a>(&'a Condvar);
        impl<'a> Drop for PanicGuard<'a> {
            fn drop(&mut self) {
                self.0.notify_one();
            }
        }

        let (tx, rx) = channel();
        let g = m.lock();
        let _t = thread::spawn(move || {
            let mut g = m2.lock();
            tx.send(()).unwrap();
            c2.wait(&mut g);
        });
        drop(g);
        rx.recv().unwrap();
        let _g = m.lock();
        let _guard = PanicGuard(&*c);
        c.wait(&mut m3.lock());
    }

    #[test]
    fn two_mutexes_disjoint() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        c.wait(&mut g);
        drop(g);

        let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
    }

    #[test]
    fn test_debug_condvar() {
        let c = Condvar::new();
        assert_eq!(format!("{:?}", c), "Condvar { .. }");
    }

    #[test]
    fn test_condvar_requeue() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();
        let t = thread::spawn(move || {
            let mut g = m2.lock();
            c2.wait(&mut g);
        });

        let mut g = m.lock();
        while !c.notify_one() {
            // Wait for the thread to get into wait()
            MutexGuard::bump(&mut g);
            // Yield, so the other thread gets a chance to do something.
            // (At least Miri needs this, because it doesn't preempt threads.)
            thread::yield_now();
        }
        // The thread should have been requeued to the mutex, which we wake up now.
        drop(g);
        t.join().unwrap();
    }

    #[test]
    fn test_issue_129() {
        let locks = Arc::new((Mutex::new(()), Condvar::new()));

        let (tx, rx) = channel();
        for _ in 0..4 {
            let locks = locks.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let mut guard = locks.0.lock();
                locks.1.wait(&mut guard);
                locks.1.wait_for(&mut guard, Duration::from_millis(1));
                locks.1.notify_one();
                tx.send(()).unwrap();
            });
        }

        thread::sleep(Duration::from_millis(100));
        locks.1.notify_one();

        for _ in 0..4 {
            assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
        }
    }
}

/// This module contains an integration test that is heavily inspired by
/// WebKit's own integration tests for its `Condvar`.
#[cfg(test)]
mod webkit_queue_test {
    use crate::{Condvar, Mutex, MutexGuard};
    use std::{collections::VecDeque, sync::Arc, thread, time::Duration};

    #[derive(Clone, Copy)]
    enum Timeout {
        Bounded(Duration),
        Forever,
    }

    #[derive(Clone, Copy)]
    enum NotifyStyle {
        One,
        All,
    }

    struct Queue {
        items: VecDeque<usize>,
        should_continue: bool,
    }

    impl Queue {
        fn new() -> Self {
            Self {
                items: VecDeque::new(),
                should_continue: true,
            }
        }
    }

    fn wait<T: ?Sized>(
        condition: &Condvar,
        lock: &mut MutexGuard<'_, T>,
        predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
        timeout: &Timeout,
    ) {
        while !predicate(lock) {
            match timeout {
                Timeout::Forever => condition.wait(lock),
                Timeout::Bounded(bound) => {
                    condition.wait_for(lock, *bound);
                }
            }
        }
    }

    fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
        match style {
            NotifyStyle::One => {
                condition.notify_one();
            }
            NotifyStyle::All => {
                if should_notify {
                    condition.notify_all();
                }
            }
        }
    }

    fn run_queue_test(
        num_producers: usize,
        num_consumers: usize,
        max_queue_size: usize,
        messages_per_producer: usize,
        notify_style: NotifyStyle,
        timeout: Timeout,
        delay: Duration,
    ) {
        let input_queue = Arc::new(Mutex::new(Queue::new()));
        let empty_condition = Arc::new(Condvar::new());
        let full_condition = Arc::new(Condvar::new());

        let output_vec = Arc::new(Mutex::new(vec![]));

        let consumers = (0..num_consumers)
            .map(|_| {
                consumer_thread(
                    input_queue.clone(),
                    empty_condition.clone(),
                    full_condition.clone(),
                    timeout,
                    notify_style,
                    output_vec.clone(),
                    max_queue_size,
                )
            })
            .collect::<Vec<_>>();
        let producers = (0..num_producers)
            .map(|_| {
                producer_thread(
                    messages_per_producer,
                    input_queue.clone(),
                    empty_condition.clone(),
                    full_condition.clone(),
                    timeout,
                    notify_style,
                    max_queue_size,
                )
            })
            .collect::<Vec<_>>();

        thread::sleep(delay);

        for producer in producers.into_iter() {
            producer.join().expect("Producer thread panicked");
        }

        {
            let mut input_queue = input_queue.lock();
            input_queue.should_continue = false;
        }
        empty_condition.notify_all();

        for consumer in consumers.into_iter() {
            consumer.join().expect("Consumer thread panicked");
        }

        let mut output_vec = output_vec.lock();
        assert_eq!(output_vec.len(), num_producers * messages_per_producer);
        output_vec.sort();
        for msg_idx in 0..messages_per_producer {
            for producer_idx in 0..num_producers {
                assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
            }
        }
    }

    fn consumer_thread(
        input_queue: Arc<Mutex<Queue>>,
        empty_condition: Arc<Condvar>,
        full_condition: Arc<Condvar>,
        timeout: Timeout,
        notify_style: NotifyStyle,
        output_queue: Arc<Mutex<Vec<usize>>>,
        max_queue_size: usize,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || loop {
            let (should_notify, result) = {
                let mut queue = input_queue.lock();
                wait(
                    &*empty_condition,
                    &mut queue,
                    |state| -> bool { !state.items.is_empty() || !state.should_continue },
                    &timeout,
                );
                if queue.items.is_empty() && !queue.should_continue {
                    return;
                }
                let should_notify = queue.items.len() == max_queue_size;
                let result = queue.items.pop_front();
                std::mem::drop(queue);
                (should_notify, result)
            };
            notify(notify_style, &*full_condition, should_notify);

            if let Some(result) = result {
                output_queue.lock().push(result);
            }
        })
    }

    fn producer_thread(
        num_messages: usize,
        queue: Arc<Mutex<Queue>>,
        empty_condition: Arc<Condvar>,
        full_condition: Arc<Condvar>,
        timeout: Timeout,
        notify_style: NotifyStyle,
        max_queue_size: usize,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            for message in 0..num_messages {
                let should_notify = {
                    let mut queue = queue.lock();
                    wait(
                        &*full_condition,
                        &mut queue,
                        |state| state.items.len() < max_queue_size,
                        &timeout,
                    );
                    let should_notify = queue.items.is_empty();
                    queue.items.push_back(message);
                    std::mem::drop(queue);
                    should_notify
                };
                notify(notify_style, &*empty_condition, should_notify);
            }
        })
    }

    macro_rules! run_queue_tests {
        ( $( $name:ident(
            num_producers: $num_producers:expr,
            num_consumers: $num_consumers:expr,
            max_queue_size: $max_queue_size:expr,
            messages_per_producer: $messages_per_producer:expr,
            notification_style: $notification_style:expr,
            timeout: $timeout:expr,
            delay_seconds: $delay_seconds:expr);
        )* ) => {
            $(#[test]
            fn $name() {
                let delay = Duration::from_secs($delay_seconds);
                run_queue_test(
                    $num_producers,
                    $num_consumers,
                    $max_queue_size,
                    $messages_per_producer,
                    $notification_style,
                    $timeout,
                    delay,
                );
            })*
        };
    }

    run_queue_tests! {
        sanity_check_queue(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Bounded(Duration::from_secs(1)),
            delay_seconds: 0
        );
        sanity_check_queue_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        new_test_without_timeout_5(
            num_producers: 1,
            num_consumers: 5,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 1
        );
        one_producer_one_consumer_hundred_slots(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 1_000_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_one_slot(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_one_slot(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_all(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_one(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_one_slot(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
    }
}