1 use crate::job
::{JobFifo, JobRef, StackJob}
;
2 use crate::latch
::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch}
;
3 use crate::log
::Event
::*;
4 use crate::log
::Logger
;
5 use crate::sleep
::Sleep
;
8 AcquireThreadHandler
, DeadlockHandler
, ErrorKind
, ExitHandler
, PanicHandler
,
9 ReleaseThreadHandler
, StartHandler
, ThreadPoolBuildError
, ThreadPoolBuilder
,
11 use crossbeam_deque
::{Injector, Steal, Stealer, Worker}
;
14 use std
::collections
::hash_map
::DefaultHasher
;
16 use std
::hash
::Hasher
;
21 use std
::sync
::atomic
::ATOMIC_USIZE_INIT
;
22 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
23 use std
::sync
::{Arc, Once}
;
27 /// Thread builder used for customization via
28 /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
29 pub struct ThreadBuilder
{
31 stack_size
: Option
<usize>,
32 worker
: Worker
<JobRef
>,
33 registry
: Arc
<Registry
>,
38 /// Gets the index of this thread in the pool, within `0..num_threads`.
39 pub fn index(&self) -> usize {
43 /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
44 pub fn name(&self) -> Option
<&str> {
45 self.name
.as_ref().map(String
::as_str
)
48 /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
49 pub fn stack_size(&self) -> Option
<usize> {
53 /// Executes the main loop for this thread. This will not return until the
54 /// thread pool is dropped.
56 unsafe { main_loop(self.worker, self.registry, self.index) }
60 impl fmt
::Debug
for ThreadBuilder
{
61 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
62 f
.debug_struct("ThreadBuilder")
63 .field("pool", &self.registry
.id())
64 .field("index", &self.index
)
65 .field("name", &self.name
)
66 .field("stack_size", &self.stack_size
)
71 /// Generalized trait for spawning a thread in the `Registry`.
73 /// This trait is pub-in-private -- E0445 forces us to make it public,
74 /// but we don't actually want to expose these details in the API.
75 pub trait ThreadSpawn
{
78 /// Spawn a thread with the `ThreadBuilder` parameters, and then
79 /// call `ThreadBuilder::run()`.
80 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()>;
83 /// Spawns a thread in the "normal" way with `std::thread::Builder`.
85 /// This type is pub-in-private -- E0445 forces us to make it public,
86 /// but we don't actually want to expose these details in the API.
87 #[derive(Debug, Default)]
88 pub struct DefaultSpawn
;
90 impl ThreadSpawn
for DefaultSpawn
{
93 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()> {
94 let mut b
= thread
::Builder
::new();
95 if let Some(name
) = thread
.name() {
96 b
= b
.name(name
.to_owned());
98 if let Some(stack_size
) = thread
.stack_size() {
99 b
= b
.stack_size(stack_size
);
101 b
.spawn(|| thread
.run())?
;
106 /// Spawns a thread with a user's custom callback.
108 /// This type is pub-in-private -- E0445 forces us to make it public,
109 /// but we don't actually want to expose these details in the API.
111 pub struct CustomSpawn
<F
>(F
);
113 impl<F
> CustomSpawn
<F
>
115 F
: FnMut(ThreadBuilder
) -> io
::Result
<()>,
117 pub(super) fn new(spawn
: F
) -> Self {
122 impl<F
> ThreadSpawn
for CustomSpawn
<F
>
124 F
: FnMut(ThreadBuilder
) -> io
::Result
<()>,
129 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()> {
134 pub struct Registry
{
136 thread_infos
: Vec
<ThreadInfo
>,
138 injected_jobs
: Injector
<JobRef
>,
139 panic_handler
: Option
<Box
<PanicHandler
>>,
140 pub(crate) deadlock_handler
: Option
<Box
<DeadlockHandler
>>,
141 start_handler
: Option
<Box
<StartHandler
>>,
142 exit_handler
: Option
<Box
<ExitHandler
>>,
143 pub(crate) acquire_thread_handler
: Option
<Box
<AcquireThreadHandler
>>,
144 pub(crate) release_thread_handler
: Option
<Box
<ReleaseThreadHandler
>>,
146 // When this latch reaches 0, it means that all work on this
147 // registry must be complete. This is ensured in the following ways:
149 // - if this is the global registry, there is a ref-count that never
151 // - if this is a user-created thread-pool, then so long as the thread-pool
152 // exists, it holds a reference.
153 // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
154 // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
155 // return until the blocking job is complete, that ref will continue to be held.
156 // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
157 // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
158 // and that job will keep the pool alive.
159 terminate_count
: AtomicUsize
,
162 /// ////////////////////////////////////////////////////////////////////////
165 static mut THE_REGISTRY
: Option
<Arc
<Registry
>> = None
;
166 static THE_REGISTRY_SET
: Once
= Once
::new();
168 /// Starts the worker threads (if that has not already happened). If
169 /// initialization has not already occurred, use the default
171 pub(super) fn global_registry() -> &'
static Arc
<Registry
> {
172 set_global_registry(|| Registry
::new(ThreadPoolBuilder
::new()))
173 .or_else(|err
| unsafe { THE_REGISTRY.as_ref().ok_or(err) }
)
174 .expect("The global thread pool has not been initialized.")
177 /// Starts the worker threads (if that has not already happened) with
178 /// the given builder.
179 pub(super) fn init_global_registry
<S
>(
180 builder
: ThreadPoolBuilder
<S
>,
181 ) -> Result
<&'
static Arc
<Registry
>, ThreadPoolBuildError
>
185 set_global_registry(|| Registry
::new(builder
))
188 /// Starts the worker threads (if that has not already happened)
189 /// by creating a registry with the given callback.
190 fn set_global_registry
<F
>(registry
: F
) -> Result
<&'
static Arc
<Registry
>, ThreadPoolBuildError
>
192 F
: FnOnce() -> Result
<Arc
<Registry
>, ThreadPoolBuildError
>,
194 let mut result
= Err(ThreadPoolBuildError
::new(
195 ErrorKind
::GlobalPoolAlreadyInitialized
,
198 THE_REGISTRY_SET
.call_once(|| {
200 .map(|registry
: Arc
<Registry
>| unsafe { &*THE_REGISTRY.get_or_insert(registry) }
)
206 struct Terminator
<'a
>(&'a Arc
<Registry
>);
208 impl<'a
> Drop
for Terminator
<'a
> {
215 pub(super) fn new
<S
>(
216 mut builder
: ThreadPoolBuilder
<S
>,
217 ) -> Result
<Arc
<Self>, ThreadPoolBuildError
>
221 // Soft-limit the number of threads that we can actually support.
222 let n_threads
= Ord
::min(builder
.get_num_threads(), crate::max_num_threads());
224 let breadth_first
= builder
.get_breadth_first();
226 let (workers
, stealers
): (Vec
<_
>, Vec
<_
>) = (0..n_threads
)
228 let worker
= if breadth_first
{
234 let stealer
= worker
.stealer();
239 let logger
= Logger
::new(n_threads
);
240 let registry
= Arc
::new(Registry
{
241 logger
: logger
.clone(),
242 thread_infos
: stealers
.into_iter().map(ThreadInfo
::new
).collect(),
243 sleep
: Sleep
::new(logger
, n_threads
),
244 injected_jobs
: Injector
::new(),
245 terminate_count
: AtomicUsize
::new(1),
246 panic_handler
: builder
.take_panic_handler(),
247 deadlock_handler
: builder
.take_deadlock_handler(),
248 start_handler
: builder
.take_start_handler(),
249 exit_handler
: builder
.take_exit_handler(),
250 acquire_thread_handler
: builder
.take_acquire_thread_handler(),
251 release_thread_handler
: builder
.take_release_thread_handler(),
254 // If we return early or panic, make sure to terminate existing threads.
255 let t1000
= Terminator(®istry
);
257 for (index
, worker
) in workers
.into_iter().enumerate() {
258 let thread
= ThreadBuilder
{
259 name
: builder
.get_thread_name(index
),
260 stack_size
: builder
.get_stack_size(),
261 registry
: Arc
::clone(®istry
),
265 if let Err(e
) = builder
.get_spawn_handler().spawn(thread
) {
266 return Err(ThreadPoolBuildError
::new(ErrorKind
::IOError(e
)));
270 // Returning normally now, without termination.
276 pub fn current() -> Arc
<Registry
> {
278 let worker_thread
= WorkerThread
::current();
279 let registry
= if worker_thread
.is_null() {
282 &(*worker_thread
).registry
288 /// Returns the number of threads in the current registry. This
289 /// is better than `Registry::current().num_threads()` because it
290 /// avoids incrementing the `Arc`.
291 pub(super) fn current_num_threads() -> usize {
293 let worker_thread
= WorkerThread
::current();
294 if worker_thread
.is_null() {
295 global_registry().num_threads()
297 (*worker_thread
).registry
.num_threads()
302 /// Returns the current `WorkerThread` if it's part of this `Registry`.
303 pub(super) fn current_thread(&self) -> Option
<&WorkerThread
> {
305 let worker
= WorkerThread
::current().as_ref()?
;
306 if worker
.registry().id() == self.id() {
314 /// Returns an opaque identifier for this registry.
315 pub(super) fn id(&self) -> RegistryId
{
316 // We can rely on `self` not to change since we only ever create
317 // registries that are boxed up in an `Arc` (see `new()` above).
319 addr
: self as *const Self as usize,
324 pub(super) fn log(&self, event
: impl FnOnce() -> crate::log
::Event
) {
325 self.logger
.log(event
)
328 pub(super) fn num_threads(&self) -> usize {
329 self.thread_infos
.len()
332 pub(super) fn handle_panic(&self, err
: Box
<dyn Any
+ Send
>) {
333 match self.panic_handler
{
334 Some(ref handler
) => {
335 // If the customizable panic handler itself panics,
337 let abort_guard
= unwind
::AbortIfPanic
;
339 mem
::forget(abort_guard
);
342 // Default panic handler aborts.
343 let _
= unwind
::AbortIfPanic
; // let this drop.
348 /// Waits for the worker threads to get up and running. This is
349 /// meant to be used for benchmarking purposes, primarily, so that
350 /// you can get more consistent numbers by having everything
352 pub(super) fn wait_until_primed(&self) {
353 for info
in &self.thread_infos
{
358 /// Waits for the worker threads to stop. This is used for testing
359 /// -- so we can check that termination actually works.
360 pub(super) fn wait_until_stopped(&self) {
361 self.release_thread();
362 for info
in &self.thread_infos
{
365 self.acquire_thread();
368 pub(crate) fn acquire_thread(&self) {
369 if let Some(ref acquire_thread_handler
) = self.acquire_thread_handler
{
370 acquire_thread_handler();
374 pub(crate) fn release_thread(&self) {
375 if let Some(ref release_thread_handler
) = self.release_thread_handler
{
376 release_thread_handler();
380 /// ////////////////////////////////////////////////////////////////////////
383 /// So long as all of the worker threads are hanging out in their
384 /// top-level loop, there is no work to be done.
386 /// Push a job into the given `registry`. If we are running on a
387 /// worker thread for the registry, this will push onto the
388 /// deque. Else, it will inject from the outside (which is slower).
389 pub(super) fn inject_or_push(&self, job_ref
: JobRef
) {
390 let worker_thread
= WorkerThread
::current();
392 if !worker_thread
.is_null() && (*worker_thread
).registry().id() == self.id() {
393 (*worker_thread
).push(job_ref
);
395 self.inject(&[job_ref
]);
400 /// Push a job into the "external jobs" queue; it will be taken by
401 /// whatever worker has nothing to do. Use this is you know that
402 /// you are not on a worker of this registry.
403 pub(super) fn inject(&self, injected_jobs
: &[JobRef
]) {
404 self.log(|| JobsInjected
{
405 count
: injected_jobs
.len(),
408 // It should not be possible for `state.terminate` to be true
409 // here. It is only set to true when the user creates (and
410 // drops) a `ThreadPool`; and, in that case, they cannot be
411 // calling `inject()` later, since they dropped their
414 self.terminate_count
.load(Ordering
::Acquire
),
416 "inject() sees state.terminate as true"
419 let queue_was_empty
= self.injected_jobs
.is_empty();
421 for &job_ref
in injected_jobs
{
422 self.injected_jobs
.push(job_ref
);
426 .new_injected_jobs(usize::MAX
, injected_jobs
.len() as u32, queue_was_empty
);
429 pub(crate) fn has_injected_job(&self) -> bool
{
430 !self.injected_jobs
.is_empty()
433 fn pop_injected_job(&self, worker_index
: usize) -> Option
<JobRef
> {
435 match self.injected_jobs
.steal() {
436 Steal
::Success(job
) => {
437 self.log(|| JobUninjected
{
438 worker
: worker_index
,
442 Steal
::Empty
=> return None
,
448 /// If already in a worker-thread of this registry, just execute `op`.
449 /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
450 /// completes and return its return value. If `op` panics, that panic will
451 /// be propagated as well. The second argument indicates `true` if injection
452 /// was performed, `false` if executed directly.
453 pub(super) fn in_worker
<OP
, R
>(&self, op
: OP
) -> R
455 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
459 let worker_thread
= WorkerThread
::current();
460 if worker_thread
.is_null() {
461 self.in_worker_cold(op
)
462 } else if (*worker_thread
).registry().id() != self.id() {
463 self.in_worker_cross(&*worker_thread
, op
)
465 // Perfectly valid to give them a `&T`: this is the
466 // current thread, so we know the data structure won't be
467 // invalidated until we return.
468 op(&*worker_thread
, false)
474 unsafe fn in_worker_cold
<OP
, R
>(&self, op
: OP
) -> R
476 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
479 thread_local
!(static LOCK_LATCH
: LockLatch
= LockLatch
::new());
481 LOCK_LATCH
.with(|l
| {
482 // This thread isn't a member of *any* thread pool, so just block.
483 debug_assert
!(WorkerThread
::current().is_null());
484 let job
= StackJob
::new(
487 let worker_thread
= WorkerThread
::current();
488 assert
!(injected
&& !worker_thread
.is_null());
489 op(&*worker_thread
, true)
493 self.inject(&[job
.as_job_ref()]);
494 self.release_thread();
495 job
.latch
.wait_and_reset(); // Make sure we can use the same latch again next time.
496 self.acquire_thread();
498 // flush accumulated logs as we exit the thread
499 self.logger
.log(|| Flush
);
506 unsafe fn in_worker_cross
<OP
, R
>(&self, current_thread
: &WorkerThread
, op
: OP
) -> R
508 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
511 // This thread is a member of a different pool, so let it process
512 // other work while waiting for this `op` to complete.
513 debug_assert
!(current_thread
.registry().id() != self.id());
514 let latch
= SpinLatch
::cross(current_thread
);
515 let job
= StackJob
::new(
518 let worker_thread
= WorkerThread
::current();
519 assert
!(injected
&& !worker_thread
.is_null());
520 op(&*worker_thread
, true)
524 self.inject(&[job
.as_job_ref()]);
525 current_thread
.wait_until(&job
.latch
);
529 /// Increments the terminate counter. This increment should be
530 /// balanced by a call to `terminate`, which will decrement. This
531 /// is used when spawning asynchronous work, which needs to
532 /// prevent the registry from terminating so long as it is active.
534 /// Note that blocking functions such as `join` and `scope` do not
535 /// need to concern themselves with this fn; their context is
536 /// responsible for ensuring the current thread-pool will not
537 /// terminate until they return.
539 /// The global thread-pool always has an outstanding reference
540 /// (the initial one). Custom thread-pools have one outstanding
541 /// reference that is dropped when the `ThreadPool` is dropped:
542 /// since installing the thread-pool blocks until any joins/scopes
543 /// complete, this ensures that joins/scopes are covered.
545 /// The exception is `::spawn()`, which can create a job outside
546 /// of any blocking scope. In that case, the job itself holds a
547 /// terminate count and is responsible for invoking `terminate()`
549 pub(super) fn increment_terminate_count(&self) {
550 let previous
= self.terminate_count
.fetch_add(1, Ordering
::AcqRel
);
551 debug_assert
!(previous
!= 0, "registry ref count incremented from zero");
553 previous
!= std
::usize::MAX
,
554 "overflow in registry ref count"
558 /// Signals that the thread-pool which owns this registry has been
559 /// dropped. The worker threads will gradually terminate, once any
560 /// extant work is completed.
561 pub(super) fn terminate(&self) {
562 if self.terminate_count
.fetch_sub(1, Ordering
::AcqRel
) == 1 {
563 for (i
, thread_info
) in self.thread_infos
.iter().enumerate() {
564 thread_info
.terminate
.set_and_tickle_one(self, i
);
569 /// Notify the worker that the latch they are sleeping on has been "set".
570 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index
: usize) {
571 self.sleep
.notify_worker_latch_is_set(target_worker_index
);
575 /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
576 /// if no other worker thread is active
578 pub fn mark_blocked() {
579 let worker_thread
= WorkerThread
::current();
580 assert
!(!worker_thread
.is_null());
582 let registry
= &(*worker_thread
).registry
;
583 registry
.sleep
.mark_blocked(®istry
.deadlock_handler
)
587 /// Mark a previously blocked Rayon worker thread as unblocked
589 pub fn mark_unblocked(registry
: &Registry
) {
590 registry
.sleep
.mark_unblocked()
593 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
594 pub(super) struct RegistryId
{
599 /// Latch set once thread has started and we are entering into the
600 /// main loop. Used to wait for worker threads to become primed,
601 /// primarily of interest for benchmarking.
604 /// Latch is set once worker thread has completed. Used to wait
605 /// until workers have stopped; only used for tests.
608 /// The latch used to signal that terminated has been requested.
609 /// This latch is *set* by the `terminate` method on the
610 /// `Registry`, once the registry's main "terminate" counter
613 /// NB. We use a `CountLatch` here because it has no lifetimes and is
614 /// meant for async use, but the count never gets higher than one.
615 terminate
: CountLatch
,
617 /// the "stealer" half of the worker's deque
618 stealer
: Stealer
<JobRef
>,
622 fn new(stealer
: Stealer
<JobRef
>) -> ThreadInfo
{
624 primed
: LockLatch
::new(),
625 stopped
: LockLatch
::new(),
626 terminate
: CountLatch
::new(),
632 /// ////////////////////////////////////////////////////////////////////////
633 /// WorkerThread identifiers
635 pub(super) struct WorkerThread
{
636 /// the "worker" half of our local deque
637 worker
: Worker
<JobRef
>,
639 /// local queue used for `spawn_fifo` indirection
642 pub(crate) index
: usize,
644 /// A weak random number generator.
647 pub(crate) registry
: Arc
<Registry
>,
650 // This is a bit sketchy, but basically: the WorkerThread is
651 // allocated on the stack of the worker on entry and stored into this
652 // thread local variable. So it will remain valid at least until the
653 // worker is fully unwound. Using an unsafe pointer avoids the need
654 // for a RefCell<T> etc.
656 static WORKER_THREAD_STATE
: Cell
<*const WorkerThread
> = Cell
::new(ptr
::null());
659 impl Drop
for WorkerThread
{
661 // Undo `set_current`
662 WORKER_THREAD_STATE
.with(|t
| {
663 assert
!(t
.get().eq(&(self as *const _
)));
670 /// Gets the `WorkerThread` index for the current thread; returns
671 /// NULL if this is not a worker thread. This pointer is valid
672 /// anywhere on the current thread.
674 pub(super) fn current() -> *const WorkerThread
{
675 WORKER_THREAD_STATE
.with(Cell
::get
)
678 /// Sets `self` as the worker thread index for the current thread.
679 /// This is done during worker thread startup.
680 unsafe fn set_current(thread
: *const WorkerThread
) {
681 WORKER_THREAD_STATE
.with(|t
| {
682 assert
!(t
.get().is_null());
687 /// Returns the registry that owns this worker thread.
689 pub(super) fn registry(&self) -> &Arc
<Registry
> {
694 pub(super) fn log(&self, event
: impl FnOnce() -> crate::log
::Event
) {
695 self.registry
.logger
.log(event
)
698 /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
700 pub(super) fn index(&self) -> usize {
705 pub(super) unsafe fn push(&self, job
: JobRef
) {
706 self.log(|| JobPushed { worker: self.index }
);
707 let queue_was_empty
= self.worker
.is_empty();
708 self.worker
.push(job
);
711 .new_internal_jobs(self.index
, 1, queue_was_empty
);
715 pub(super) unsafe fn push_fifo(&self, job
: JobRef
) {
716 self.push(self.fifo
.push(job
));
720 pub(super) fn local_deque_is_empty(&self) -> bool
{
721 self.worker
.is_empty()
724 /// Attempts to obtain a "local" job -- typically this means
725 /// popping from the top of the stack, though if we are configured
726 /// for breadth-first execution, it would mean dequeuing from the
729 pub(super) unsafe fn take_local_job(&self) -> Option
<JobRef
> {
730 let popped_job
= self.worker
.pop();
732 if popped_job
.is_some() {
733 self.log(|| JobPopped { worker: self.index }
);
739 /// Wait until the latch is set. Try to keep busy by popping and
740 /// stealing tasks as necessary.
742 pub(super) unsafe fn wait_until
<L
: AsCoreLatch
+ ?Sized
>(&self, latch
: &L
) {
743 let latch
= latch
.as_core_latch();
745 self.wait_until_cold(latch
);
750 unsafe fn wait_until_cold(&self, latch
: &CoreLatch
) {
751 // the code below should swallow all panics and hence never
752 // unwind; but if something does wrong, we want to abort,
753 // because otherwise other code in rayon may assume that the
754 // latch has been signaled, and that can lead to random memory
755 // accesses, which would be *very bad*
756 let abort_guard
= unwind
::AbortIfPanic
;
758 let mut idle_state
= self.registry
.sleep
.start_looking(self.index
, latch
);
759 while !latch
.probe() {
760 // Try to find some work to do. We give preference first
761 // to things in our local deque, then in other workers
762 // deques, and finally to injected jobs from the
763 // outside. The idea is to finish what we started before
764 // we take on something new.
765 if let Some(job
) = self
767 .or_else(|| self.steal())
768 .or_else(|| self.registry
.pop_injected_job(self.index
))
770 self.registry
.sleep
.work_found(idle_state
);
772 idle_state
= self.registry
.sleep
.start_looking(self.index
, latch
);
776 .no_work_found(&mut idle_state
, latch
, &self.registry
)
780 // If we were sleepy, we are not anymore. We "found work" --
781 // whatever the surrounding thread was doing before it had to
783 self.registry
.sleep
.work_found(idle_state
);
785 self.log(|| ThreadSawLatchSet
{
787 latch_addr
: latch
.addr(),
789 mem
::forget(abort_guard
); // successful execution, do not abort
793 pub(super) unsafe fn execute(&self, job
: JobRef
) {
797 /// Try to steal a single job and return it.
799 /// This should only be done as a last resort, when there is no
800 /// local work to do.
801 unsafe fn steal(&self) -> Option
<JobRef
> {
802 // we only steal when we don't have any work to do locally
803 debug_assert
!(self.local_deque_is_empty());
805 // otherwise, try to steal
806 let thread_infos
= &self.registry
.thread_infos
.as_slice();
807 let num_threads
= thread_infos
.len();
808 if num_threads
<= 1 {
813 let mut retry
= false;
814 let start
= self.rng
.next_usize(num_threads
);
815 let job
= (start
..num_threads
)
817 .filter(move |&i
| i
!= self.index
)
818 .find_map(|victim_index
| {
819 let victim
= &thread_infos
[victim_index
];
820 match victim
.stealer
.steal() {
821 Steal
::Success(job
) => {
822 self.log(|| JobStolen
{
824 victim
: victim_index
,
828 Steal
::Empty
=> None
,
835 if job
.is_some() || !retry
{
842 /// ////////////////////////////////////////////////////////////////////////
844 unsafe fn main_loop(worker
: Worker
<JobRef
>, registry
: Arc
<Registry
>, index
: usize) {
845 let worker_thread
= &WorkerThread
{
847 fifo
: JobFifo
::new(),
849 rng
: XorShift64Star
::new(),
852 WorkerThread
::set_current(worker_thread
);
853 let registry
= &*worker_thread
.registry
;
855 // let registry know we are ready to do work
856 registry
.thread_infos
[index
].primed
.set();
858 // Worker threads should not panic. If they do, just abort, as the
859 // internal state of the threadpool is corrupted. Note that if
860 // **user code** panics, we should catch that and redirect.
861 let abort_guard
= unwind
::AbortIfPanic
;
863 // Inform a user callback that we started a thread.
864 if let Some(ref handler
) = registry
.start_handler
{
865 match unwind
::halt_unwinding(|| handler(index
)) {
868 registry
.handle_panic(err
);
873 let my_terminate_latch
= ®istry
.thread_infos
[index
].terminate
;
874 worker_thread
.log(|| ThreadStart
{
876 terminate_addr
: my_terminate_latch
.as_core_latch().addr(),
878 registry
.acquire_thread();
879 worker_thread
.wait_until(my_terminate_latch
);
881 // Should not be any work left in our queue.
882 debug_assert
!(worker_thread
.take_local_job().is_none());
884 // let registry know we are done
885 registry
.thread_infos
[index
].stopped
.set();
887 // Normal termination, do not abort.
888 mem
::forget(abort_guard
);
890 worker_thread
.log(|| ThreadTerminate { worker: index }
);
892 // Inform a user callback that we exited a thread.
893 if let Some(ref handler
) = registry
.exit_handler
{
894 match unwind
::halt_unwinding(|| handler(index
)) {
897 registry
.handle_panic(err
);
900 // We're already exiting the thread, there's nothing else to do.
903 registry
.release_thread();
906 /// If already in a worker-thread, just execute `op`. Otherwise,
907 /// execute `op` in the default thread-pool. Either way, block until
908 /// `op` completes and return its return value. If `op` panics, that
909 /// panic will be propagated as well. The second argument indicates
910 /// `true` if injection was performed, `false` if executed directly.
911 pub(super) fn in_worker
<OP
, R
>(op
: OP
) -> R
913 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
917 let owner_thread
= WorkerThread
::current();
918 if !owner_thread
.is_null() {
919 // Perfectly valid to give them a `&T`: this is the
920 // current thread, so we know the data structure won't be
921 // invalidated until we return.
922 op(&*owner_thread
, false)
924 global_registry().in_worker_cold(op
)
929 /// [xorshift*] is a fast pseudorandom number generator which will
930 /// even tolerate weak seeding, as long as it's not zero.
932 /// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
933 struct XorShift64Star
{
937 impl XorShift64Star
{
939 // Any non-zero seed will do -- this uses the hash of a global counter.
942 let mut hasher
= DefaultHasher
::new();
944 static COUNTER
: AtomicUsize
= ATOMIC_USIZE_INIT
;
945 hasher
.write_usize(COUNTER
.fetch_add(1, Ordering
::Relaxed
));
946 seed
= hasher
.finish();
950 state
: Cell
::new(seed
),
954 fn next(&self) -> u64 {
955 let mut x
= self.state
.get();
956 debug_assert_ne
!(x
, 0);
961 x
.wrapping_mul(0x2545_f491_4f6c_dd1d)
964 /// Return a value from `0..n`.
965 fn next_usize(&self, n
: usize) -> usize {
966 (self.next() % n
as u64) as usize