]> git.proxmox.com Git - rustc.git/blame - vendor/rustc-rayon-core/src/sleep/mod.rs
Merge tag 'debian/1.52.1+dfsg1-1_exp2' into proxmox/buster
[rustc.git] / vendor / rustc-rayon-core / src / sleep / mod.rs
CommitLineData
94b46f34
XL
1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
6a06907d
XL
4use crate::log::Event::*;
5use crate::registry::Registry;
6use crate::DeadlockHandler;
94b46f34
XL
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::{Condvar, Mutex};
9use std::thread;
10use std::usize;
11
/// Counters describing what the pool's worker threads are currently doing.
/// Always accessed under the mutex in `Sleep::data`.
struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module.
    blocked_threads: usize,
}
24
25impl SleepData {
26 /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
27 #[inline]
28 pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
29 if self.active_threads == 0 && self.blocked_threads > 0 {
30 (deadlock_handler.as_ref().unwrap())();
31 }
32 }
33}
34
pub(super) struct Sleep {
    /// Packed state word. Bit 0 is the SLEEPING flag; the upper bits hold
    /// `worker_index + 1` of the single worker that is currently "sleepy"
    /// (0 when nobody is sleepy). See `with_sleepy_worker`.
    state: AtomicUsize,
    /// Thread bookkeeping used when workers actually block; protected by
    /// this mutex and paired with the `tickle` condvar.
    data: Mutex<SleepData>,
    /// Condition variable that sleeping workers wait on; `tickle_cold`
    /// notifies it to wake them.
    tickle: Condvar,
}
40
/// `state` value when nobody is sleepy or asleep.
const AWAKE: usize = 0;
/// Bit 0 of `state`: set when at least one worker is asleep.
const SLEEPING: usize = 1;

