1 //! Code that decides when workers should go to sleep. See README.md
4 use crate::latch
::CoreLatch
;
5 use crate::log
::Event
::*;
6 use crate::log
::Logger
;
7 use crate::registry
::WorkerThread
;
8 use crate::DeadlockHandler
;
9 use crossbeam_utils
::CachePadded
;
10 use std
::sync
::atomic
::Ordering
;
11 use std
::sync
::{Condvar, Mutex}
;
16 pub(crate) use self::counters
::THREADS_MAX
;
17 use self::counters
::{AtomicCounters, JobsEventCounter}
;
20 /// The number of threads in the thread pool.
// NOTE(review): the `worker_count` field declaration itself is not visible in
// this view; it is read by `deadlock_check` and the `mark_*` assertions.
23 /// The number of threads in the thread pool which are running and
24 /// aren't blocked in user code or sleeping.
25 active_threads
: usize,
27 /// The number of threads which are blocked in user code.
28 /// This doesn't include threads blocked by this module.
29 blocked_threads
: usize,
33 /// Checks if the conditions for a deadlock hold and if so calls the deadlock handler
///
/// Deadlock condition: no worker thread is runnable (`active_threads == 0`)
/// while at least one worker is blocked in user code and therefore cannot be
/// woken by newly posted jobs.
///
/// Callers lock `Sleep::data` and invoke this on the guarded `SleepData`
/// (see `mark_blocked` and `Sleep::sleep`).
35 pub fn deadlock_check(&self, deadlock_handler
: &Option
<Box
<DeadlockHandler
>>) {
36 if self.active_threads
== 0 && self.blocked_threads
> 0 {
// NOTE(review): this `unwrap` panics if no deadlock handler was registered;
// presumably callers guarantee a handler is set whenever threads can block
// in user code -- confirm against the registry setup.
37 (deadlock_handler
.as_ref().unwrap())();
42 /// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
43 /// of workers. It has callbacks that are invoked periodically at significant events,
44 /// such as when workers are looping and looking for work, when latches are set, or when
45 /// jobs are published, and it either blocks threads or wakes them in response to these
46 /// events. See the [`README.md`] in this module for more details.
48 /// [`README.md`]: README.md
49 pub(super) struct Sleep
{
52 /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
/// Each state is cache-padded so that neighboring workers' mutexes and
/// condvars do not share a cache line.
54 worker_sleep_states
: Vec
<CachePadded
<WorkerSleepState
>>,
/// Packed atomic counters tracking idle threads, sleeping threads, and the
/// jobs event counter (JEC); see the `counters` submodule.
56 counters
: AtomicCounters
,
/// Mutex-protected thread accounting (`active`/`blocked` counts) used for
/// deadlock detection; see `SleepData::deadlock_check`.
58 data
: Mutex
<SleepData
>,
61 /// An instance of this struct is created when a thread becomes idle.
62 /// It is consumed when the thread finds work, and passed by `&mut`
63 /// reference for operations that preserve the idle state. (In other
64 /// words, producing one of these structs is evidence the thread is
65 /// idle.) It tracks state such as how long the thread has been idle.
66 pub(super) struct IdleState
{
67 /// What is worker index of the idle thread?
70 /// How many rounds have we been circling without sleeping?
73 /// Once we become sleepy, what was the sleepy counter value?
74 /// Set to `JobsEventCounter::DUMMY` otherwise (see `wake_fully`).
75 jobs_counter
: JobsEventCounter
,
78 /// The "sleep state" for an individual worker.
///
/// NOTE(review): a companion `condvar` field is used together with
/// `is_blocked` elsewhere in this module (see `sleep` and
/// `wake_specific_thread`); its declaration is not visible in this view.
80 struct WorkerSleepState
{
81 /// Set to true when the worker goes to sleep; set to false when
82 /// the worker is notified or when it wakes.
// Guarded by this mutex so wakers and the sleeper serialize on it.
83 is_blocked
: Mutex
<bool
>,
/// How many idle rounds a worker completes before it announces itself
/// as "sleepy" (see `no_work_found`).
const ROUNDS_UNTIL_SLEEPY: u32 = 32;

/// One extra round after becoming sleepy before the worker actually
/// attempts to go to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
/// Creates the sleep state for a pool of `n_threads` workers.
///
/// All workers start out counted as active; the packed counters impose
/// an upper bound of `THREADS_MAX` threads.
92 pub(super) fn new(logger
: Logger
, n_threads
: usize) -> Sleep
{
93 assert
!(n_threads
<= THREADS_MAX
);
// One default (not-blocked) sleep state per worker.
96 worker_sleep_states
: (0..n_threads
).map(|_
| Default
::default()).collect(),
97 counters
: AtomicCounters
::new(),
98 data
: Mutex
::new(SleepData
{
99 worker_count
: n_threads
,
100 active_threads
: n_threads
,
106 /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
107 /// if no other worker thread is active
109 pub fn mark_blocked(&self, deadlock_handler
: &Option
<Box
<DeadlockHandler
>>) {
110 let mut data
= self.data
.lock().unwrap();
111 debug_assert
!(data
.active_threads
> 0);
112 debug_assert
!(data
.blocked_threads
< data
.worker_count
);
// NOTE(review): this assertion duplicates the one two statements above
// (`active_threads > 0`); likely redundant or meant to check something else.
113 debug_assert
!(data
.active_threads
> 0);
// Move this thread from the "active" to the "blocked" bucket...
114 data
.active_threads
-= 1;
115 data
.blocked_threads
+= 1;
// ...and check whether that left the pool with no runnable workers.
117 data
.deadlock_check(deadlock_handler
);
120 /// Mark a previously blocked Rayon worker thread as unblocked
///
/// Inverse of `mark_blocked`: moves one thread from the "blocked" bucket
/// back to "active". No deadlock check is needed here since the pool is
/// gaining a runnable thread.
122 pub fn mark_unblocked(&self) {
123 let mut data
= self.data
.lock().unwrap();
124 debug_assert
!(data
.active_threads
< data
.worker_count
);
125 debug_assert
!(data
.blocked_threads
> 0);
126 data
.active_threads
+= 1;
127 data
.blocked_threads
-= 1;
/// Called when a worker begins searching for work. Logs the event,
/// registers the thread as inactive in the packed counters, and returns
/// a fresh `IdleState` (the construction is partially elided from this
/// view) that evidences the thread's idleness.
131 pub(super) fn start_looking(&self, worker_index
: usize, latch
: &CoreLatch
) -> IdleState
{
132 self.logger
.log(|| ThreadIdle
{
133 worker
: worker_index
,
134 latch_addr
: latch
.addr(),
// Count ourselves among the idle (awake-but-not-working) threads.
137 self.counters
.add_inactive_thread();
// Start with the DUMMY counter value; it is replaced with a real JEC
// snapshot once the thread becomes sleepy (see `announce_sleepy`).
142 jobs_counter
: JobsEventCounter
::DUMMY
,
/// Called when an idle worker finds work. Consumes the `IdleState`
/// (taken by value: the thread is no longer idle) and may wake another
/// thread to preserve search parallelism.
147 pub(super) fn work_found(&self, idle_state
: IdleState
) {
148 self.logger
.log(|| ThreadFoundWork
{
149 worker
: idle_state
.worker_index
,
150 yields
: idle_state
.rounds
,
153 // If we were the last idle thread and other threads are still sleeping,
154 // then we should wake up another thread.
155 let threads_to_wake
= self.counters
.sub_inactive_thread();
156 self.wake_any_threads(threads_to_wake
as u32);
/// Called each time an idle worker fails to find work. Drives the idle
/// state machine on `idle_state.rounds`:
///
/// - rounds < `ROUNDS_UNTIL_SLEEPY`: keep counting (spin/yield details are
///   elided from this view);
/// - rounds == `ROUNDS_UNTIL_SLEEPY`: announce ourselves sleepy and record
///   the current jobs event counter (JEC) snapshot;
/// - one more failed round after that: actually try to go to sleep.
160 pub(super) fn no_work_found(
162 idle_state
: &mut IdleState
,
164 thread
: &WorkerThread
,
166 if idle_state
.rounds
< ROUNDS_UNTIL_SLEEPY
{
168 idle_state
.rounds
+= 1;
169 } else if idle_state
.rounds
== ROUNDS_UNTIL_SLEEPY
{
// Snapshot the JEC so `sleep` can detect jobs posted after this point.
170 idle_state
.jobs_counter
= self.announce_sleepy(idle_state
.worker_index
);
171 idle_state
.rounds
+= 1;
173 } else if idle_state
.rounds
< ROUNDS_UNTIL_SLEEPING
{
174 idle_state
.rounds
+= 1;
// Final state: time to block.
177 debug_assert_eq
!(idle_state
.rounds
, ROUNDS_UNTIL_SLEEPING
);
178 self.sleep(idle_state
, latch
, thread
);
/// Publishes that this worker has become sleepy: bumps the jobs event
/// counter (JEC) from "active" to "sleepy" parity and returns the JEC
/// snapshot that `sleep` will later compare against to detect newly
/// posted jobs.
183 fn announce_sleepy(&self, worker_index
: usize) -> JobsEventCounter
{
// Only increment if the JEC currently reads as "active".
186 .increment_jobs_event_counter_if(JobsEventCounter
::is_active
);
187 let jobs_counter
= counters
.jobs_counter();
188 self.logger
.log(|| ThreadSleepy
{
189 worker
: worker_index
,
190 jobs_counter
: jobs_counter
.as_usize(),
/// Attempts to put this worker to sleep. May instead return early (waking
/// the idle state partly or fully) if the latch fires, if the jobs event
/// counter (JEC) changed since we became sleepy, or if an injected job is
/// observed. Several early-return paths and the condvar wait loop are
/// partially elided from this view.
196 fn sleep(&self, idle_state
: &mut IdleState
, latch
: &CoreLatch
, thread
: &WorkerThread
) {
197 let worker_index
= idle_state
.worker_index
;
// Bail out if the latch won't even let us become sleepy.
199 if !latch
.get_sleepy() {
200 self.logger
.log(|| ThreadSleepInterruptedByLatch
{
201 worker
: worker_index
,
202 latch_addr
: latch
.addr(),
// Take our per-worker sleep lock *before* registering as sleeping, so a
// waker must wait on this mutex and cannot miss our blocked flag.
208 let sleep_state
= &self.worker_sleep_states
[worker_index
];
209 let mut is_blocked
= sleep_state
.is_blocked
.lock().unwrap();
210 debug_assert
!(!*is_blocked
);
212 // Our latch was signalled. We should wake back up fully as we
213 // will have some stuff to do.
214 if !latch
.fall_asleep() {
215 self.logger
.log(|| ThreadSleepInterruptedByLatch
{
216 worker
: worker_index
,
217 latch_addr
: latch
.addr(),
220 idle_state
.wake_fully();
225 let counters
= self.counters
.load(Ordering
::SeqCst
);
227 // Check if the JEC has changed since we got sleepy.
228 debug_assert
!(idle_state
.jobs_counter
.is_sleepy());
229 if counters
.jobs_counter() != idle_state
.jobs_counter
{
230 // JEC has changed, so a new job was posted, but for some reason
231 // we didn't see it. We should return to just before the SLEEPY
232 // state so we can do another search and (if we fail to find
233 // work) go back to sleep.
234 self.logger
.log(|| ThreadSleepInterruptedByJob
{
235 worker
: worker_index
,
238 idle_state
.wake_partly();
243 // Otherwise, let's move from IDLE to SLEEPING.
244 if self.counters
.try_add_sleeping_thread(counters
) {
249 // Successfully registered as asleep.
251 self.logger
.log(|| ThreadSleeping
{
252 worker
: worker_index
,
253 latch_addr
: latch
.addr(),
256 // We have one last check for injected jobs to do. This protects against
257 // deadlock in the very unlikely event that
259 // - an external job is being injected while we are sleepy
260 // - that job triggers the rollover over the JEC such that we don't see it
261 // - we are the last active worker thread
262 std
::sync
::atomic
::fence(Ordering
::SeqCst
);
263 if thread
.has_injected_job() {
264 // If we see an externally injected job, then we have to 'wake
265 // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
266 // the one that wakes us.)
267 self.counters
.sub_sleeping_thread();
270 // Decrement the number of active threads and check for a deadlock
271 let mut data
= self.data
.lock().unwrap();
272 data
.active_threads
-= 1;
273 data
.deadlock_check(&thread
.registry
.deadlock_handler
);
276 // If we don't see an injected job (the normal case), then flag
277 // ourselves as asleep and wait till we are notified.
279 // (Note that `is_blocked` is held under a mutex and the mutex was
280 // acquired *before* we incremented the "sleepy counter". This means
281 // that whomever is coming to wake us will have to wait until we
282 // release the mutex in the call to `wait`, so they will see this
// Tell the registry we are going off-duty before blocking.
284 thread
.registry
.release_thread();
// NOTE(review): condvars can wake spuriously; the enclosing
// `while *is_blocked` loop appears to be elided from this view -- confirm.
287 is_blocked
= sleep_state
.condvar
.wait(is_blocked
).unwrap();
290 // Drop `is_blocked` now in case `acquire_thread` blocks
293 thread
.registry
.acquire_thread();
296 // Update other state:
297 idle_state
.wake_fully();
300 self.logger
.log(|| ThreadAwoken
{
301 worker
: worker_index
,
302 latch_addr
: latch
.addr(),
306 /// Notify the given thread that it should wake up (if it is
307 /// sleeping). When this method is invoked, we typically know the
308 /// thread is asleep, though in rare cases it could have been
309 /// awoken by (e.g.) new work having been posted.
310 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index
: usize) {
// The bool result ("did we actually wake someone") is deliberately ignored.
311 self.wake_specific_thread(target_worker_index
);
314 /// Signals that `num_jobs` new jobs were injected into the thread
315 /// pool from outside. This function will ensure that there are
316 /// threads available to process them, waking threads from sleep
321 /// - `source_worker_index` -- index of the thread that did the
322 /// push, or `usize::MAX` if this came from outside the thread
323 /// pool -- it is used only for logging.
324 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
325 /// We'll try to get at least one thread per job.
327 pub(super) fn new_injected_jobs(
329 source_worker_index
: usize,
331 queue_was_empty
: bool
,
333 // This fence is needed to guarantee that threads
334 // as they are about to fall asleep, observe any
335 // new jobs that may have been injected.
// Pairs with the SeqCst fence in `sleep` before `has_injected_job()`.
336 std
::sync
::atomic
::fence(Ordering
::SeqCst
);
338 self.new_jobs(source_worker_index
, num_jobs
, queue_was_empty
)
341 /// Signals that `num_jobs` new jobs were pushed onto a thread's
342 /// local deque. This function will try to ensure that there are
343 /// threads available to process them, waking threads from sleep
344 /// if necessary. However, this is not guaranteed: under certain
345 /// race conditions, the function may fail to wake any new
346 /// threads; in that case the existing thread should eventually
351 /// - `source_worker_index` -- index of the thread that did the
352 /// push, or `usize::MAX` if this came from outside the thread
353 /// pool -- it is used only for logging.
354 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
355 /// We'll try to get at least one thread per job.
357 pub(super) fn new_internal_jobs(
359 source_worker_index
: usize,
361 queue_was_empty
: bool
,
// Unlike `new_injected_jobs`, no SeqCst fence is issued here; internal
// pushes tolerate the weaker guarantee described in the doc comment above.
363 self.new_jobs(source_worker_index
, num_jobs
, queue_was_empty
)
366 /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
///
/// Reads the packed counters, bumps the jobs event counter if sleepy
/// workers have announced themselves, and decides how many sleeping
/// threads (if any) to wake for the `num_jobs` newly available jobs.
368 fn new_jobs(&self, source_worker_index
: usize, num_jobs
: u32, queue_was_empty
: bool
) {
369 // Read the counters and -- if sleepy workers have announced themselves
370 // -- announce that there is now work available. The final value of `counters`
371 // with which we exit the loop thus corresponds to a state when
374 .increment_jobs_event_counter_if(JobsEventCounter
::is_sleepy
);
375 let num_awake_but_idle
= counters
.awake_but_idle_threads();
376 let num_sleepers
= counters
.sleeping_threads();
378 self.logger
.log(|| JobThreadCounts
{
379 worker
: source_worker_index
,
380 num_idle
: num_awake_but_idle
as u16,
381 num_sleepers
: num_sleepers
as u16,
// Nobody is asleep, so there is no one to wake. (The early return for
// this branch is elided from this view.)
384 if num_sleepers
== 0 {
389 // Promote from u16 to u32 so we can interoperate with
390 // num_jobs more easily.
391 let num_awake_but_idle
= num_awake_but_idle
as u32;
392 let num_sleepers
= num_sleepers
as u32;
394 // If the queue is non-empty, then we always wake up a worker
395 // -- clearly the existing idle jobs aren't enough. Otherwise,
396 // check to see if we have enough idle workers.
397 if !queue_was_empty
{
398 let num_to_wake
= std
::cmp
::min(num_jobs
, num_sleepers
);
399 self.wake_any_threads(num_to_wake
);
400 } else if num_awake_but_idle
< num_jobs
{
// Wake only enough sleepers to cover the jobs the idle threads can't.
401 let num_to_wake
= std
::cmp
::min(num_jobs
- num_awake_but_idle
, num_sleepers
);
402 self.wake_any_threads(num_to_wake
);
/// Wakes up to `num_to_wake` sleeping workers by scanning every worker's
/// sleep state in index order. (The decrement of `num_to_wake` on each
/// successful wake and the early-exit return are elided from this view.)
407 fn wake_any_threads(&self, mut num_to_wake
: u32) {
409 for i
in 0..self.worker_sleep_states
.len() {
// `wake_specific_thread` returns true only when a sleeper was woken.
410 if self.wake_specific_thread(i
) {
412 if num_to_wake
== 0 {
/// Wakes the worker at `index` if it is currently sleeping. Returns a
/// bool -- presumably true when a sleeping thread was actually woken
/// (the return statements are elided from this view; confirm).
420 fn wake_specific_thread(&self, index
: usize) -> bool
{
421 let sleep_state
= &self.worker_sleep_states
[index
];
// Serialize with the sleeper on its per-worker mutex; see `sleep`.
423 let mut is_blocked
= sleep_state
.is_blocked
.lock().unwrap();
426 sleep_state
.condvar
.notify_one();
428 // When the thread went to sleep, it will have incremented
429 // this value. When we wake it, it's our job to decrement
430 // it. We could have the thread do it, but that would
431 // introduce a delay between when the thread was
432 // *notified* and when this counter was decremented. That
433 // might mislead people with new work into thinking that
434 // there are sleeping threads that they should try to
435 // wake, when in fact there is nothing left for them to
437 self.counters
.sub_sleeping_thread();
439 // Increment the number of active threads
440 self.data
.lock().unwrap().active_threads
+= 1;
442 self.logger
.log(|| ThreadNotify { worker: index }
);
/// Fully resets the idle state after the thread found work or was woken:
/// the JEC snapshot is discarded back to `DUMMY`. (A reset of `rounds`
/// appears to be elided from this view -- confirm.)
452 fn wake_fully(&mut self) {
454 self.jobs_counter
= JobsEventCounter
::DUMMY
;
/// Returns the thread to the state just before it became sleepy: it will
/// re-announce sleepiness (taking a fresh JEC snapshot) and do another
/// search before attempting to sleep again. Used by `sleep` when the JEC
/// changed underneath us.
457 fn wake_partly(&mut self) {
458 self.rounds
= ROUNDS_UNTIL_SLEEPY
;
459 self.jobs_counter
= JobsEventCounter
::DUMMY
;