]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | // Copyright 2014 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
11 | use sync::{Mutex, Condvar}; | |
12 | ||
bd371182 | 13 | /// A barrier enables multiple threads to synchronize the beginning |
1a4d82fc JJ |
14 | /// of some computation. |
15 | /// | |
c34b1796 | 16 | /// ``` |
1a4d82fc | 17 | /// use std::sync::{Arc, Barrier}; |
85aaf69f | 18 | /// use std::thread; |
1a4d82fc | 19 | /// |
54a0048b | 20 | /// let mut handles = Vec::with_capacity(10); |
1a4d82fc | 21 | /// let barrier = Arc::new(Barrier::new(10)); |
85aaf69f | 22 | /// for _ in 0..10 { |
1a4d82fc JJ |
23 | /// let c = barrier.clone(); |
24 | /// // The same messages will be printed together. | |
25 | /// // You will NOT see any interleaving. | |
54a0048b | 26 | /// handles.push(thread::spawn(move|| { |
1a4d82fc JJ |
27 | /// println!("before wait"); |
28 | /// c.wait(); | |
29 | /// println!("after wait"); | |
54a0048b SL |
30 | /// })); |
31 | /// } | |
32 | /// // Wait for other threads to finish. | |
33 | /// for handle in handles { | |
34 | /// handle.join().unwrap(); | |
1a4d82fc JJ |
35 | /// } |
36 | /// ``` | |
85aaf69f | 37 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
38 | pub struct Barrier { |
39 | lock: Mutex<BarrierState>, | |
40 | cvar: Condvar, | |
c34b1796 | 41 | num_threads: usize, |
1a4d82fc JJ |
42 | } |
43 | ||
44 | // The inner state of a double barrier | |
45 | struct BarrierState { | |
c34b1796 AL |
46 | count: usize, |
47 | generation_id: usize, | |
1a4d82fc JJ |
48 | } |
49 | ||
50 | /// A result returned from wait. | |
51 | /// | |
52 | /// Currently this opaque structure only has one method, `.is_leader()`. Only | |
53 | /// one thread will receive a result that will return `true` from this function. | |
92a42be0 | 54 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
55 | pub struct BarrierWaitResult(bool); |
56 | ||
57 | impl Barrier { | |
9346a6ac | 58 | /// Creates a new barrier that can block a given number of threads. |
1a4d82fc JJ |
59 | /// |
60 | /// A barrier will block `n`-1 threads which call `wait` and then wake up | |
61 | /// all threads at once when the `n`th thread calls `wait`. | |
85aaf69f | 62 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 63 | pub fn new(n: usize) -> Barrier { |
1a4d82fc JJ |
64 | Barrier { |
65 | lock: Mutex::new(BarrierState { | |
66 | count: 0, | |
67 | generation_id: 0, | |
68 | }), | |
69 | cvar: Condvar::new(), | |
70 | num_threads: n, | |
71 | } | |
72 | } | |
73 | ||
9346a6ac | 74 | /// Blocks the current thread until all threads has rendezvoused here. |
1a4d82fc JJ |
75 | /// |
76 | /// Barriers are re-usable after all threads have rendezvoused once, and can | |
77 | /// be used continuously. | |
78 | /// | |
79 | /// A single (arbitrary) thread will receive a `BarrierWaitResult` that | |
80 | /// returns `true` from `is_leader` when returning from this function, and | |
81 | /// all other threads will receive a result that will return `false` from | |
82 | /// `is_leader` | |
85aaf69f | 83 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
84 | pub fn wait(&self) -> BarrierWaitResult { |
85 | let mut lock = self.lock.lock().unwrap(); | |
86 | let local_gen = lock.generation_id; | |
87 | lock.count += 1; | |
88 | if lock.count < self.num_threads { | |
89 | // We need a while loop to guard against spurious wakeups. | |
90 | // http://en.wikipedia.org/wiki/Spurious_wakeup | |
91 | while local_gen == lock.generation_id && | |
92 | lock.count < self.num_threads { | |
93 | lock = self.cvar.wait(lock).unwrap(); | |
94 | } | |
95 | BarrierWaitResult(false) | |
96 | } else { | |
97 | lock.count = 0; | |
98 | lock.generation_id += 1; | |
99 | self.cvar.notify_all(); | |
100 | BarrierWaitResult(true) | |
101 | } | |
102 | } | |
103 | } | |
104 | ||
105 | impl BarrierWaitResult { | |
9346a6ac | 106 | /// Returns whether this thread from `wait` is the "leader thread". |
1a4d82fc JJ |
107 | /// |
108 | /// Only one thread will have `true` returned from their result, all other | |
109 | /// threads will have `false` returned. | |
85aaf69f | 110 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
111 | pub fn is_leader(&self) -> bool { self.0 } |
112 | } | |
113 | ||
114 | #[cfg(test)] | |
115 | mod tests { | |
116 | use prelude::v1::*; | |
117 | ||
118 | use sync::{Arc, Barrier}; | |
119 | use sync::mpsc::{channel, TryRecvError}; | |
85aaf69f | 120 | use thread; |
1a4d82fc JJ |
121 | |
122 | #[test] | |
123 | fn test_barrier() { | |
c34b1796 | 124 | const N: usize = 10; |
1a4d82fc JJ |
125 | |
126 | let barrier = Arc::new(Barrier::new(N)); | |
127 | let (tx, rx) = channel(); | |
128 | ||
85aaf69f | 129 | for _ in 0..N - 1 { |
1a4d82fc JJ |
130 | let c = barrier.clone(); |
131 | let tx = tx.clone(); | |
85aaf69f | 132 | thread::spawn(move|| { |
1a4d82fc JJ |
133 | tx.send(c.wait().is_leader()).unwrap(); |
134 | }); | |
135 | } | |
136 | ||
bd371182 | 137 | // At this point, all spawned threads should be blocked, |
1a4d82fc JJ |
138 | // so we shouldn't get anything from the port |
139 | assert!(match rx.try_recv() { | |
140 | Err(TryRecvError::Empty) => true, | |
141 | _ => false, | |
142 | }); | |
143 | ||
144 | let mut leader_found = barrier.wait().is_leader(); | |
145 | ||
146 | // Now, the barrier is cleared and we should get data. | |
85aaf69f | 147 | for _ in 0..N - 1 { |
1a4d82fc JJ |
148 | if rx.recv().unwrap() { |
149 | assert!(!leader_found); | |
150 | leader_found = true; | |
151 | } | |
152 | } | |
153 | assert!(leader_found); | |
154 | } | |
155 | } |