use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
-use crate::join;
-use crate::thread_pool::ThreadPool;
-
#[allow(deprecated)]
use crate::Configuration;
-use crate::ThreadPoolBuilder;
+use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
#[test]
#[should_panic(expected = "Hello, world!")]
// do some work on these threads
join_a_lot(22);
- thread_pool.registry.clone()
+ Arc::clone(&thread_pool.registry)
});
assert_eq!(registry.num_threads(), 22);
}
{
        // Once we exit this block, the thread-pool will be dropped.
let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
- registry = thread_pool.registry.clone();
+ registry = Arc::clone(&thread_pool.registry);
// Give time for at least some of the thread pool to fall asleep.
thread::sleep(time::Duration::from_secs(1));
    }

    // Wait until all thread-pool threads have stopped. This must happen
    // after the block above, since the registry only terminates once the
    // pool has been dropped; waiting inside the block would never return.
    registry.wait_until_stopped();
-/// Create a start/exit handler that increments an atomic counter.
+/// Creates a start/exit handler that increments an atomic counter.
fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
let count = Arc::new(AtomicUsize::new(0));
- (count.clone(), move |_| {
+ (Arc::clone(&count), move |_| {
count.fetch_add(1, Ordering::SeqCst);
})
}
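+// For example, a sketch of how such a handler plugs into the builder
+// (`start_handler`/`exit_handler` are rayon's hooks for worker startup/exit):
+//
+//     let (count, handler) = count_handler();
+//     let pool = ThreadPoolBuilder::new()
+//         .num_threads(2)
+//         .start_handler(handler)
+//         .build()
+//         .unwrap();
+//     // once all workers have started, `count` holds one tick per thread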
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
assert_eq!(vec, expected);
}
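+// (For contrast, plain `scope`/`spawn` prioritizes spawned jobs in LIFO
+// order, so the analogous non-FIFO test would expect the reversed sequence.)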
+
+#[test]
+fn nested_scopes() {
+ // Create matching scopes for every thread pool.
+ fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
+ where
+ OP: FnOnce(&[&Scope<'scope>]) + Send,
+ {
+ if let Some((pool, tail)) = pools.split_first() {
+ pool.scope(move |s| {
+ // This move reduces the reference lifetimes by variance to match s,
+ // but the actual scopes are still tied to the invariant 'scope.
+ let mut scopes = scopes;
+ scopes.push(s);
+ nest(tail, scopes, op)
+ })
+ } else {
+ (op)(&scopes)
+ }
+ }
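+    // (Variance note, expanding on the comment above: `&'a Scope<'scope>` is
+    // covariant in the outer `'a`, so the moved Vec can shrink `'a` to fit
+    // each nested call, while the invariant `'scope` stays shared by all.)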
+
+ let pools: Vec<_> = (0..10)
+ .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
+ .collect();
+
+ let counter = AtomicUsize::new(0);
+ nest(&pools, vec![], |scopes| {
+ for &s in scopes {
+ s.spawn(|_| {
+ // Our 'scope lets us borrow the counter in every pool.
+ counter.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
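+    // Every scope joins its spawned jobs before returning, so all increments
+    // are complete by the time `nest` unwinds back to this point.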
+ assert_eq!(counter.into_inner(), pools.len());
+}
+
+#[test]
+fn nested_fifo_scopes() {
+ // Create matching fifo scopes for every thread pool.
+ fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
+ where
+ OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
+ {
+ if let Some((pool, tail)) = pools.split_first() {
+ pool.scope_fifo(move |s| {
+ // This move reduces the reference lifetimes by variance to match s,
+ // but the actual scopes are still tied to the invariant 'scope.
+ let mut scopes = scopes;
+ scopes.push(s);
+ nest(tail, scopes, op)
+ })
+ } else {
+ (op)(&scopes)
+ }
+ }
+
+ let pools: Vec<_> = (0..10)
+ .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
+ .collect();
+
+ let counter = AtomicUsize::new(0);
+ nest(&pools, vec![], |scopes| {
+ for &s in scopes {
+ s.spawn_fifo(|_| {
+ // Our 'scope lets us borrow the counter in every pool.
+ counter.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ assert_eq!(counter.into_inner(), pools.len());
+}
+
+#[test]
+fn in_place_scope_no_deadlock() {
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let (tx, rx) = channel();
+    let rx_ref = &rx;
+ pool.in_place_scope(move |s| {
+ // With regular scopes this closure would never run because this scope op
+ // itself would block the only worker thread.
+ s.spawn(move |_| {
+ tx.send(()).unwrap();
+ });
+ rx_ref.recv().unwrap();
+ });
+}
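+// For contrast, a sketch of the deadlock this test guards against (do not
+// run): with `pool.scope` the scope op itself runs on, and blocks, the only
+// worker thread, so the spawned send could never execute:
+//
+//     pool.scope(move |s| {
+//         s.spawn(move |_| tx.send(()).unwrap());
+//         rx_ref.recv().unwrap(); // blocks the lone worker forever
+//     });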
+
+#[test]
+fn in_place_scope_fifo_no_deadlock() {
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let (tx, rx) = channel();
+    let rx_ref = &rx;
+ pool.in_place_scope_fifo(move |s| {
+ // With regular scopes this closure would never run because this scope op
+ // itself would block the only worker thread.
+ s.spawn_fifo(move |_| {
+ tx.send(()).unwrap();
+ });
+ rx_ref.recv().unwrap();
+ });
+}