/// Spin rounds without work before a worker tries to become "sleepy".
const ROUNDS_UNTIL_SLEEPY: usize = 32;
/// Spin rounds without work before a sleepy worker actually goes to sleep.
const ROUNDS_UNTIL_ASLEEP: usize = 64;
46
47impl Sleep {
e74abb32 48 pub(super) fn new(worker_count: usize) -> Sleep {
94b46f34
XL
49 Sleep {
50 state: AtomicUsize::new(AWAKE),
51 data: Mutex::new(SleepData {
52 worker_count,
53 active_threads: worker_count,
54 blocked_threads: 0,
55 }),
56 tickle: Condvar::new(),
57 }
58 }
59
60 /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
61 /// if no other worker thread is active
62 #[inline]
63 pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
64 let mut data = self.data.lock().unwrap();
65 debug_assert!(data.active_threads > 0);
66 debug_assert!(data.blocked_threads < data.worker_count);
67 debug_assert!(data.active_threads > 0);
68 data.active_threads -= 1;
69 data.blocked_threads += 1;
70
71 data.deadlock_check(deadlock_handler);
72 }
73
74 /// Mark a previously blocked Rayon worker thread as unblocked
75 #[inline]
76 pub fn mark_unblocked(&self) {
77 let mut data = self.data.lock().unwrap();
78 debug_assert!(data.active_threads < data.worker_count);
79 debug_assert!(data.blocked_threads > 0);
80 data.active_threads += 1;
81 data.blocked_threads -= 1;
82 }
83
84 fn anyone_sleeping(&self, state: usize) -> bool {
85 state & SLEEPING != 0
86 }
87
88 fn any_worker_is_sleepy(&self, state: usize) -> bool {
89 (state >> 1) != 0
90 }
91
92 fn worker_is_sleepy(&self, state: usize, worker_index: usize) -> bool {
93 (state >> 1) == (worker_index + 1)
94 }
95
96 fn with_sleepy_worker(&self, state: usize, worker_index: usize) -> usize {
97 debug_assert!(state == AWAKE || state == SLEEPING);
98 ((worker_index + 1) << 1) + state
99 }
100
101 #[inline]
e74abb32 102 pub(super) fn work_found(&self, worker_index: usize, yields: usize) -> usize {
94b46f34
XL
103 log!(FoundWork {
104 worker: worker_index,
105 yields: yields,
106 });
107 if yields > ROUNDS_UNTIL_SLEEPY {
108 // FIXME tickling here is a bit extreme; mostly we want to "release the lock"
109 // from us being sleepy, we don't necessarily need to wake others
110 // who are sleeping
111 self.tickle(worker_index);
112 }
113 0
114 }
115
116 #[inline]
e74abb32 117 pub(super) fn no_work_found(
532ac7d7
XL
118 &self,
119 worker_index: usize,
120 yields: usize,
121 registry: &Registry,
122 ) -> usize {
94b46f34
XL
123 log!(DidNotFindWork {
124 worker: worker_index,
125 yields: yields,
126 });
127 if yields < ROUNDS_UNTIL_SLEEPY {
128 thread::yield_now();
129 yields + 1
130 } else if yields == ROUNDS_UNTIL_SLEEPY {
131 thread::yield_now();
132 if self.get_sleepy(worker_index) {
133 yields + 1
134 } else {
135 yields
136 }
137 } else if yields < ROUNDS_UNTIL_ASLEEP {
138 thread::yield_now();
139 if self.still_sleepy(worker_index) {
140 yields + 1
141 } else {
532ac7d7
XL
142 log!(GotInterrupted {
143 worker: worker_index
144 });
94b46f34
XL
145 0
146 }
147 } else {
148 debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
532ac7d7 149 self.sleep(worker_index, registry);
94b46f34
XL
150 0
151 }
152 }
153
e74abb32 154 pub(super) fn tickle(&self, worker_index: usize) {
94b46f34
XL
155 // As described in README.md, this load must be SeqCst so as to ensure that:
156 // - if anyone is sleepy or asleep, we *definitely* see that now (and not eventually);
157 // - if anyone after us becomes sleepy or asleep, they see memory events that
158 // precede the call to `tickle()`, even though we did not do a write.
159 let old_state = self.state.load(Ordering::SeqCst);
160 if old_state != AWAKE {
161 self.tickle_cold(worker_index);
162 }
163 }
164
165 #[cold]
166 fn tickle_cold(&self, worker_index: usize) {
167 // The `Release` ordering here suffices. The reasoning is that
168 // the atomic's own natural ordering ensure that any attempt
169 // to become sleepy/asleep either will come before/after this
170 // swap. If it comes *after*, then Release is good because we
171 // want it to see the action that generated this tickle. If it
172 // comes *before*, then we will see it here (but not other
173 // memory writes from that thread). If the other worker was
174 // becoming sleepy, the other writes don't matter. If they
175 // were were going to sleep, we will acquire lock and hence
176 // acquire their reads.
177 let old_state = self.state.swap(AWAKE, Ordering::Release);
178 log!(Tickle {
179 worker: worker_index,
180 old_state: old_state,
181 });
182 if self.anyone_sleeping(old_state) {
183 let mut data = self.data.lock().unwrap();
184 // Set the active threads to the number of workers,
185 // excluding threads blocked by the user since we won't wake those up
186 data.active_threads = data.worker_count - data.blocked_threads;
187 self.tickle.notify_all();
188 }
189 }
190
    /// Try to register this worker as the pool's single "sleepy" worker.
    /// Returns true on success; false when some other worker already is
    /// sleepy (only one worker may be sleepy at a time).
    fn get_sleepy(&self, worker_index: usize) -> bool {
        loop {
            // Acquire ordering suffices here. If some other worker
            // was sleepy but no longer is, we will eventually see
            // that, and until then it doesn't hurt to spin.
            // Otherwise, we will do a compare-exchange which will
            // assert a stronger order and acquire any reads etc that
            // we must see.
            let state = self.state.load(Ordering::Acquire);
            log!(GetSleepy {
                worker: worker_index,
                state: state,
            });
            if self.any_worker_is_sleepy(state) {
                // somebody else is already sleepy, so we'll just wait our turn
                debug_assert!(
                    !self.worker_is_sleepy(state, worker_index),
                    "worker {} called `is_sleepy()`, \
                     but they are already sleepy (state={})",
                    worker_index,
                    state
                );
                return false;
            } else {
                // make ourselves the sleepy one
                let new_state = self.with_sleepy_worker(state, worker_index);

                // This must be SeqCst on success because we want to
                // ensure:
                //
                // - That we observe any writes that preceded
                //   some prior tickle, and that tickle may have only
                //   done a SeqCst load on `self.state`.
                // - That any subsequent tickle *definitely* sees this store.
                //
                // See the section on "Ensuring Sequentially
                // Consistency" in README.md for more details.
                //
                // The failure ordering doesn't matter since we are
                // about to spin around and do a fresh load.
                if self
                    .state
                    .compare_exchange(state, new_state, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    log!(GotSleepy {
                        worker: worker_index,
                        old_state: state,
                        new_state: new_state,
                    });
                    return true;
                }
                // CAS failed: state changed under us — retry with a fresh load.
            }
        }
    }
