]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | // Copyright 2014 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
11 | use sync::{Mutex, Condvar}; | |
12 | ||
13 | /// A barrier enables multiple tasks to synchronize the beginning | |
14 | /// of some computation. | |
15 | /// | |
c34b1796 | 16 | /// ``` |
1a4d82fc | 17 | /// use std::sync::{Arc, Barrier}; |
85aaf69f | 18 | /// use std::thread; |
1a4d82fc JJ |
19 | /// |
20 | /// let barrier = Arc::new(Barrier::new(10)); | |
85aaf69f | 21 | /// for _ in 0..10 { |
1a4d82fc JJ |
22 | /// let c = barrier.clone(); |
23 | /// // The same messages will be printed together. | |
24 | /// // You will NOT see any interleaving. | |
85aaf69f | 25 | /// thread::spawn(move|| { |
1a4d82fc JJ |
26 | /// println!("before wait"); |
27 | /// c.wait(); | |
28 | /// println!("after wait"); | |
29 | /// }); | |
30 | /// } | |
31 | /// ``` | |
85aaf69f | 32 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
33 | pub struct Barrier { |
34 | lock: Mutex<BarrierState>, | |
35 | cvar: Condvar, | |
c34b1796 | 36 | num_threads: usize, |
1a4d82fc JJ |
37 | } |
38 | ||
// The inner, mutex-protected state of a barrier.
struct BarrierState {
    // How many threads have called `wait` so far in the current round.
    count: usize,
    // Identifies the current round; it changes every time the barrier trips,
    // which is how sleeping threads recognise that they have been released.
    generation_id: usize,
}
44 | ||
/// The value handed back by a call to `wait` on a barrier.
///
/// This is an opaque wrapper whose only method, for now, is `.is_leader()`.
/// Among the threads released together, exactly one gets a result for which
/// that method returns `true`.
pub struct BarrierWaitResult(bool);
50 | ||
51 | impl Barrier { | |
9346a6ac | 52 | /// Creates a new barrier that can block a given number of threads. |
1a4d82fc JJ |
53 | /// |
54 | /// A barrier will block `n`-1 threads which call `wait` and then wake up | |
55 | /// all threads at once when the `n`th thread calls `wait`. | |
85aaf69f | 56 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 57 | pub fn new(n: usize) -> Barrier { |
1a4d82fc JJ |
58 | Barrier { |
59 | lock: Mutex::new(BarrierState { | |
60 | count: 0, | |
61 | generation_id: 0, | |
62 | }), | |
63 | cvar: Condvar::new(), | |
64 | num_threads: n, | |
65 | } | |
66 | } | |
67 | ||
9346a6ac | 68 | /// Blocks the current thread until all threads has rendezvoused here. |
1a4d82fc JJ |
69 | /// |
70 | /// Barriers are re-usable after all threads have rendezvoused once, and can | |
71 | /// be used continuously. | |
72 | /// | |
73 | /// A single (arbitrary) thread will receive a `BarrierWaitResult` that | |
74 | /// returns `true` from `is_leader` when returning from this function, and | |
75 | /// all other threads will receive a result that will return `false` from | |
76 | /// `is_leader` | |
85aaf69f | 77 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
78 | pub fn wait(&self) -> BarrierWaitResult { |
79 | let mut lock = self.lock.lock().unwrap(); | |
80 | let local_gen = lock.generation_id; | |
81 | lock.count += 1; | |
82 | if lock.count < self.num_threads { | |
83 | // We need a while loop to guard against spurious wakeups. | |
84 | // http://en.wikipedia.org/wiki/Spurious_wakeup | |
85 | while local_gen == lock.generation_id && | |
86 | lock.count < self.num_threads { | |
87 | lock = self.cvar.wait(lock).unwrap(); | |
88 | } | |
89 | BarrierWaitResult(false) | |
90 | } else { | |
91 | lock.count = 0; | |
92 | lock.generation_id += 1; | |
93 | self.cvar.notify_all(); | |
94 | BarrierWaitResult(true) | |
95 | } | |
96 | } | |
97 | } | |
98 | ||
99 | impl BarrierWaitResult { | |
9346a6ac | 100 | /// Returns whether this thread from `wait` is the "leader thread". |
1a4d82fc JJ |
101 | /// |
102 | /// Only one thread will have `true` returned from their result, all other | |
103 | /// threads will have `false` returned. | |
85aaf69f | 104 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
105 | pub fn is_leader(&self) -> bool { self.0 } |
106 | } | |
107 | ||
#[cfg(test)]
mod tests {
    use prelude::v1::*;

    use sync::{Arc, Barrier};
    use sync::mpsc::{channel, TryRecvError};
    use thread;

    // Spawns N - 1 waiters, checks that none gets through before the N-th
    // participant (this thread) arrives, and then checks that exactly one of
    // the N participants was reported as the leader.
    #[test]
    fn test_barrier() {
        const N: usize = 10;

        let barrier = Arc::new(Barrier::new(N));
        let (sender, receiver) = channel();

        for _ in 0..N - 1 {
            let waiter = barrier.clone();
            let sender = sender.clone();
            thread::spawn(move || {
                let is_leader = waiter.wait().is_leader();
                sender.send(is_leader).unwrap();
            });
        }

        // Every spawned thread should still be parked on the barrier, so the
        // channel is expected to be empty at this point.
        let nothing_received = match receiver.try_recv() {
            Err(TryRecvError::Empty) => true,
            _ => false,
        };
        assert!(nothing_received);

        // This thread is the N-th participant; its arrival trips the barrier.
        let mut leader_found = barrier.wait().is_leader();

        // The barrier has been cleared, so each spawned thread reports in.
        for _ in 0..N - 1 {
            let reported_leader = receiver.recv().unwrap();
            if reported_leader {
                // A second leader would be a bug.
                assert!(!leader_found);
                leader_found = true;
            }
        }
        assert!(leader_found);
    }
}