]>
Commit | Line | Data |
---|---|---|
94b46f34 XL |
1 | //! Code that decides when workers should go to sleep. See README.md |
2 | //! for an overview. | |
3 | ||
6a06907d XL |
4 | use crate::log::Event::*; |
5 | use crate::registry::Registry; | |
6 | use crate::DeadlockHandler; | |
94b46f34 XL |
7 | use std::sync::atomic::{AtomicUsize, Ordering}; |
8 | use std::sync::{Condvar, Mutex}; | |
9 | use std::thread; | |
10 | use std::usize; | |
11 | ||
/// Book-keeping used for deadlock detection: counts how many workers are
/// active versus blocked in user code. Guarded by the `Sleep::data` mutex.
struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module.
    blocked_threads: usize,
}
24 | ||
impl SleepData {
    /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
    #[inline]
    pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
        // A deadlock is assumed when no worker is active while at least one
        // is blocked in user code: nothing is left to make progress.
        if self.active_threads == 0 && self.blocked_threads > 0 {
            // NOTE(review): this panics if no deadlock handler was installed;
            // presumably callers guarantee a handler is set whenever threads
            // can block — confirm at the registry level.
            (deadlock_handler.as_ref().unwrap())();
        }
    }
}
34 | ||
pub(super) struct Sleep {
    /// Packed wake state: `AWAKE`, the `SLEEPING` bit (bit 0), plus — in
    /// bits 1.. — the index+1 of the worker currently registered as
    /// "sleepy", if any (see `with_sleepy_worker`/`worker_is_sleepy`).
    state: AtomicUsize,
    /// Thread counts used for deadlock detection, behind a mutex that also
    /// serializes going-to-sleep with wake-ups.
    data: Mutex<SleepData>,
    /// Condvar that sleeping workers wait on; notified by `tickle_cold`.
    tickle: Condvar,
}
40 | ||
/// `state` value when nobody is sleepy or asleep.
const AWAKE: usize = 0;
/// Bit 0 of `state`: set while at least one worker is asleep on the condvar.
const SLEEPING: usize = 1;

