1 use crate::job
::{JobFifo, JobRef, StackJob}
;
2 use crate::latch
::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch}
;
3 use crate::log
::Event
::*;
4 use crate::log
::Logger
;
5 use crate::sleep
::Sleep
;
8 ErrorKind
, ExitHandler
, PanicHandler
, StartHandler
, ThreadPoolBuildError
, ThreadPoolBuilder
,
10 use crossbeam_deque
::{Injector, Steal, Stealer, Worker}
;
13 use std
::collections
::hash_map
::DefaultHasher
;
15 use std
::hash
::Hasher
;
20 use std
::sync
::atomic
::ATOMIC_USIZE_INIT
;
21 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
22 use std
::sync
::{Arc, Once}
;
26 /// Thread builder used for customization via
27 /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
28 pub struct ThreadBuilder
{
30 stack_size
: Option
<usize>,
31 worker
: Worker
<JobRef
>,
32 registry
: Arc
<Registry
>,
37 /// Gets the index of this thread in the pool, within `0..num_threads`.
38 pub fn index(&self) -> usize {
42 /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
43 pub fn name(&self) -> Option
<&str> {
44 self.name
.as_ref().map(String
::as_str
)
47 /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
48 pub fn stack_size(&self) -> Option
<usize> {
52 /// Executes the main loop for this thread. This will not return until the
53 /// thread pool is dropped.
55 unsafe { main_loop(self.worker, self.registry, self.index) }
59 impl fmt
::Debug
for ThreadBuilder
{
60 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
61 f
.debug_struct("ThreadBuilder")
62 .field("pool", &self.registry
.id())
63 .field("index", &self.index
)
64 .field("name", &self.name
)
65 .field("stack_size", &self.stack_size
)
70 /// Generalized trait for spawning a thread in the `Registry`.
72 /// This trait is pub-in-private -- E0445 forces us to make it public,
73 /// but we don't actually want to expose these details in the API.
74 pub trait ThreadSpawn
{
77 /// Spawn a thread with the `ThreadBuilder` parameters, and then
78 /// call `ThreadBuilder::run()`.
79 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()>;
82 /// Spawns a thread in the "normal" way with `std::thread::Builder`.
/// Its `ThreadSpawn` impl copies the optional thread name and stack size
/// from the `ThreadBuilder` onto that builder before spawning.
///
84 /// This type is pub-in-private -- E0445 forces us to make it public,
85 /// but we don't actually want to expose these details in the API.
86 #[derive(Debug, Default)]
87 pub struct DefaultSpawn
;
89 impl ThreadSpawn
for DefaultSpawn
{
92 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()> {
93 let mut b
= thread
::Builder
::new();
94 if let Some(name
) = thread
.name() {
95 b
= b
.name(name
.to_owned());
97 if let Some(stack_size
) = thread
.stack_size() {
98 b
= b
.stack_size(stack_size
);
100 b
.spawn(|| thread
.run())?
;
105 /// Spawns a thread with a user's custom callback.
107 /// This type is pub-in-private -- E0445 forces us to make it public,
108 /// but we don't actually want to expose these details in the API.
110 pub struct CustomSpawn
<F
>(F
);
112 impl<F
> CustomSpawn
<F
>
114 F
: FnMut(ThreadBuilder
) -> io
::Result
<()>,
116 pub(super) fn new(spawn
: F
) -> Self {
121 impl<F
> ThreadSpawn
for CustomSpawn
<F
>
123 F
: FnMut(ThreadBuilder
) -> io
::Result
<()>,
128 fn spawn(&mut self, thread
: ThreadBuilder
) -> io
::Result
<()> {
133 pub(super) struct Registry
{
135 thread_infos
: Vec
<ThreadInfo
>,
137 injected_jobs
: Injector
<JobRef
>,
138 panic_handler
: Option
<Box
<PanicHandler
>>,
139 start_handler
: Option
<Box
<StartHandler
>>,
140 exit_handler
: Option
<Box
<ExitHandler
>>,
142 // When this latch reaches 0, it means that all work on this
143 // registry must be complete. This is ensured in the following ways:
145 // - if this is the global registry, there is a ref-count that never
147 // - if this is a user-created thread-pool, then so long as the thread-pool
148 // exists, it holds a reference.
149 // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
150 // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
151 // return until the blocking job is complete, that ref will continue to be held.
152 // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
153 // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
154 // and that job will keep the pool alive.
155 terminate_count
: AtomicUsize
,
158 /// ////////////////////////////////////////////////////////////////////////
// Slot holding the global registry. It starts as `None`, is filled with
// `get_or_insert` in `set_global_registry()`, and is read with `as_ref()` in
// `global_registry()`. Being a `static mut`, every access sits in an
// `unsafe` block.
161 static mut THE_REGISTRY
: Option
<Arc
<Registry
>> = None
;
// `Once` whose `call_once` is invoked by `set_global_registry()`.
// NOTE(review): the write to `THE_REGISTRY` appears to happen inside that
// `call_once` closure -- confirm against the full function body.
162 static THE_REGISTRY_SET
: Once
= Once
::new();
164 /// Starts the worker threads (if that has not already happened). If
165 /// initialization has not already occurred, use the default
167 pub(super) fn global_registry() -> &'
static Arc
<Registry
> {
168 set_global_registry(|| Registry
::new(ThreadPoolBuilder
::new()))
169 .or_else(|err
| unsafe { THE_REGISTRY.as_ref().ok_or(err) }
)
170 .expect("The global thread pool has not been initialized.")
173 /// Starts the worker threads (if that has not already happened) with
174 /// the given builder.
175 pub(super) fn init_global_registry
<S
>(
176 builder
: ThreadPoolBuilder
<S
>,
177 ) -> Result
<&'
static Arc
<Registry
>, ThreadPoolBuildError
>
181 set_global_registry(|| Registry
::new(builder
))
184 /// Starts the worker threads (if that has not already happened)
185 /// by creating a registry with the given callback.
186 fn set_global_registry
<F
>(registry
: F
) -> Result
<&'
static Arc
<Registry
>, ThreadPoolBuildError
>
188 F
: FnOnce() -> Result
<Arc
<Registry
>, ThreadPoolBuildError
>,
190 let mut result
= Err(ThreadPoolBuildError
::new(
191 ErrorKind
::GlobalPoolAlreadyInitialized
,
194 THE_REGISTRY_SET
.call_once(|| {
196 .map(|registry
: Arc
<Registry
>| unsafe { &*THE_REGISTRY.get_or_insert(registry) }
)
/// Guard wrapping a borrowed `Arc<Registry>`.
///
/// `Registry::new` creates one right after building the registry so that an
/// early return or panic terminates the threads that were already started.
/// NOTE(review): the `Drop` impl body is not visible here; presumably it
/// calls `Registry::terminate()` -- confirm.
202 struct Terminator
<'a
>(&'a Arc
<Registry
>);
204 impl<'a
> Drop
for Terminator
<'a
> {
211 pub(super) fn new
<S
>(
212 mut builder
: ThreadPoolBuilder
<S
>,
213 ) -> Result
<Arc
<Self>, ThreadPoolBuildError
>
217 // Soft-limit the number of threads that we can actually support.
218 let n_threads
= Ord
::min(builder
.get_num_threads(), crate::max_num_threads());
220 let breadth_first
= builder
.get_breadth_first();
222 let (workers
, stealers
): (Vec
<_
>, Vec
<_
>) = (0..n_threads
)
224 let worker
= if breadth_first
{
230 let stealer
= worker
.stealer();
235 let logger
= Logger
::new(n_threads
);
236 let registry
= Arc
::new(Registry
{
237 logger
: logger
.clone(),
238 thread_infos
: stealers
.into_iter().map(ThreadInfo
::new
).collect(),
239 sleep
: Sleep
::new(logger
, n_threads
),
240 injected_jobs
: Injector
::new(),
241 terminate_count
: AtomicUsize
::new(1),
242 panic_handler
: builder
.take_panic_handler(),
243 start_handler
: builder
.take_start_handler(),
244 exit_handler
: builder
.take_exit_handler(),
247 // If we return early or panic, make sure to terminate existing threads.
248 let t1000
= Terminator(&registry
);
250 for (index
, worker
) in workers
.into_iter().enumerate() {
251 let thread
= ThreadBuilder
{
252 name
: builder
.get_thread_name(index
),
253 stack_size
: builder
.get_stack_size(),
254 registry
: Arc
::clone(&registry
),
258 if let Err(e
) = builder
.get_spawn_handler().spawn(thread
) {
259 return Err(ThreadPoolBuildError
::new(ErrorKind
::IOError(e
)));
263 // Returning normally now, without termination.
269 pub(super) fn current() -> Arc
<Registry
> {
271 let worker_thread
= WorkerThread
::current();
272 let registry
= if worker_thread
.is_null() {
275 &(*worker_thread
).registry
281 /// Returns the number of threads in the current registry. This
282 /// is better than `Registry::current().num_threads()` because it
283 /// avoids incrementing the `Arc`.
284 pub(super) fn current_num_threads() -> usize {
286 let worker_thread
= WorkerThread
::current();
287 if worker_thread
.is_null() {
288 global_registry().num_threads()
290 (*worker_thread
).registry
.num_threads()
295 /// Returns the current `WorkerThread` if it's part of this `Registry`.
296 pub(super) fn current_thread(&self) -> Option
<&WorkerThread
> {
298 let worker
= WorkerThread
::current().as_ref()?
;
299 if worker
.registry().id() == self.id() {
307 /// Returns an opaque identifier for this registry.
308 pub(super) fn id(&self) -> RegistryId
{
309 // We can rely on `self` not to change since we only ever create
310 // registries that are boxed up in an `Arc` (see `new()` above).
312 addr
: self as *const Self as usize,
317 pub(super) fn log(&self, event
: impl FnOnce() -> crate::log
::Event
) {
318 self.logger
.log(event
)
321 pub(super) fn num_threads(&self) -> usize {
322 self.thread_infos
.len()
325 pub(super) fn handle_panic(&self, err
: Box
<dyn Any
+ Send
>) {
326 match self.panic_handler
{
327 Some(ref handler
) => {
328 // If the customizable panic handler itself panics,
330 let abort_guard
= unwind
::AbortIfPanic
;
332 mem
::forget(abort_guard
);
335 // Default panic handler aborts.
336 let _
= unwind
::AbortIfPanic
; // let this drop.
341 /// Waits for the worker threads to get up and running. This is
342 /// meant to be used for benchmarking purposes, primarily, so that
343 /// you can get more consistent numbers by having everything
345 pub(super) fn wait_until_primed(&self) {
346 for info
in &self.thread_infos
{
351 /// Waits for the worker threads to stop. This is used for testing
352 /// -- so we can check that termination actually works.
354 pub(super) fn wait_until_stopped(&self) {
355 for info
in &self.thread_infos
{
360 /// ////////////////////////////////////////////////////////////////////////
363 /// So long as all of the worker threads are hanging out in their
364 /// top-level loop, there is no work to be done.
366 /// Push a job into the given `registry`. If we are running on a
367 /// worker thread for the registry, this will push onto the
368 /// deque. Else, it will inject from the outside (which is slower).
369 pub(super) fn inject_or_push(&self, job_ref
: JobRef
) {
370 let worker_thread
= WorkerThread
::current();
372 if !worker_thread
.is_null() && (*worker_thread
).registry().id() == self.id() {
373 (*worker_thread
).push(job_ref
);
375 self.inject(&[job_ref
]);
380 /// Push a job into the "external jobs" queue; it will be taken by
381 /// whatever worker has nothing to do. Use this if you know that
382 /// you are not on a worker of this registry.
383 pub(super) fn inject(&self, injected_jobs
: &[JobRef
]) {
384 self.log(|| JobsInjected
{
385 count
: injected_jobs
.len(),
388 // It should not be possible for `state.terminate` to be true
389 // here. It is only set to true when the user creates (and
390 // drops) a `ThreadPool`; and, in that case, they cannot be
391 // calling `inject()` later, since they dropped their
394 self.terminate_count
.load(Ordering
::Acquire
),
396 "inject() sees state.terminate as true"
399 let queue_was_empty
= self.injected_jobs
.is_empty();
401 for &job_ref
in injected_jobs
{
402 self.injected_jobs
.push(job_ref
);
406 .new_injected_jobs(usize::MAX
, injected_jobs
.len() as u32, queue_was_empty
);
409 fn has_injected_job(&self) -> bool
{
410 !self.injected_jobs
.is_empty()
413 fn pop_injected_job(&self, worker_index
: usize) -> Option
<JobRef
> {
415 match self.injected_jobs
.steal() {
416 Steal
::Success(job
) => {
417 self.log(|| JobUninjected
{
418 worker
: worker_index
,
422 Steal
::Empty
=> return None
,
428 /// If already in a worker-thread of this registry, just execute `op`.
429 /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
430 /// completes and return its return value. If `op` panics, that panic will
431 /// be propagated as well. The second argument indicates `true` if injection
432 /// was performed, `false` if executed directly.
433 pub(super) fn in_worker
<OP
, R
>(&self, op
: OP
) -> R
435 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
439 let worker_thread
= WorkerThread
::current();
440 if worker_thread
.is_null() {
441 self.in_worker_cold(op
)
442 } else if (*worker_thread
).registry().id() != self.id() {
443 self.in_worker_cross(&*worker_thread
, op
)
445 // Perfectly valid to give them a `&T`: this is the
446 // current thread, so we know the data structure won't be
447 // invalidated until we return.
448 op(&*worker_thread
, false)
454 unsafe fn in_worker_cold
<OP
, R
>(&self, op
: OP
) -> R
456 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
459 thread_local
!(static LOCK_LATCH
: LockLatch
= LockLatch
::new());
461 LOCK_LATCH
.with(|l
| {
462 // This thread isn't a member of *any* thread pool, so just block.
463 debug_assert
!(WorkerThread
::current().is_null());
464 let job
= StackJob
::new(
466 let worker_thread
= WorkerThread
::current();
467 assert
!(injected
&& !worker_thread
.is_null());
468 op(&*worker_thread
, true)
472 self.inject(&[job
.as_job_ref()]);
473 job
.latch
.wait_and_reset(); // Make sure we can use the same latch again next time.
475 // flush accumulated logs as we exit the thread
476 self.logger
.log(|| Flush
);
483 unsafe fn in_worker_cross
<OP
, R
>(&self, current_thread
: &WorkerThread
, op
: OP
) -> R
485 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
488 // This thread is a member of a different pool, so let it process
489 // other work while waiting for this `op` to complete.
490 debug_assert
!(current_thread
.registry().id() != self.id());
491 let latch
= SpinLatch
::cross(current_thread
);
492 let job
= StackJob
::new(
494 let worker_thread
= WorkerThread
::current();
495 assert
!(injected
&& !worker_thread
.is_null());
496 op(&*worker_thread
, true)
500 self.inject(&[job
.as_job_ref()]);
501 current_thread
.wait_until(&job
.latch
);
505 /// Increments the terminate counter. This increment should be
506 /// balanced by a call to `terminate`, which will decrement. This
507 /// is used when spawning asynchronous work, which needs to
508 /// prevent the registry from terminating so long as it is active.
510 /// Note that blocking functions such as `join` and `scope` do not
511 /// need to concern themselves with this fn; their context is
512 /// responsible for ensuring the current thread-pool will not
513 /// terminate until they return.
515 /// The global thread-pool always has an outstanding reference
516 /// (the initial one). Custom thread-pools have one outstanding
517 /// reference that is dropped when the `ThreadPool` is dropped:
518 /// since installing the thread-pool blocks until any joins/scopes
519 /// complete, this ensures that joins/scopes are covered.
521 /// The exception is `::spawn()`, which can create a job outside
522 /// of any blocking scope. In that case, the job itself holds a
523 /// terminate count and is responsible for invoking `terminate()`
525 pub(super) fn increment_terminate_count(&self) {
526 let previous
= self.terminate_count
.fetch_add(1, Ordering
::AcqRel
);
527 debug_assert
!(previous
!= 0, "registry ref count incremented from zero");
529 previous
!= std
::usize::MAX
,
530 "overflow in registry ref count"
534 /// Signals that the thread-pool which owns this registry has been
535 /// dropped. The worker threads will gradually terminate, once any
536 /// extant work is completed.
537 pub(super) fn terminate(&self) {
538 if self.terminate_count
.fetch_sub(1, Ordering
::AcqRel
) == 1 {
539 for (i
, thread_info
) in self.thread_infos
.iter().enumerate() {
540 thread_info
.terminate
.set_and_tickle_one(self, i
);
545 /// Notify the worker that the latch they are sleeping on has been "set".
546 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index
: usize) {
547 self.sleep
.notify_worker_latch_is_set(target_worker_index
);
551 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
552 pub(super) struct RegistryId
{
557 /// Latch set once thread has started and we are entering into the
558 /// main loop. Used to wait for worker threads to become primed,
559 /// primarily of interest for benchmarking.
562 /// Latch is set once worker thread has completed. Used to wait
563 /// until workers have stopped; only used for tests.
566 /// The latch used to signal that termination has been requested.
567 /// This latch is *set* by the `terminate` method on the
568 /// `Registry`, once the registry's main "terminate" counter
571 /// NB. We use a `CountLatch` here because it has no lifetimes and is
572 /// meant for async use, but the count never gets higher than one.
573 terminate
: CountLatch
,
575 /// the "stealer" half of the worker's deque
576 stealer
: Stealer
<JobRef
>,
580 fn new(stealer
: Stealer
<JobRef
>) -> ThreadInfo
{
582 primed
: LockLatch
::new(),
583 stopped
: LockLatch
::new(),
584 terminate
: CountLatch
::new(),
590 /// ////////////////////////////////////////////////////////////////////////
591 /// WorkerThread identifiers
593 pub(super) struct WorkerThread
{
594 /// the "worker" half of our local deque
595 worker
: Worker
<JobRef
>,
597 /// local queue used for `spawn_fifo` indirection
602 /// A weak random number generator.
605 registry
: Arc
<Registry
>,
608 // This is a bit sketchy, but basically: the WorkerThread is
609 // allocated on the stack of the worker on entry and stored into this
610 // thread local variable. So it will remain valid at least until the
611 // worker is fully unwound. Using an unsafe pointer avoids the need
612 // for a RefCell<T> etc.
614 static WORKER_THREAD_STATE
: Cell
<*const WorkerThread
> = Cell
::new(ptr
::null());
617 impl Drop
for WorkerThread
{
619 // Undo `set_current`
620 WORKER_THREAD_STATE
.with(|t
| {
621 assert
!(t
.get().eq(&(self as *const _
)));
628 /// Gets the `WorkerThread` index for the current thread; returns
629 /// NULL if this is not a worker thread. This pointer is valid
630 /// anywhere on the current thread.
632 pub(super) fn current() -> *const WorkerThread
{
633 WORKER_THREAD_STATE
.with(Cell
::get
)
636 /// Sets `self` as the worker thread index for the current thread.
637 /// This is done during worker thread startup.
638 unsafe fn set_current(thread
: *const WorkerThread
) {
639 WORKER_THREAD_STATE
.with(|t
| {
640 assert
!(t
.get().is_null());
645 /// Returns the registry that owns this worker thread.
647 pub(super) fn registry(&self) -> &Arc
<Registry
> {
652 pub(super) fn log(&self, event
: impl FnOnce() -> crate::log
::Event
) {
653 self.registry
.logger
.log(event
)
656 /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
658 pub(super) fn index(&self) -> usize {
663 pub(super) unsafe fn push(&self, job
: JobRef
) {
664 self.log(|| JobPushed { worker: self.index }
);
665 let queue_was_empty
= self.worker
.is_empty();
666 self.worker
.push(job
);
669 .new_internal_jobs(self.index
, 1, queue_was_empty
);
673 pub(super) unsafe fn push_fifo(&self, job
: JobRef
) {
674 self.push(self.fifo
.push(job
));
678 pub(super) fn local_deque_is_empty(&self) -> bool
{
679 self.worker
.is_empty()
682 /// Attempts to obtain a "local" job -- typically this means
683 /// popping from the top of the stack, though if we are configured
684 /// for breadth-first execution, it would mean dequeuing from the
687 pub(super) unsafe fn take_local_job(&self) -> Option
<JobRef
> {
688 let popped_job
= self.worker
.pop();
690 if popped_job
.is_some() {
691 self.log(|| JobPopped { worker: self.index }
);
697 /// Wait until the latch is set. Try to keep busy by popping and
698 /// stealing tasks as necessary.
700 pub(super) unsafe fn wait_until
<L
: AsCoreLatch
+ ?Sized
>(&self, latch
: &L
) {
701 let latch
= latch
.as_core_latch();
703 self.wait_until_cold(latch
);
708 unsafe fn wait_until_cold(&self, latch
: &CoreLatch
) {
709 // the code below should swallow all panics and hence never
710 // unwind; but if something goes wrong, we want to abort,
711 // because otherwise other code in rayon may assume that the
712 // latch has been signaled, and that can lead to random memory
713 // accesses, which would be *very bad*
714 let abort_guard
= unwind
::AbortIfPanic
;
716 let mut idle_state
= self.registry
.sleep
.start_looking(self.index
, latch
);
717 while !latch
.probe() {
718 // Try to find some work to do. We give preference first
719 // to things in our local deque, then in other workers'
720 // deques, and finally to injected jobs from the
721 // outside. The idea is to finish what we started before
722 // we take on something new.
723 if let Some(job
) = self
725 .or_else(|| self.steal())
726 .or_else(|| self.registry
.pop_injected_job(self.index
))
728 self.registry
.sleep
.work_found(idle_state
);
730 idle_state
= self.registry
.sleep
.start_looking(self.index
, latch
);
734 .no_work_found(&mut idle_state
, latch
, || self.registry
.has_injected_job())
738 // If we were sleepy, we are not anymore. We "found work" --
739 // whatever the surrounding thread was doing before it had to
741 self.registry
.sleep
.work_found(idle_state
);
743 self.log(|| ThreadSawLatchSet
{
745 latch_addr
: latch
.addr(),
747 mem
::forget(abort_guard
); // successful execution, do not abort
751 pub(super) unsafe fn execute(&self, job
: JobRef
) {
755 /// Try to steal a single job and return it.
757 /// This should only be done as a last resort, when there is no
758 /// local work to do.
759 unsafe fn steal(&self) -> Option
<JobRef
> {
760 // we only steal when we don't have any work to do locally
761 debug_assert
!(self.local_deque_is_empty());
763 // otherwise, try to steal
764 let thread_infos
= &self.registry
.thread_infos
.as_slice();
765 let num_threads
= thread_infos
.len();
766 if num_threads
<= 1 {
771 let mut retry
= false;
772 let start
= self.rng
.next_usize(num_threads
);
773 let job
= (start
..num_threads
)
775 .filter(move |&i
| i
!= self.index
)
776 .find_map(|victim_index
| {
777 let victim
= &thread_infos
[victim_index
];
778 match victim
.stealer
.steal() {
779 Steal
::Success(job
) => {
780 self.log(|| JobStolen
{
782 victim
: victim_index
,
786 Steal
::Empty
=> None
,
793 if job
.is_some() || !retry
{
800 /// ////////////////////////////////////////////////////////////////////////
802 unsafe fn main_loop(worker
: Worker
<JobRef
>, registry
: Arc
<Registry
>, index
: usize) {
803 let worker_thread
= &WorkerThread
{
805 fifo
: JobFifo
::new(),
807 rng
: XorShift64Star
::new(),
810 WorkerThread
::set_current(worker_thread
);
811 let registry
= &*worker_thread
.registry
;
813 // let registry know we are ready to do work
814 registry
.thread_infos
[index
].primed
.set();
816 // Worker threads should not panic. If they do, just abort, as the
817 // internal state of the threadpool is corrupted. Note that if
818 // **user code** panics, we should catch that and redirect.
819 let abort_guard
= unwind
::AbortIfPanic
;
821 // Inform a user callback that we started a thread.
822 if let Some(ref handler
) = registry
.start_handler
{
823 match unwind
::halt_unwinding(|| handler(index
)) {
826 registry
.handle_panic(err
);
831 let my_terminate_latch
= &registry
.thread_infos
[index
].terminate
;
832 worker_thread
.log(|| ThreadStart
{
834 terminate_addr
: my_terminate_latch
.as_core_latch().addr(),
836 worker_thread
.wait_until(my_terminate_latch
);
838 // Should not be any work left in our queue.
839 debug_assert
!(worker_thread
.take_local_job().is_none());
841 // let registry know we are done
842 registry
.thread_infos
[index
].stopped
.set();
844 // Normal termination, do not abort.
845 mem
::forget(abort_guard
);
847 worker_thread
.log(|| ThreadTerminate { worker: index }
);
849 // Inform a user callback that we exited a thread.
850 if let Some(ref handler
) = registry
.exit_handler
{
851 match unwind
::halt_unwinding(|| handler(index
)) {
854 registry
.handle_panic(err
);
857 // We're already exiting the thread, there's nothing else to do.
861 /// If already in a worker-thread, just execute `op`. Otherwise,
862 /// execute `op` in the default thread-pool. Either way, block until
863 /// `op` completes and return its return value. If `op` panics, that
864 /// panic will be propagated as well. The second argument indicates
865 /// `true` if injection was performed, `false` if executed directly.
866 pub(super) fn in_worker
<OP
, R
>(op
: OP
) -> R
868 OP
: FnOnce(&WorkerThread
, bool
) -> R
+ Send
,
872 let owner_thread
= WorkerThread
::current();
873 if !owner_thread
.is_null() {
874 // Perfectly valid to give them a `&T`: this is the
875 // current thread, so we know the data structure won't be
876 // invalidated until we return.
877 op(&*owner_thread
, false)
879 global_registry().in_worker_cold(op
)
884 /// [xorshift*] is a fast pseudorandom number generator which will
885 /// even tolerate weak seeding, as long as it's not zero.
887 /// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
888 struct XorShift64Star
{
892 impl XorShift64Star
{
894 // Any non-zero seed will do -- this uses the hash of a global counter.
897 let mut hasher
= DefaultHasher
::new();
899 static COUNTER
: AtomicUsize
= ATOMIC_USIZE_INIT
;
900 hasher
.write_usize(COUNTER
.fetch_add(1, Ordering
::Relaxed
));
901 seed
= hasher
.finish();
905 state
: Cell
::new(seed
),
909 fn next(&self) -> u64 {
910 let mut x
= self.state
.get();
911 debug_assert_ne
!(x
, 0);
916 x
.wrapping_mul(0x2545_f491_4f6c_dd1d)
919 /// Return a value from `0..n`.
920 fn next_usize(&self, n
: usize) -> usize {
921 (self.next() % n
as u64) as usize