1 use crate::job
::{JobFifo, JobRef, StackJob}
;
2 use crate::latch
::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}
;
3 use crate::sleep
::Sleep
;
6 ErrorKind
, ExitHandler
, PanicHandler
, StartHandler
, ThreadPoolBuildError
, ThreadPoolBuilder
,
9 use crossbeam_deque
::{Injector, Steal, Stealer, Worker}
;
11 use std
::collections
::hash_map
::DefaultHasher
;
13 use std
::hash
::Hasher
;
17 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
18 use std
::sync
::{Arc, Mutex, Once}
;
22 /// Thread builder used for customization via
23 /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
// NOTE(review): garbled extraction -- interior lines are missing here. The
// Debug impl and Registry::new below reference `name` and `index` fields
// that are not visible in this fragment; confirm against the full source.
24 pub struct ThreadBuilder
{
// Stack size requested for this worker thread, if any.
26 stack_size
: Option
<usize>,
// The "worker" half of this thread's local job deque.
27 worker
: Worker
<JobRef
>,
// Stealer for this thread's broadcast deque (paired in Registry::new).
28 stealer
: Stealer
<JobRef
>,
// Shared handle back to the owning registry (thread pool).
29 registry
: Arc
<Registry
>,
// NOTE(review): fragment of `impl ThreadBuilder` -- the impl header and the
// method bodies (simple field reads, presumably) are elided in this extraction.
34 /// Gets the index of this thread in the pool, within `0..num_threads`.
35 pub fn index(&self) -> usize {
39 /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
40 pub fn name(&self) -> Option
<&str> {
44 /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
45 pub fn stack_size(&self) -> Option
<usize> {
49 /// Executes the main loop for this thread. This will not return until the
50 /// thread pool is dropped.
// `run` hands the builder to `main_loop` (defined later in this file).
52 unsafe { main_loop(self) }
56 impl fmt
::Debug
for ThreadBuilder
{
57 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
58 f
.debug_struct("ThreadBuilder")
59 .field("pool", &self.registry
.id())
60 .field("index", &self.index
)
61 .field("name", &self.name
)
62 .field("stack_size", &self.stack_size
)
67 /// Generalized trait for spawning a thread in the `Registry`.
69 /// This trait is pub-in-private -- E0445 forces us to make it public,
70 /// but we don't actually want to expose these details in the API.
// NOTE(review): interior lines elided here -- rayon seals this trait with a
// `private_decl!`/`private_impl!` macro pair; confirm against the full source.
71 pub trait ThreadSpawn
{
74 /// Spawn a thread with the `ThreadBuilder` parameters, and then
75 /// call `ThreadBuilder::run()`.
76 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()>;
79 /// Spawns a thread in the "normal" way with `std::thread::Builder`.
81 /// This type is pub-in-private -- E0445 forces us to make it public,
82 /// but we don't actually want to expose these details in the API.
83 #[derive(Debug, Default)]
84 pub struct DefaultSpawn
;
86 impl ThreadSpawn
for DefaultSpawn
{
89 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()> {
// Build a std::thread::Builder, applying the optional name and stack size.
90 let mut b
= thread
::Builder
::new();
91 if let Some(name
) = thread
.name() {
92 b
= b
.name(name
.to_owned());
94 if let Some(stack_size
) = thread
.stack_size() {
95 b
= b
.stack_size(stack_size
);
// Spawn the OS thread; `?` propagates spawn failure as io::Error.
// (The trailing `Ok(())` of this body is elided in the extraction.)
97 b
.spawn(|| thread
.run())?
;
102 /// Spawns a thread with a user's custom callback.
104 /// This type is pub-in-private -- E0445 forces us to make it public,
105 /// but we don't actually want to expose these details in the API.
// Newtype over the user-supplied spawn callback.
107 pub struct CustomSpawn
<F
>(F
);
109 impl<F
> CustomSpawn
<F
>
// NOTE(review): the `where` keyword line is elided by the extraction.
111 F
: FnMut(ThreadBuilder
) -> io
::Result
<()>,
// Constructor; body (`CustomSpawn(spawn)`) elided in the extraction.
113 pub(super) fn new(spawn
: F
) -> Self {
118 impl<F
> ThreadSpawn
for CustomSpawn
<F
>
120 F
: FnMut(ThreadBuilder
) -> io
::Result
<()>,
// Delegates to the wrapped callback; body elided in the extraction.
125 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()> {
// Central state of a thread pool: per-thread info, the global injector
// queue, per-thread broadcast queues, user callbacks, and the terminate
// ref-count. NOTE(review): interior fields (e.g. the `sleep` state used
// by methods below) are elided by this extraction.
130 pub(super) struct Registry
{
131 thread_infos
: Vec
<ThreadInfo
>,
// Global queue for jobs injected from outside the pool.
133 injected_jobs
: Injector
<JobRef
>,
// One worker-queue per thread for `inject_broadcast`.
134 broadcasts
: Mutex
<Vec
<Worker
<JobRef
>>>,
135 panic_handler
: Option
<Box
<PanicHandler
>>,
136 start_handler
: Option
<Box
<StartHandler
>>,
137 exit_handler
: Option
<Box
<ExitHandler
>>,
139 // When this latch reaches 0, it means that all work on this
140 // registry must be complete. This is ensured in the following ways:
142 // - if this is the global registry, there is a ref-count that never
144 // - if this is a user-created thread-pool, then so long as the thread-pool
145 // exists, it holds a reference.
146 // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
147 // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
148 // return until the blocking job is complete, that ref will continue to be held.
149 // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
150 // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
151 // and that job will keep the pool alive.
152 terminate_count
: AtomicUsize
,
155 /// ////////////////////////////////////////////////////////////////////////
// Global-registry storage. NOTE(review): `static mut` + Once is the
// historical pattern here; access is guarded by THE_REGISTRY_SET below.
158 static mut THE_REGISTRY
: Option
<Arc
<Registry
>> = None
;
159 static THE_REGISTRY_SET
: Once
= Once
::new();
161 /// Starts the worker threads (if that has not already happened). If
162 /// initialization has not already occurred, use the default
164 pub(super) fn global_registry() -> &'
static Arc
<Registry
> {
// Try to initialize with the default; if someone already initialized,
// fall back to reading the already-set global (guarded by the Once).
165 set_global_registry(default_global_registry
)
166 .or_else(|err
| unsafe { THE_REGISTRY.as_ref().ok_or(err) }
)
167 .expect("The global thread pool has not been initialized.")
170 /// Starts the worker threads (if that has not already happened) with
171 /// the given builder.
172 pub(super) fn init_global_registry
<S
>(
173 builder
: ThreadPoolBuilder
<S
>,
174 ) -> Result
<&'
static Arc
<Registry
>, ThreadPoolBuildError
>
178 set_global_registry(|| Registry
::new(builder
))
181 /// Starts the worker threads (if that has not already happened)
182 /// by creating a registry with the given callback.
183 fn set_global_registry
<F
>(registry
: F
) -> Result
<&'
static Arc
<Registry
>, ThreadPoolBuildError
>
185 F
: FnOnce() -> Result
<Arc
<Registry
>, ThreadPoolBuildError
>,
// Default to "already initialized"; the Once body (elided here)
// overwrites `result` on the first successful call.
187 let mut result
= Err(ThreadPoolBuildError
::new(
188 ErrorKind
::GlobalPoolAlreadyInitialized
,
191 THE_REGISTRY_SET
.call_once(|| {
// On success, stash the Arc in THE_REGISTRY and return a 'static
// reference to it; sound because the Once runs at most once.
193 .map(|registry
: Arc
<Registry
>| unsafe { &*THE_REGISTRY.get_or_insert(registry) }
)
199 fn default_global_registry() -> Result
<Arc
<Registry
>, ThreadPoolBuildError
> {
200 let result
= Registry
::new(ThreadPoolBuilder
::new());
202 // If we're running in an environment that doesn't support threads at all, we can fall back to
203 // using the current thread alone. This is crude, and probably won't work for non-blocking
204 // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
206 // Notably, this allows current WebAssembly targets to work even though their threading support
207 // is stubbed out, and we won't have to change anything if they do add real threading.
208 let unsupported
= matches
!(&result
, Err(e
) if e
.is_unsupported());
209 if unsupported
&& WorkerThread
::current().is_null() {
210 let builder
= ThreadPoolBuilder
::new().num_threads(1).use_current_thread();
211 let fallback_result
= Registry
::new(builder
);
212 if fallback_result
.is_ok() {
213 return fallback_result
;
// RAII guard: terminates the registry's threads if `Registry::new` exits
// early (Drop body elided by the extraction).
220 struct Terminator
<'a
>(&'a Arc
<Registry
>);
222 impl<'a
> Drop
for Terminator
<'a
> {
// Registry constructor fragment (part of `impl Registry`; many interior
// lines are elided). NOTE(review): `®istry` below is a mojibake of
// `&registry` (0xAE swallowed "&r"); left byte-identical on purpose.
229 pub(super) fn new
<S
>(
230 mut builder
: ThreadPoolBuilder
<S
>,
231 ) -> Result
<Arc
<Self>, ThreadPoolBuildError
>
235 // Soft-limit the number of threads that we can actually support.
236 let n_threads
= Ord
::min(builder
.get_num_threads(), crate::max_num_threads());
238 let breadth_first
= builder
.get_breadth_first();
// Build one (worker, stealer) deque pair per thread; FIFO vs LIFO
// depends on the breadth-first setting (branch bodies elided).
240 let (workers
, stealers
): (Vec
<_
>, Vec
<_
>) = (0..n_threads
)
242 let worker
= if breadth_first
{
248 let stealer
= worker
.stealer();
// Separate per-thread FIFO queues for `inject_broadcast`.
253 let (broadcasts
, broadcast_stealers
): (Vec
<_
>, Vec
<_
>) = (0..n_threads
)
255 let worker
= Worker
::new_fifo();
256 let stealer
= worker
.stealer();
261 let registry
= Arc
::new(Registry
{
262 thread_infos
: stealers
.into_iter().map(ThreadInfo
::new
).collect(),
263 sleep
: Sleep
::new(n_threads
),
264 injected_jobs
: Injector
::new(),
265 broadcasts
: Mutex
::new(broadcasts
),
// Starts at 1: the pool owner holds the initial terminate reference.
266 terminate_count
: AtomicUsize
::new(1),
267 panic_handler
: builder
.take_panic_handler(),
268 start_handler
: builder
.take_start_handler(),
269 exit_handler
: builder
.take_exit_handler(),
272 // If we return early or panic, make sure to terminate existing threads.
273 let t1000
= Terminator(®istry
);
275 for (index
, (worker
, stealer
)) in workers
.into_iter().zip(broadcast_stealers
).enumerate() {
276 let thread
= ThreadBuilder
{
277 name
: builder
.get_thread_name(index
),
278 stack_size
: builder
.get_stack_size(),
279 registry
: Arc
::clone(®istry
),
285 if index
== 0 && builder
.use_current_thread
{
// The current thread may already belong to a pool; that's an error.
286 if !WorkerThread
::current().is_null() {
287 return Err(ThreadPoolBuildError
::new(
288 ErrorKind
::CurrentThreadAlreadyInPool
,
291 // Rather than starting a new thread, we're just taking over the current thread
292 // *without* running the main loop, so we can still return from here.
293 // The WorkerThread is leaked, but we never shutdown the global pool anyway.
294 let worker_thread
= Box
::into_raw(Box
::new(WorkerThread
::from(thread
)));
297 WorkerThread
::set_current(worker_thread
);
298 Latch
::set(®istry
.thread_infos
[index
].primed
);
303 if let Err(e
) = builder
.get_spawn_handler().spawn(thread
) {
304 return Err(ThreadPoolBuildError
::new(ErrorKind
::IOError(e
)));
308 // Returning normally now, without termination.
// Returns the registry of the current worker thread, or (branch elided)
// the global registry when called from a non-worker thread.
314 pub(super) fn current() -> Arc
<Registry
> {
316 let worker_thread
= WorkerThread
::current();
317 let registry
= if worker_thread
.is_null() {
320 &(*worker_thread
).registry
326 /// Returns the number of threads in the current registry. This
327 /// is better than `Registry::current().num_threads()` because it
328 /// avoids incrementing the `Arc`.
329 pub(super) fn current_num_threads() -> usize {
331 let worker_thread
= WorkerThread
::current();
332 if worker_thread
.is_null() {
333 global_registry().num_threads()
335 (*worker_thread
).registry
.num_threads()
340 /// Returns the current `WorkerThread` if it's part of this `Registry`.
341 pub(super) fn current_thread(&self) -> Option
<&WorkerThread
> {
343 let worker
= WorkerThread
::current().as_ref()?
;
344 if worker
.registry().id() == self.id() {
352 /// Returns an opaque identifier for this registry.
353 pub(super) fn id(&self) -> RegistryId
{
354 // We can rely on `self` not to change since we only ever create
355 // registries that are boxed up in an `Arc` (see `new()` above).
// The id is just this registry's address.
357 addr
: self as *const Self as usize,
// Number of worker threads in this pool.
361 pub(super) fn num_threads(&self) -> usize {
362 self.thread_infos
.len()
// Runs `f`, catching any unwind and routing it to the panic handler;
// aborts if no handler exists or the handler itself panics.
365 pub(super) fn catch_unwind(&self, f
: impl FnOnce()) {
366 if let Err(err
) = unwind
::halt_unwinding(f
) {
367 // If there is no handler, or if that handler itself panics, then we abort.
368 let abort_guard
= unwind
::AbortIfPanic
;
369 if let Some(ref handler
) = self.panic_handler
{
371 mem
::forget(abort_guard
);
376 /// Waits for the worker threads to get up and running. This is
377 /// meant to be used for benchmarking purposes, primarily, so that
378 /// you can get more consistent numbers by having everything
380 pub(super) fn wait_until_primed(&self) {
381 for info
in &self.thread_infos
{
386 /// Waits for the worker threads to stop. This is used for testing
387 /// -- so we can check that termination actually works.
389 pub(super) fn wait_until_stopped(&self) {
390 for info
in &self.thread_infos
{
395 /// ////////////////////////////////////////////////////////////////////////
398 /// So long as all of the worker threads are hanging out in their
399 /// top-level loop, there is no work to be done.
401 /// Push a job into the given `registry`. If we are running on a
402 /// worker thread for the registry, this will push onto the
403 /// deque. Else, it will inject from the outside (which is slower).
404 pub(super) fn inject_or_push(&self, job_ref
: JobRef
) {
405 let worker_thread
= WorkerThread
::current();
407 if !worker_thread
.is_null() && (*worker_thread
).registry().id() == self.id() {
408 (*worker_thread
).push(job_ref
);
410 self.inject(job_ref
);
415 /// Push a job into the "external jobs" queue; it will be taken by
416 /// whatever worker has nothing to do. Use this if you know that
417 /// you are not on a worker of this registry.
418 pub(super) fn inject(&self, injected_job
: JobRef
) {
419 // It should not be possible for `state.terminate` to be true
420 // here. It is only set to true when the user creates (and
421 // drops) a `ThreadPool`; and, in that case, they cannot be
422 // calling `inject()` later, since they dropped their
// (debug_assert wrapper lines elided by the extraction)
425 self.terminate_count
.load(Ordering
::Acquire
),
427 "inject() sees state.terminate as true"
// Record emptiness before the push so the sleep module can decide
// whether sleeping workers need waking.
430 let queue_was_empty
= self.injected_jobs
.is_empty();
432 self.injected_jobs
.push(injected_job
);
433 self.sleep
.new_injected_jobs(1, queue_was_empty
);
436 fn has_injected_job(&self) -> bool
{
437 !self.injected_jobs
.is_empty()
// Pops from the global injector; Steal::Retry loops (loop elided).
440 fn pop_injected_job(&self) -> Option
<JobRef
> {
442 match self.injected_jobs
.steal() {
443 Steal
::Success(job
) => return Some(job
),
444 Steal
::Empty
=> return None
,
450 /// Push a job into each thread's own "external jobs" queue; it will be
451 /// executed only on that thread, when it has nothing else to do locally,
452 /// before it tries to steal other work.
454 /// **Panics** if not given exactly as many jobs as there are threads.
455 pub(super) fn inject_broadcast(&self, injected_jobs
: impl ExactSizeIterator
<Item
= JobRef
>) {
456 assert_eq
!(self.num_threads(), injected_jobs
.len());
458 let broadcasts
= self.broadcasts
.lock().unwrap();
460 // It should not be possible for `state.terminate` to be true
461 // here. It is only set to true when the user creates (and
462 // drops) a `ThreadPool`; and, in that case, they cannot be
463 // calling `inject_broadcast()` later, since they dropped their
466 self.terminate_count
.load(Ordering
::Acquire
),
468 "inject_broadcast() sees state.terminate as true"
471 assert_eq
!(broadcasts
.len(), injected_jobs
.len());
// One job per thread's broadcast queue, then wake every worker.
472 for (worker
, job_ref
) in broadcasts
.iter().zip(injected_jobs
) {
473 worker
.push(job_ref
);
476 for i
in 0..self.num_threads() {
477 self.sleep
.notify_worker_latch_is_set(i
);
481 /// If already in a worker-thread of this registry, just execute `op`.
482 /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
483 /// completes and return its return value. If `op` panics, that panic will
484 /// be propagated as well. The second argument indicates `true` if injection
485 /// was performed, `false` if executed directly.
486 pub(super) fn in_worker
<OP
, R
>(&self, op
: OP
) -> R
488 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
492 let worker_thread
= WorkerThread
::current();
493 if worker_thread
.is_null() {
494 self.in_worker_cold(op
)
495 } else if (*worker_thread
).registry().id() != self.id() {
496 self.in_worker_cross(&*worker_thread
, op
)
498 // Perfectly valid to give them a `&T`: this is the
499 // current thread, so we know the data structure won't be
500 // invalidated until we return.
501 op(&*worker_thread
, false)
// Cold path: caller is not a worker of any pool. Blocks on a
// thread-local LockLatch until the injected job completes.
507 unsafe fn in_worker_cold
<OP
, R
>(&self, op
: OP
) -> R
509 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
512 thread_local
!(static LOCK_LATCH
: LockLatch
= LockLatch
::new());
514 LOCK_LATCH
.with(|l
| {
515 // This thread isn't a member of *any* thread pool, so just block.
516 debug_assert
!(WorkerThread
::current().is_null());
517 let job
= StackJob
::new(
519 let worker_thread
= WorkerThread
::current();
520 assert
!(injected
&& !worker_thread
.is_null());
521 op(&*worker_thread
, true)
525 self.inject(job
.as_job_ref());
526 job
.latch
.wait_and_reset(); // Make sure we can use the same latch again next time.
// Cross-pool path: caller is a worker of a *different* pool, so it
// keeps processing its own pool's work while waiting on a SpinLatch.
533 unsafe fn in_worker_cross
<OP
, R
>(&self, current_thread
: &WorkerThread
, op
: OP
) -> R
535 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
538 // This thread is a member of a different pool, so let it process
539 // other work while waiting for this `op` to complete.
540 debug_assert
!(current_thread
.registry().id() != self.id());
541 let latch
= SpinLatch
::cross(current_thread
);
542 let job
= StackJob
::new(
544 let worker_thread
= WorkerThread
::current();
545 assert
!(injected
&& !worker_thread
.is_null());
546 op(&*worker_thread
, true)
550 self.inject(job
.as_job_ref());
551 current_thread
.wait_until(&job
.latch
);
555 /// Increments the terminate counter. This increment should be
556 /// balanced by a call to `terminate`, which will decrement. This
557 /// is used when spawning asynchronous work, which needs to
558 /// prevent the registry from terminating so long as it is active.
560 /// Note that blocking functions such as `join` and `scope` do not
561 /// need to concern themselves with this fn; their context is
562 /// responsible for ensuring the current thread-pool will not
563 /// terminate until they return.
565 /// The global thread-pool always has an outstanding reference
566 /// (the initial one). Custom thread-pools have one outstanding
567 /// reference that is dropped when the `ThreadPool` is dropped:
568 /// since installing the thread-pool blocks until any joins/scopes
569 /// complete, this ensures that joins/scopes are covered.
571 /// The exception is `::spawn()`, which can create a job outside
572 /// of any blocking scope. In that case, the job itself holds a
573 /// terminate count and is responsible for invoking `terminate()`
575 pub(super) fn increment_terminate_count(&self) {
576 let previous
= self.terminate_count
.fetch_add(1, Ordering
::AcqRel
);
577 debug_assert
!(previous
!= 0, "registry ref count incremented from zero");
// (assert! wrapper lines elided by the extraction)
579 previous
!= std
::usize::MAX
,
580 "overflow in registry ref count"
584 /// Signals that the thread-pool which owns this registry has been
585 /// dropped. The worker threads will gradually terminate, once any
586 /// extant work is completed.
587 pub(super) fn terminate(&self) {
// Last reference out sets every thread's terminate latch and wakes it.
588 if self.terminate_count
.fetch_sub(1, Ordering
::AcqRel
) == 1 {
589 for (i
, thread_info
) in self.thread_infos
.iter().enumerate() {
590 unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }
;
595 /// Notify the worker that the latch they are sleeping on has been "set".
596 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index
: usize) {
597 self.sleep
.notify_worker_latch_is_set(target_worker_index
);
// Opaque pool identifier -- the registry's address (see `Registry::id`).
601 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
602 pub(super) struct RegistryId
{
// Per-worker bookkeeping held by the Registry (struct header elided).
607 /// Latch set once thread has started and we are entering into the
608 /// main loop. Used to wait for worker threads to become primed,
609 /// primarily of interest for benchmarking.
612 /// Latch is set once worker thread has completed. Used to wait
613 /// until workers have stopped; only used for tests.
616 /// The latch used to signal that terminated has been requested.
617 /// This latch is *set* by the `terminate` method on the
618 /// `Registry`, once the registry's main "terminate" counter
620 terminate
: OnceLatch
,
622 /// the "stealer" half of the worker's deque
623 stealer
: Stealer
<JobRef
>,
// ThreadInfo constructor (impl header and struct-literal wrapper elided).
627 fn new(stealer
: Stealer
<JobRef
>) -> ThreadInfo
{
629 primed
: LockLatch
::new(),
630 stopped
: LockLatch
::new(),
631 terminate
: OnceLatch
::new(),
637 /// ////////////////////////////////////////////////////////////////////////
638 /// WorkerThread identifiers
// Per-thread worker state, stored in a thread-local raw pointer.
// NOTE(review): the `fifo` and `rng` field declarations and `index` are
// elided by the extraction (they are referenced by `From` below).
640 pub(super) struct WorkerThread
{
641 /// the "worker" half of our local deque
642 worker
: Worker
<JobRef
>,
644 /// the "stealer" half of the worker's broadcast deque
645 stealer
: Stealer
<JobRef
>,
647 /// local queue used for `spawn_fifo` indirection
652 /// A weak random number generator.
655 registry
: Arc
<Registry
>,
658 // This is a bit sketchy, but basically: the WorkerThread is
659 // allocated on the stack of the worker on entry and stored into this
660 // thread local variable. So it will remain valid at least until the
661 // worker is fully unwound. Using an unsafe pointer avoids the need
662 // for a RefCell<T> etc.
664 static WORKER_THREAD_STATE
: Cell
<*const WorkerThread
> = const { Cell::new(ptr::null()) }
;
667 impl From
<ThreadBuilder
> for WorkerThread
{
// Moves the builder's queues/registry into the live worker state.
668 fn from(thread
: ThreadBuilder
) -> Self {
670 worker
: thread
.worker
,
671 stealer
: thread
.stealer
,
672 fifo
: JobFifo
::new(),
674 rng
: XorShift64Star
::new(),
675 registry
: thread
.registry
,
680 impl Drop
for WorkerThread
{
682 // Undo `set_current`
683 WORKER_THREAD_STATE
.with(|t
| {
684 assert
!(t
.get().eq(&(self as *const _
)));
691 /// Gets the `WorkerThread` index for the current thread; returns
692 /// NULL if this is not a worker thread. This pointer is valid
693 /// anywhere on the current thread.
695 pub(super) fn current() -> *const WorkerThread
{
696 WORKER_THREAD_STATE
.with(Cell
::get
)
699 /// Sets `self` as the worker thread index for the current thread.
700 /// This is done during worker thread startup.
701 unsafe fn set_current(thread
: *const WorkerThread
) {
702 WORKER_THREAD_STATE
.with(|t
| {
703 assert
!(t
.get().is_null());
708 /// Returns the registry that owns this worker thread.
710 pub(super) fn registry(&self) -> &Arc
<Registry
> {
714 /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
716 pub(super) fn index(&self) -> usize {
// Push onto the local deque, then tell the sleep module so idle
// workers can be woken if the queue was previously empty.
721 pub(super) unsafe fn push(&self, job
: JobRef
) {
722 let queue_was_empty
= self.worker
.is_empty();
723 self.worker
.push(job
);
724 self.registry
.sleep
.new_internal_jobs(1, queue_was_empty
);
728 pub(super) unsafe fn push_fifo(&self, job
: JobRef
) {
729 self.push(self.fifo
.push(job
));
733 pub(super) fn local_deque_is_empty(&self) -> bool
{
734 self.worker
.is_empty()
737 /// Attempts to obtain a "local" job -- typically this means
738 /// popping from the top of the stack, though if we are configured
739 /// for breadth-first execution, it would mean dequeuing from the
742 pub(super) fn take_local_job(&self) -> Option
<JobRef
> {
743 let popped_job
= self.worker
.pop();
745 if popped_job
.is_some() {
// Fall back to this thread's broadcast queue (Retry loops; elided).
750 match self.stealer
.steal() {
751 Steal
::Success(job
) => return Some(job
),
752 Steal
::Empty
=> return None
,
758 fn has_injected_job(&self) -> bool
{
759 !self.stealer
.is_empty() || self.registry
.has_injected_job()
762 /// Wait until the latch is set. Try to keep busy by popping and
763 /// stealing tasks as necessary.
765 pub(super) unsafe fn wait_until
<L
: AsCoreLatch
+ ?Sized
>(&self, latch
: &L
) {
766 let latch
= latch
.as_core_latch();
// (fast-path probe before the cold call is elided by the extraction)
768 self.wait_until_cold(latch
);
773 unsafe fn wait_until_cold(&self, latch
: &CoreLatch
) {
774 // the code below should swallow all panics and hence never
775 // unwind; but if something does wrong, we want to abort,
776 // because otherwise other code in rayon may assume that the
777 // latch has been signaled, and that can lead to random memory
778 // accesses, which would be *very bad*
779 let abort_guard
= unwind
::AbortIfPanic
;
781 'outer
: while !latch
.probe() {
782 // Check for local work *before* we start marking ourself idle,
783 // especially to avoid modifying shared sleep state.
784 if let Some(job
) = self.take_local_job() {
789 let mut idle_state
= self.registry
.sleep
.start_looking(self.index
);
790 while !latch
.probe() {
791 if let Some(job
) = self.find_work() {
792 self.registry
.sleep
.work_found();
794 // The job might have injected local work, so go back to the outer loop.
// No work: let the sleep module decide whether to block this worker.
799 .no_work_found(&mut idle_state
, latch
, || self.has_injected_job())
803 // If we were sleepy, we are not anymore. We "found work" --
804 // whatever the surrounding thread was doing before it had to wait.
805 self.registry
.sleep
.work_found();
809 mem
::forget(abort_guard
); // successful execution, do not abort
// NOTE(review): `®istry` below is a mojibake of `&registry`.
812 unsafe fn wait_until_out_of_work(&self) {
813 debug_assert_eq
!(self as *const _
, WorkerThread
::current());
814 let registry
= &*self.registry
;
815 let index
= self.index
;
817 self.wait_until(®istry
.thread_infos
[index
].terminate
);
819 // Should not be any work left in our queue.
820 debug_assert
!(self.take_local_job().is_none());
822 // Let registry know we are done
823 Latch
::set(®istry
.thread_infos
[index
].stopped
);
826 fn find_work(&self) -> Option
<JobRef
> {
827 // Try to find some work to do. We give preference first
828 // to things in our local deque, then in other workers
829 // deques, and finally to injected jobs from the
830 // outside. The idea is to finish what we started before
831 // we take on something new.
832 self.take_local_job()
833 .or_else(|| self.steal())
834 .or_else(|| self.registry
.pop_injected_job())
837 pub(super) fn yield_now(&self) -> Yield
{
838 match self.find_work() {
839 Some(job
) => unsafe {
847 pub(super) fn yield_local(&self) -> Yield
{
848 match self.take_local_job() {
849 Some(job
) => unsafe {
// Executes a job; body (job.execute(...) presumably) elided.
858 pub(super) unsafe fn execute(&self, job
: JobRef
) {
862 /// Try to steal a single job and return it.
864 /// This should only be done as a last resort, when there is no
865 /// local work to do.
866 fn steal(&self) -> Option
<JobRef
> {
867 // we only steal when we don't have any work to do locally
868 debug_assert
!(self.local_deque_is_empty());
870 // otherwise, try to steal
871 let thread_infos
= &self.registry
.thread_infos
.as_slice();
872 let num_threads
= thread_infos
.len();
873 if num_threads
<= 1 {
// Start from a random victim to spread contention; skip ourselves.
// (The outer retry loop and the `.chain(..)` wrap-around over
// victim indices are elided by the extraction.)
878 let mut retry
= false;
879 let start
= self.rng
.next_usize(num_threads
);
880 let job
= (start
..num_threads
)
882 .filter(move |&i
| i
!= self.index
)
883 .find_map(|victim_index
| {
884 let victim
= &thread_infos
[victim_index
];
885 match victim
.stealer
.steal() {
886 Steal
::Success(job
) => Some(job
),
887 Steal
::Empty
=> None
,
// Steal::Retry sets `retry` (arm elided); loop again in that case.
894 if job
.is_some() || !retry
{
901 /// ////////////////////////////////////////////////////////////////////////
// Entry point run on each spawned worker thread; returns only when the
// pool terminates. NOTE(review): `®istry` is a mojibake of `&registry`.
903 unsafe fn main_loop(thread
: ThreadBuilder
) {
904 let worker_thread
= &WorkerThread
::from(thread
);
905 WorkerThread
::set_current(worker_thread
);
906 let registry
= &*worker_thread
.registry
;
907 let index
= worker_thread
.index
;
909 // let registry know we are ready to do work
910 Latch
::set(®istry
.thread_infos
[index
].primed
);
912 // Worker threads should not panic. If they do, just abort, as the
913 // internal state of the threadpool is corrupted. Note that if
914 // **user code** panics, we should catch that and redirect.
915 let abort_guard
= unwind
::AbortIfPanic
;
917 // Inform a user callback that we started a thread.
918 if let Some(ref handler
) = registry
.start_handler
{
919 registry
.catch_unwind(|| handler(index
));
// Main work loop: runs until the terminate latch for this thread is set.
922 worker_thread
.wait_until_out_of_work();
924 // Normal termination, do not abort.
925 mem
::forget(abort_guard
);
927 // Inform a user callback that we exited a thread.
928 if let Some(ref handler
) = registry
.exit_handler
{
929 registry
.catch_unwind(|| handler(index
));
930 // We're already exiting the thread, there's nothing else to do.
934 /// If already in a worker-thread, just execute `op`. Otherwise,
935 /// execute `op` in the default thread-pool. Either way, block until
936 /// `op` completes and return its return value. If `op` panics, that
937 /// panic will be propagated as well. The second argument indicates
938 /// `true` if injection was performed, `false` if executed directly.
939 pub(super) fn in_worker
<OP
, R
>(op
: OP
) -> R
941 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
945 let owner_thread
= WorkerThread
::current();
946 if !owner_thread
.is_null() {
947 // Perfectly valid to give them a `&T`: this is the
948 // current thread, so we know the data structure won't be
949 // invalidated until we return.
950 op(&*owner_thread
, false)
// Not a worker: delegate to the global registry's blocking in_worker.
952 global_registry().in_worker(op
)
/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
///
/// (Reconstructed from a garbled extraction; the visible fragments --
/// hash-of-counter seeding, `debug_assert_ne!(x, 0)`, and the final
/// `wrapping_mul(0x2545_f491_4f6c_dd1d)` -- pin this as standard xorshift64*.)
struct XorShift64Star {
    // Current generator state; invariant: never zero (xorshift64* has
    // a fixed point at zero, so a zero state would stay zero forever).
    state: Cell<u64>,
}

impl XorShift64Star {
    /// Creates a new generator seeded from the hash of a global counter.
    fn new() -> Self {
        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            static COUNTER: AtomicUsize = AtomicUsize::new(0);
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    /// Advances the state and returns the next pseudorandom value.
    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        // The xorshift64 scramble, followed by the "star" multiplier.
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}