]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | // Copyright 2014 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
11 | use sync::{Mutex, Condvar}; | |
12 | ||
bd371182 | 13 | /// A barrier enables multiple threads to synchronize the beginning |
1a4d82fc JJ |
14 | /// of some computation. |
15 | /// | |
c34b1796 | 16 | /// ``` |
1a4d82fc | 17 | /// use std::sync::{Arc, Barrier}; |
85aaf69f | 18 | /// use std::thread; |
1a4d82fc JJ |
19 | /// |
20 | /// let barrier = Arc::new(Barrier::new(10)); | |
85aaf69f | 21 | /// for _ in 0..10 { |
1a4d82fc JJ |
22 | /// let c = barrier.clone(); |
23 | /// // The same messages will be printed together. | |
24 | /// // You will NOT see any interleaving. | |
85aaf69f | 25 | /// thread::spawn(move|| { |
1a4d82fc JJ |
26 | /// println!("before wait"); |
27 | /// c.wait(); | |
28 | /// println!("after wait"); | |
29 | /// }); | |
30 | /// } | |
31 | /// ``` | |
85aaf69f | 32 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
33 | pub struct Barrier { |
34 | lock: Mutex<BarrierState>, | |
35 | cvar: Condvar, | |
c34b1796 | 36 | num_threads: usize, |
1a4d82fc JJ |
37 | } |
38 | ||
// Mutable bookkeeping shared by all threads using a barrier. It lives inside
// the barrier's mutex, so it is only touched while that lock is held.
struct BarrierState {
    // How many threads have called `wait` during the current generation.
    count: usize,
    // Changes every time the barrier releases its waiters, which lets a
    // blocked thread tell a genuine release apart from a spurious wakeup.
    generation_id: usize,
}
44 | ||
45 | /// A result returned from wait. | |
46 | /// | |
47 | /// Currently this opaque structure only has one method, `.is_leader()`. Only | |
48 | /// one thread will receive a result that will return `true` from this function. | |
92a42be0 | 49 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
50 | pub struct BarrierWaitResult(bool); |
51 | ||
52 | impl Barrier { | |
9346a6ac | 53 | /// Creates a new barrier that can block a given number of threads. |
1a4d82fc JJ |
54 | /// |
55 | /// A barrier will block `n`-1 threads which call `wait` and then wake up | |
56 | /// all threads at once when the `n`th thread calls `wait`. | |
85aaf69f | 57 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 58 | pub fn new(n: usize) -> Barrier { |
1a4d82fc JJ |
59 | Barrier { |
60 | lock: Mutex::new(BarrierState { | |
61 | count: 0, | |
62 | generation_id: 0, | |
63 | }), | |
64 | cvar: Condvar::new(), | |
65 | num_threads: n, | |
66 | } | |
67 | } | |
68 | ||
9346a6ac | 69 | /// Blocks the current thread until all threads has rendezvoused here. |
1a4d82fc JJ |
70 | /// |
71 | /// Barriers are re-usable after all threads have rendezvoused once, and can | |
72 | /// be used continuously. | |
73 | /// | |
74 | /// A single (arbitrary) thread will receive a `BarrierWaitResult` that | |
75 | /// returns `true` from `is_leader` when returning from this function, and | |
76 | /// all other threads will receive a result that will return `false` from | |
77 | /// `is_leader` | |
85aaf69f | 78 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
79 | pub fn wait(&self) -> BarrierWaitResult { |
80 | let mut lock = self.lock.lock().unwrap(); | |
81 | let local_gen = lock.generation_id; | |
82 | lock.count += 1; | |
83 | if lock.count < self.num_threads { | |
84 | // We need a while loop to guard against spurious wakeups. | |
85 | // http://en.wikipedia.org/wiki/Spurious_wakeup | |
86 | while local_gen == lock.generation_id && | |
87 | lock.count < self.num_threads { | |
88 | lock = self.cvar.wait(lock).unwrap(); | |
89 | } | |
90 | BarrierWaitResult(false) | |
91 | } else { | |
92 | lock.count = 0; | |
93 | lock.generation_id += 1; | |
94 | self.cvar.notify_all(); | |
95 | BarrierWaitResult(true) | |
96 | } | |
97 | } | |
98 | } | |
99 | ||
100 | impl BarrierWaitResult { | |
9346a6ac | 101 | /// Returns whether this thread from `wait` is the "leader thread". |
1a4d82fc JJ |
102 | /// |
103 | /// Only one thread will have `true` returned from their result, all other | |
104 | /// threads will have `false` returned. | |
85aaf69f | 105 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
106 | pub fn is_leader(&self) -> bool { self.0 } |
107 | } | |
108 | ||
#[cfg(test)]
mod tests {
    use prelude::v1::*;

    use sync::{Arc, Barrier};
    use sync::mpsc::{channel, TryRecvError};
    use thread;

    #[test]
    fn test_barrier() {
        const N: usize = 10;

        let barrier = Arc::new(Barrier::new(N));
        let (tx, rx) = channel();

        // Start N - 1 workers; each one reports whether it was elected leader
        // once the barrier lets it through.
        for _ in 0..N - 1 {
            let worker_barrier = barrier.clone();
            let worker_tx = tx.clone();
            thread::spawn(move|| {
                let elected = worker_barrier.wait().is_leader();
                worker_tx.send(elected).unwrap();
            });
        }

        // Only N - 1 threads can have reached the barrier so far, so none of
        // them may have been released and the channel must still be empty.
        let channel_is_empty = match rx.try_recv() {
            Err(TryRecvError::Empty) => true,
            _ => false,
        };
        assert!(channel_is_empty);

        // This thread is the N-th participant and releases the barrier.
        let mut leader_seen = barrier.wait().is_leader();

        // Every worker must now report in, and across all N participants
        // exactly one may claim leadership.
        for _ in 0..N - 1 {
            let elected = rx.recv().unwrap();
            if elected {
                assert!(!leader_seen);
                leader_seen = true;
            }
        }
        assert!(leader_seen);
    }
}