/// Number of fruitless search rounds before a worker tries to register
/// itself as the "sleepy" worker.
const ROUNDS_UNTIL_SLEEPY: usize = 32;
/// Number of fruitless search rounds before the sleepy worker actually
/// blocks on the condvar.
const ROUNDS_UNTIL_ASLEEP: usize = 64;
46 | ||
47 | impl Sleep { | |
e74abb32 | 48 | pub(super) fn new(worker_count: usize) -> Sleep { |
94b46f34 XL |
49 | Sleep { |
50 | state: AtomicUsize::new(AWAKE), | |
51 | data: Mutex::new(SleepData { | |
52 | worker_count, | |
53 | active_threads: worker_count, | |
54 | blocked_threads: 0, | |
55 | }), | |
56 | tickle: Condvar::new(), | |
57 | } | |
58 | } | |
59 | ||
60 | /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler | |
61 | /// if no other worker thread is active | |
62 | #[inline] | |
63 | pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) { | |
64 | let mut data = self.data.lock().unwrap(); | |
65 | debug_assert!(data.active_threads > 0); | |
66 | debug_assert!(data.blocked_threads < data.worker_count); | |
67 | debug_assert!(data.active_threads > 0); | |
68 | data.active_threads -= 1; | |
69 | data.blocked_threads += 1; | |
70 | ||
71 | data.deadlock_check(deadlock_handler); | |
72 | } | |
73 | ||
74 | /// Mark a previously blocked Rayon worker thread as unblocked | |
75 | #[inline] | |
76 | pub fn mark_unblocked(&self) { | |
77 | let mut data = self.data.lock().unwrap(); | |
78 | debug_assert!(data.active_threads < data.worker_count); | |
79 | debug_assert!(data.blocked_threads > 0); | |
80 | data.active_threads += 1; | |
81 | data.blocked_threads -= 1; | |
82 | } | |
83 | ||
84 | fn anyone_sleeping(&self, state: usize) -> bool { | |
85 | state & SLEEPING != 0 | |
86 | } | |
87 | ||
88 | fn any_worker_is_sleepy(&self, state: usize) -> bool { | |
89 | (state >> 1) != 0 | |
90 | } | |
91 | ||
92 | fn worker_is_sleepy(&self, state: usize, worker_index: usize) -> bool { | |
93 | (state >> 1) == (worker_index + 1) | |
94 | } | |
95 | ||
96 | fn with_sleepy_worker(&self, state: usize, worker_index: usize) -> usize { | |
97 | debug_assert!(state == AWAKE || state == SLEEPING); | |
98 | ((worker_index + 1) << 1) + state | |
99 | } | |
100 | ||
101 | #[inline] | |
e74abb32 | 102 | pub(super) fn work_found(&self, worker_index: usize, yields: usize) -> usize { |
94b46f34 XL |
103 | log!(FoundWork { |
104 | worker: worker_index, | |
105 | yields: yields, | |
106 | }); | |
107 | if yields > ROUNDS_UNTIL_SLEEPY { | |
108 | // FIXME tickling here is a bit extreme; mostly we want to "release the lock" | |
109 | // from us being sleepy, we don't necessarily need to wake others | |
110 | // who are sleeping | |
111 | self.tickle(worker_index); | |
112 | } | |
113 | 0 | |
114 | } | |
115 | ||
116 | #[inline] | |
e74abb32 | 117 | pub(super) fn no_work_found( |
532ac7d7 XL |
118 | &self, |
119 | worker_index: usize, | |
120 | yields: usize, | |
121 | registry: &Registry, | |
122 | ) -> usize { | |
94b46f34 XL |
123 | log!(DidNotFindWork { |
124 | worker: worker_index, | |
125 | yields: yields, | |
126 | }); | |
127 | if yields < ROUNDS_UNTIL_SLEEPY { | |
128 | thread::yield_now(); | |
129 | yields + 1 | |
130 | } else if yields == ROUNDS_UNTIL_SLEEPY { | |
131 | thread::yield_now(); | |
132 | if self.get_sleepy(worker_index) { | |
133 | yields + 1 | |
134 | } else { | |
135 | yields | |
136 | } | |
137 | } else if yields < ROUNDS_UNTIL_ASLEEP { | |
138 | thread::yield_now(); | |
139 | if self.still_sleepy(worker_index) { | |
140 | yields + 1 | |
141 | } else { | |
532ac7d7 XL |
142 | log!(GotInterrupted { |
143 | worker: worker_index | |
144 | }); | |
94b46f34 XL |
145 | 0 |
146 | } | |
147 | } else { | |
148 | debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP); | |
532ac7d7 | 149 | self.sleep(worker_index, registry); |
94b46f34 XL |
150 | 0 |
151 | } | |
152 | } | |
153 | ||
e74abb32 | 154 | pub(super) fn tickle(&self, worker_index: usize) { |
94b46f34 XL |
155 | // As described in README.md, this load must be SeqCst so as to ensure that: |
156 | // - if anyone is sleepy or asleep, we *definitely* see that now (and not eventually); | |
157 | // - if anyone after us becomes sleepy or asleep, they see memory events that | |
158 | // precede the call to `tickle()`, even though we did not do a write. | |
159 | let old_state = self.state.load(Ordering::SeqCst); | |
160 | if old_state != AWAKE { | |
161 | self.tickle_cold(worker_index); | |
162 | } | |
163 | } | |
164 | ||
165 | #[cold] | |
166 | fn tickle_cold(&self, worker_index: usize) { | |
167 | // The `Release` ordering here suffices. The reasoning is that | |
168 | // the atomic's own natural ordering ensure that any attempt | |
169 | // to become sleepy/asleep either will come before/after this | |
170 | // swap. If it comes *after*, then Release is good because we | |
171 | // want it to see the action that generated this tickle. If it | |
172 | // comes *before*, then we will see it here (but not other | |
173 | // memory writes from that thread). If the other worker was | |
174 | // becoming sleepy, the other writes don't matter. If they | |
175 | // were were going to sleep, we will acquire lock and hence | |
176 | // acquire their reads. | |
177 | let old_state = self.state.swap(AWAKE, Ordering::Release); | |
178 | log!(Tickle { | |
179 | worker: worker_index, | |
180 | old_state: old_state, | |
181 | }); | |
182 | if self.anyone_sleeping(old_state) { | |
183 | let mut data = self.data.lock().unwrap(); | |
184 | // Set the active threads to the number of workers, | |
185 | // excluding threads blocked by the user since we won't wake those up | |
186 | data.active_threads = data.worker_count - data.blocked_threads; | |
187 | self.tickle.notify_all(); | |
188 | } | |
189 | } | |
190 | ||
191 | fn get_sleepy(&self, worker_index: usize) -> bool { | |
192 | loop { | |
193 | // Acquire ordering suffices here. If some other worker | |
194 | // was sleepy but no longer is, we will eventually see | |
195 | // that, and until then it doesn't hurt to spin. | |
196 | // Otherwise, we will do a compare-exchange which will | |
197 | // assert a stronger order and acquire any reads etc that | |
198 | // we must see. | |
199 | let state = self.state.load(Ordering::Acquire); | |
200 | log!(GetSleepy { | |
201 | worker: worker_index, | |
202 | state: state, | |
203 | }); | |
204 | if self.any_worker_is_sleepy(state) { | |
205 | // somebody else is already sleepy, so we'll just wait our turn | |
532ac7d7 XL |
206 | debug_assert!( |
207 | !self.worker_is_sleepy(state, worker_index), | |
208 | "worker {} called `is_sleepy()`, \ | |
209 | but they are already sleepy (state={})", | |
210 | worker_index, | |
211 | state | |
212 | ); | |
94b46f34 XL |
213 | return false; |
214 | } else { | |
215 | // make ourselves the sleepy one | |
216 | let new_state = self.with_sleepy_worker(state, worker_index); | |
217 | ||
218 | // This must be SeqCst on success because we want to | |
219 | // ensure: | |
220 | // | |
221 | // - That we observe any writes that preceded | |
222 | // some prior tickle, and that tickle may have only | |
223 | // done a SeqCst load on `self.state`. | |
224 | // - That any subsequent tickle *definitely* sees this store. | |
225 | // | |
226 | // See the section on "Ensuring Sequentially | |
227 | // Consistency" in README.md for more details. | |
228 | // | |
229 | // The failure ordering doesn't matter since we are | |
230 | // about to spin around and do a fresh load. | |
532ac7d7 XL |
231 | if self |
232 | .state | |
94b46f34 | 233 | .compare_exchange(state, new_state, Ordering::SeqCst, Ordering::Relaxed) |
532ac7d7 XL |
234 | .is_ok() |
235 | { | |
94b46f34 XL |
236 | log!(GotSleepy { |
237 | worker: worker_index, | |
238 | old_state: state, | |
239 | new_state: new_state, | |
240 | }); | |
241 | return true; | |
242 | } | |
243 | } | |
244 | } | |
245 | } | |
246 | ||
247 | fn still_sleepy(&self, worker_index: usize) -> bool { | |
248 | let state = self.state.load(Ordering::SeqCst); | |
249 | self.worker_is_sleepy(state, worker_index) | |
250 | } | |
251 | ||
e74abb32 | 252 | fn sleep(&self, worker_index: usize, registry: &Registry) { |
94b46f34 XL |
253 | loop { |
254 | // Acquire here suffices. If we observe that the current worker is still | |
255 | // sleepy, then in fact we know that no writes have occurred, and anyhow | |
256 | // we are going to do a CAS which will synchronize. | |
257 | // | |
258 | // If we observe that the state has changed, it must be | |
259 | // due to a tickle, and then the Acquire means we also see | |
260 | // any events that occured before that. | |
261 | let state = self.state.load(Ordering::Acquire); | |
262 | if self.worker_is_sleepy(state, worker_index) { | |
263 | // It is important that we hold the lock when we do | |
264 | // the CAS. Otherwise, if we were to CAS first, then | |
265 | // the following sequence of events could occur: | |
266 | // | |
267 | // - Thread A (us) sets state to SLEEPING. | |
268 | // - Thread B sets state to AWAKE. | |
269 | // - Thread C sets state to SLEEPY(C). | |
270 | // - Thread C sets state to SLEEPING. | |
271 | // - Thread A reawakens, acquires lock, and goes to sleep. | |
272 | // | |
273 | // Now we missed the wake-up from thread B! But since | |
274 | // we have the lock when we set the state to sleeping, | |
275 | // that cannot happen. Note that the swap `tickle()` | |
276 | // is not part of the lock, though, so let's play that | |
277 | // out: | |
278 | // | |
279 | // # Scenario 1 | |
280 | // | |
281 | // - A loads state and see SLEEPY(A) | |
282 | // - B swaps to AWAKE. | |
283 | // - A locks, fails CAS | |
284 | // | |
285 | // # Scenario 2 | |
286 | // | |
287 | // - A loads state and see SLEEPY(A) | |
288 | // - A locks, performs CAS | |
289 | // - B swaps to AWAKE. | |
290 | // - A waits (releasing lock) | |
291 | // - B locks, notifies | |
292 | // | |
293 | // In general, acquiring the lock inside the loop | |
294 | // seems like it could lead to bad performance, but | |
295 | // actually it should be ok. This is because the only | |
296 | // reason for the `compare_exchange` to fail is if an | |
297 | // awaken comes, in which case the next cycle around | |
298 | // the loop will just return. | |
299 | let mut data = self.data.lock().unwrap(); | |
300 | ||
301 | // This must be SeqCst on success because we want to | |
302 | // ensure: | |
303 | // | |
304 | // - That we observe any writes that preceded | |
305 | // some prior tickle, and that tickle may have only | |
306 | // done a SeqCst load on `self.state`. | |
307 | // - That any subsequent tickle *definitely* sees this store. | |
308 | // | |
309 | // See the section on "Ensuring Sequentially | |
310 | // Consistency" in README.md for more details. | |
311 | // | |
312 | // The failure ordering doesn't matter since we are | |
313 | // about to spin around and do a fresh load. | |
532ac7d7 XL |
314 | if self |
315 | .state | |
94b46f34 | 316 | .compare_exchange(state, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) |
532ac7d7 XL |
317 | .is_ok() |
318 | { | |
94b46f34 XL |
319 | // Don't do this in a loop. If we do it in a loop, we need |
320 | // some way to distinguish the ABA scenario where the pool | |
321 | // was awoken but before we could process it somebody went | |
322 | // to sleep. Note that if we get a false wakeup it's not a | |
323 | // problem for us, we'll just loop around and maybe get | |
324 | // sleepy again. | |
532ac7d7 XL |
325 | log!(FellAsleep { |
326 | worker: worker_index | |
327 | }); | |
94b46f34 XL |
328 | |
329 | // Decrement the number of active threads and check for a deadlock | |
330 | data.active_threads -= 1; | |
532ac7d7 XL |
331 | data.deadlock_check(®istry.deadlock_handler); |
332 | ||
333 | registry.release_thread(); | |
94b46f34 XL |
334 | |
335 | let _ = self.tickle.wait(data).unwrap(); | |
532ac7d7 XL |
336 | log!(GotAwoken { |
337 | worker: worker_index | |
338 | }); | |
339 | registry.acquire_thread(); | |
94b46f34 XL |
340 | return; |
341 | } | |
342 | } else { | |
532ac7d7 XL |
343 | log!(GotInterrupted { |
344 | worker: worker_index | |
345 | }); | |
94b46f34 XL |
346 | return; |
347 | } | |
348 | } | |
349 | } | |
350 | } |