//! Code that decides when workers should go to sleep. See README.md
//! for an overview.

use crate::latch::CoreLatch;
use crate::log::Event::*;
use crate::log::Logger;
use crate::registry::WorkerThread;
use crate::DeadlockHandler;
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex};
use std::thread;
use std::usize;

mod counters;
pub(crate) use self::counters::THREADS_MAX;
use self::counters::{AtomicCounters, JobsEventCounter};

struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module.
    blocked_threads: usize,
}

impl SleepData {
    /// Checks if the conditions for a deadlock hold and, if so, calls the deadlock handler.
    #[inline]
    pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
        if self.active_threads == 0 && self.blocked_threads > 0 {
            (deadlock_handler.as_ref().unwrap())();
        }
    }
}
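
// Illustrative walk-through (numbers are hypothetical): with `worker_count ==
// 2`, suppose both workers block in user code. The first `mark_blocked` call
// leaves `active_threads == 1, blocked_threads == 1`; the second leaves
// `active_threads == 0, blocked_threads == 2`, so its `deadlock_check` finds
// no thread able to make progress and invokes the installed handler. Note
// that `deadlock_check` unwraps the handler, so blocked/unblocked tracking is
// only meaningful when a `DeadlockHandler` has been registered.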

/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    logger: Logger,

    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

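    /// Packed atomic counters driving the sleep protocol (see `counters.rs`):
    /// a single atomic word tracking the number of inactive threads, the
    /// number of sleeping threads, and the "jobs event counter" (JEC) used to
    /// detect jobs posted while a worker was becoming sleepy.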
    counters: AtomicCounters,

    data: Mutex<SleepData>,
}

/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is the worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}

/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    condvar: Condvar,
}

const ROUNDS_UNTIL_SLEEPY: u32 = 32;
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
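
// Added orientation comment (the authoritative description is README.md in
// this directory): an idle worker's life, as driven by `no_work_found`, is
//
//   rounds 0..32:  spin, calling `thread::yield_now()` once per round
//   round 32:      announce "sleepy" (snapshot the JEC), then yield once more
//   round 33:      commit to sleeping via `sleep()`
//
// Finding work resets the state to round 0 (`wake_fully`); a stale JEC
// snapshot rewinds only to round 32 (`wake_partly`) so the worker
// re-announces itself before trying to sleep again.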

impl Sleep {
    pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
        assert!(n_threads <= THREADS_MAX);
        Sleep {
            logger,
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
            data: Mutex::new(SleepData {
                worker_count: n_threads,
                active_threads: n_threads,
                blocked_threads: 0,
            }),
        }
    }

    /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
    /// if no other worker thread is active.
    #[inline]
    pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
        let mut data = self.data.lock().unwrap();
        debug_assert!(data.active_threads > 0);
        debug_assert!(data.blocked_threads < data.worker_count);
        data.active_threads -= 1;
        data.blocked_threads += 1;

        data.deadlock_check(deadlock_handler);
    }

    /// Mark a previously blocked Rayon worker thread as unblocked.
    #[inline]
    pub fn mark_unblocked(&self) {
        let mut data = self.data.lock().unwrap();
        debug_assert!(data.active_threads < data.worker_count);
        debug_assert!(data.blocked_threads > 0);
        data.active_threads += 1;
        data.blocked_threads -= 1;
    }
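
    // Added note: `mark_blocked` / `mark_unblocked` let code outside this
    // module report that a worker is parked in user code (for example,
    // waiting on an external event). That keeps `SleepData` accurate, so
    // `deadlock_check` can fire when every worker is either asleep here or
    // blocked out there.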

    #[inline]
    pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
        self.logger.log(|| ThreadIdle {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    #[inline]
    pub(super) fn work_found(&self, idle_state: IdleState) {
        self.logger.log(|| ThreadFoundWork {
            worker: idle_state.worker_index,
            yields: idle_state.rounds,
        });

        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    #[inline]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        thread: &WorkerThread,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, thread);
        }
    }

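    // Added commentary (the encoding lives in `counters.rs`): the JEC is
    // bumped by whichever side acted last. A worker getting sleepy increments
    // an "active" JEC to mark it sleepy; a newly posted job increments a
    // "sleepy" JEC to mark it active again. So the value returned here is the
    // snapshot that `sleep()` re-checks: if it has moved on, a job was posted
    // after we got sleepy and we must not sleep without another search.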
    #[cold]
    fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active);
        let jobs_counter = counters.jobs_counter();
        self.logger.log(|| ThreadSleepy {
            worker: worker_index,
            jobs_counter: jobs_counter.as_usize(),
        });
        jobs_counter
    }

    #[cold]
    fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, thread: &WorkerThread) {
        let worker_index = idle_state.worker_index;

        if !latch.get_sleepy() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            return;
        }

        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // Our latch was signalled. We should wake back up fully as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                self.logger.log(|| ThreadSleepInterruptedByJob {
                    worker: worker_index,
                });

                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
        }

        // Successfully registered as asleep.

        self.logger.log(|| ThreadSleeping {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers a rollover of the JEC such that we don't see it
        // - we are the last active worker thread
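        //
        // (Added note, based on the structure of the code: this fence is
        // presumably the partner of the one in `new_injected_jobs` -- we
        // register as sleeping, fence, then check for injected jobs, while
        // the injector publishes the job, fences, then reads the sleep
        // counters -- so at least one side must observe the other.)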
        std::sync::atomic::fence(Ordering::SeqCst);
        if thread.has_injected_job() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            {
                // Decrement the number of active threads and check for a deadlock
                let mut data = self.data.lock().unwrap();
                data.active_threads -= 1;
                data.deadlock_check(&thread.registry.deadlock_handler);
            }

            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the "sleepy counter". This means
            // that whoever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            thread.registry.release_thread();
            *is_blocked = true;
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }

            // Drop `is_blocked` now in case `acquire_thread` blocks
            drop(is_blocked);

            thread.registry.acquire_thread();
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();

        self.logger.log(|| ThreadAwoken {
            worker: worker_index,
            latch_addr: latch.addr(),
        });
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping). When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `source_worker_index` -- index of the thread that did the
    ///   push, or `usize::MAX` if this came from outside the thread
    ///   pool -- it is used only for logging.
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_injected_jobs(
        &self,
        source_worker_index: usize,
        num_jobs: u32,
        queue_was_empty: bool,
    ) {
        // This fence is needed to guarantee that threads, as they are about
        // to fall asleep, observe any new jobs that may have been injected.
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any new
    /// threads; in that case the existing thread should eventually
    /// pop the job.
    ///
    /// # Parameters
    ///
    /// - `source_worker_index` -- index of the thread that did the
    ///   push, or `usize::MAX` if this came from outside the thread
    ///   pool -- it is used only for logging.
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_internal_jobs(
        &self,
        source_worker_index: usize,
        num_jobs: u32,
        queue_was_empty: bool,
    ) {
        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
    #[inline]
    fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available. The final value of
        // `counters` with which we exit the loop thus corresponds to a state
        // in which the JEC is no longer "sleepy": if it was sleepy, our
        // increment marked the new work, so any worker doing its final JEC
        // check in `sleep()` will notice the change and abort going to sleep.
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        self.logger.log(|| JobThreadCounts {
            worker: source_worker_index,
            num_idle: num_awake_but_idle as u16,
            num_sleepers: num_sleepers as u16,
        });

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Convert the counts to `u32` so we can interoperate with `num_jobs`
        // more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle workers aren't soaking up the jobs
        // that are already queued. Otherwise, check to see if we have
        // enough idle workers to cover the new jobs.
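        //
        // Worked example (illustrative numbers, not from the original source):
        // with `num_jobs = 3`, `num_awake_but_idle = 1`, and `num_sleepers = 4`,
        // an empty queue wakes `min(3 - 1, 4) = 2` sleepers, while a non-empty
        // queue wakes `min(3, 4) = 3`.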
        if !queue_was_empty {
            let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    #[cold]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it will have incremented
            // this value. When we wake it, it's our job to decrement
            // it. We could have the thread do it, but that would
            // introduce a delay between when the thread was
            // *notified* and when this counter was decremented. That
            // might mislead people with new work into thinking that
            // there are sleeping threads that they should try to
            // wake, when in fact there is nothing left for them to
            // do.
            self.counters.sub_sleeping_thread();

            // Increment the number of active threads
            self.data.lock().unwrap().active_threads += 1;

            self.logger.log(|| ThreadNotify { worker: index });

            true
        } else {
            false
        }
    }
}

impl IdleState {
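    /// Resets the idle state after the thread has fully woken (it found work
    /// or its latch was set): the spin count restarts at round 0 and the JEC
    /// snapshot is cleared back to `JobsEventCounter::DUMMY`.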
    fn wake_fully(&mut self) {
        self.rounds = 0;
        self.jobs_counter = JobsEventCounter::DUMMY;
    }

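    /// Used when the JEC snapshot went stale before we managed to sleep:
    /// rewinds only to `ROUNDS_UNTIL_SLEEPY`, so the next `no_work_found`
    /// round re-announces the worker as sleepy (taking a fresh JEC snapshot)
    /// rather than spinning through all the earlier rounds again.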
    fn wake_partly(&mut self) {
        self.rounds = ROUNDS_UNTIL_SLEEPY;
        self.jobs_counter = JobsEventCounter::DUMMY;
    }
}
461 }