246
247 fn still_sleepy(&self, worker_index: usize) -> bool {
248 let state = self.state.load(Ordering::SeqCst);
249 self.worker_is_sleepy(state, worker_index)
250 }
251
    /// Block this worker on the condvar until a tickle wakes it, or return
    /// immediately if a tickle already interrupted our sleepy state.
    fn sleep(&self, worker_index: usize, registry: &Registry) {
        loop {
            // Acquire here suffices. If we observe that the current worker is still
            // sleepy, then in fact we know that no writes have occurred, and anyhow
            // we are going to do a CAS which will synchronize.
            //
            // If we observe that the state has changed, it must be
            // due to a tickle, and then the Acquire means we also see
            // any events that occurred before that.
            let state = self.state.load(Ordering::Acquire);
            if self.worker_is_sleepy(state, worker_index) {
                // It is important that we hold the lock when we do
                // the CAS. Otherwise, if we were to CAS first, then
                // the following sequence of events could occur:
                //
                // - Thread A (us) sets state to SLEEPING.
                // - Thread B sets state to AWAKE.
                // - Thread C sets state to SLEEPY(C).
                // - Thread C sets state to SLEEPING.
                // - Thread A reawakens, acquires lock, and goes to sleep.
                //
                // Now we missed the wake-up from thread B! But since
                // we have the lock when we set the state to sleeping,
                // that cannot happen. Note that the swap `tickle()`
                // is not part of the lock, though, so let's play that
                // out:
                //
                // # Scenario 1
                //
                // - A loads state and see SLEEPY(A)
                // - B swaps to AWAKE.
                // - A locks, fails CAS
                //
                // # Scenario 2
                //
                // - A loads state and see SLEEPY(A)
                // - A locks, performs CAS
                // - B swaps to AWAKE.
                // - A waits (releasing lock)
                // - B locks, notifies
                //
                // In general, acquiring the lock inside the loop
                // seems like it could lead to bad performance, but
                // actually it should be ok. This is because the only
                // reason for the `compare_exchange` to fail is if an
                // awaken comes, in which case the next cycle around
                // the loop will just return.
                let mut data = self.data.lock().unwrap();

                // This must be SeqCst on success because we want to
                // ensure:
                //
                // - That we observe any writes that preceded
                //   some prior tickle, and that tickle may have only
                //   done a SeqCst load on `self.state`.
                // - That any subsequent tickle *definitely* sees this store.
                //
                // See the section on "Ensuring Sequentially
                // Consistency" in README.md for more details.
                //
                // The failure ordering doesn't matter since we are
                // about to spin around and do a fresh load.
                if self
                    .state
                    .compare_exchange(state, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    // Don't do this in a loop. If we do it in a loop, we need
                    // some way to distinguish the ABA scenario where the pool
                    // was awoken but before we could process it somebody went
                    // to sleep. Note that if we get a false wakeup it's not a
                    // problem for us, we'll just loop around and maybe get
                    // sleepy again.
                    log!(FellAsleep {
                        worker: worker_index
                    });

                    // Decrement the number of active threads and check for a deadlock
                    data.active_threads -= 1;
                    data.deadlock_check(&registry.deadlock_handler);

                    registry.release_thread();

                    // Wait on the condvar; a single wakeup suffices (see the
                    // ABA note above), even a spurious one.
                    let _ = self.tickle.wait(data).unwrap();
                    log!(GotAwoken {
                        worker: worker_index
                    });
                    registry.acquire_thread();
                    return;
                }
            } else {
                // Someone tickled us before we could fall asleep.
                log!(GotInterrupted {
                    worker: worker_index
                });
                return;
            }
        }
    }
350}