use ::{ExitHandler, PanicHandler, StartHandler, ThreadPoolBuilder, ThreadPoolBuildError, ErrorKind};
use crossbeam_deque::{Deque, Steal, Stealer};
use job::{JobRef, StackJob};
use internal::task::Task;
use latch::{LatchProbe, Latch, CountLatch, LockLatch, SpinLatch, TickleLatch};
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
    thread_infos: Vec<ThreadInfo>,
    state: Mutex<RegistryState>,
    job_uninjector: Stealer<JobRef>,
    panic_handler: Option<Box<PanicHandler>>,
    start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,

    // When this latch reaches 0, it means that all work on this
    // registry must be complete. This is ensured in the following ways:
    //
    // - if this is the global registry, there is a ref-count that never
    // - if this is a user-created thread-pool, then so long as the thread-pool
    //   exists, it holds a reference.
    // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
    //   no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
    //   return until the blocking job is complete, that ref will continue to be held.
    // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
    //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
    //   and that job will keep the pool alive.
    terminate_latch: CountLatch,
struct RegistryState {
    job_injector: Deque<JobRef>,

/// ////////////////////////////////////////////////////////////////////////

static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = ONCE_INIT;
/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
fn global_registry() -> &'static Arc<Registry> {
    THE_REGISTRY_SET.call_once(|| unsafe { init_registry(ThreadPoolBuilder::new()).unwrap() });
    unsafe { THE_REGISTRY.expect("The global thread pool has not been initialized.") }
/// Starts the worker threads (if that has not already happened) with
/// the given builder.
pub fn init_global_registry(builder: ThreadPoolBuilder) -> Result<&'static Registry, ThreadPoolBuildError> {
    let mut called = false;
    let mut init_result = Ok(());
    THE_REGISTRY_SET.call_once(|| unsafe {
        init_result = init_registry(builder);
    init_result.map(|()| &**global_registry())
    Err(ThreadPoolBuildError::new(ErrorKind::GlobalPoolAlreadyInitialized))
/// Initializes the global registry with the given builder.
/// Meant to be called from within the `THE_REGISTRY_SET` once
/// function. Declared `unsafe` because it writes to `THE_REGISTRY` in
/// an unsynchronized fashion.
unsafe fn init_registry(builder: ThreadPoolBuilder) -> Result<(), ThreadPoolBuildError> {
    Registry::new(builder).map(|registry| THE_REGISTRY = Some(leak(registry)))
struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
    pub fn new(mut builder: ThreadPoolBuilder) -> Result<Arc<Registry>, ThreadPoolBuildError> {
        let n_threads = builder.get_num_threads();
        let breadth_first = builder.get_breadth_first();

        let inj_worker = Deque::new();
        let inj_stealer = inj_worker.stealer();
        let workers: Vec<_> = (0..n_threads)
            .map(|_| Deque::new())
        let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();

        let registry = Arc::new(Registry {
            thread_infos: stealers.into_iter()
                .map(|s| ThreadInfo::new(s))
            state: Mutex::new(RegistryState::new(inj_worker)),
            job_uninjector: inj_stealer,
            terminate_latch: CountLatch::new(),
            panic_handler: builder.take_panic_handler(),
            start_handler: builder.take_start_handler(),
            exit_handler: builder.take_exit_handler(),
        // If we return early or panic, make sure to terminate existing threads.
        let t1000 = Terminator(&registry);

        for (index, worker) in workers.into_iter().enumerate() {
            let registry = registry.clone();
            let mut b = thread::Builder::new();
            if let Some(name) = builder.get_thread_name(index) {
            if let Some(stack_size) = builder.get_stack_size() {
                b = b.stack_size(stack_size);
            if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) }) {
                return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)))
        // Returning normally now, without termination.

    #[cfg(rayon_unstable)]
    pub fn global() -> Arc<Registry> {
        global_registry().clone()
    pub fn current() -> Arc<Registry> {
        let worker_thread = WorkerThread::current();
        if worker_thread.is_null() {
            global_registry().clone()
            (*worker_thread).registry.clone()
    /// Returns the number of threads in the current registry. This
    /// is better than `Registry::current().num_threads()` because it
    /// avoids incrementing the `Arc`.
    pub fn current_num_threads() -> usize {
        let worker_thread = WorkerThread::current();
        if worker_thread.is_null() {
            global_registry().num_threads()
            (*worker_thread).registry.num_threads()
    /// Returns an opaque identifier for this registry.
    pub fn id(&self) -> RegistryId {
        // We can rely on `self` not to change since we only ever create
        // registries that are boxed up in an `Arc` (see `new()` above).
        RegistryId { addr: self as *const Self as usize }

    pub fn num_threads(&self) -> usize {
        self.thread_infos.len()
    pub fn handle_panic(&self, err: Box<Any + Send>) {
        match self.panic_handler {
            Some(ref handler) => {
                // If the customizable panic handler itself panics,
                let abort_guard = unwind::AbortIfPanic;
                mem::forget(abort_guard);
            // Default panic handler aborts.
            let _ = unwind::AbortIfPanic; // let this drop.
    /// Waits for the worker threads to get up and running. This is
    /// meant to be used for benchmarking purposes, primarily, so that
    /// you can get more consistent numbers by having everything
    pub fn wait_until_primed(&self) {
        for info in &self.thread_infos {

    /// Waits for the worker threads to stop. This is used for testing
    /// -- so we can check that termination actually works.
    pub fn wait_until_stopped(&self) {
        for info in &self.thread_infos {
    /// ////////////////////////////////////////////////////////////////////////

    /// So long as all of the worker threads are hanging out in their
    /// top-level loop, there is no work to be done.

    /// Push a job into the given `registry`. If we are running on a
    /// worker thread for the registry, this will push onto the
    /// deque. Else, it will inject from the outside (which is slower).
    pub fn inject_or_push(&self, job_ref: JobRef) {
        let worker_thread = WorkerThread::current();
        if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
            (*worker_thread).push(job_ref);
            self.inject(&[job_ref]);
    /// Unsafe: the caller must guarantee that `task` will stay valid
    /// until it executes.
    #[cfg(rayon_unstable)]
    pub unsafe fn submit_task<T>(&self, task: Arc<T>)
        let task_job = TaskJob::new(task);
        let task_job_ref = TaskJob::into_job_ref(task_job);
        return self.inject_or_push(task_job_ref);
/// A little newtype wrapper for `T`, just because I did not
/// want to implement `Job` for all `T: Task`.
struct TaskJob<T: Task> {

impl<T: Task> TaskJob<T> {
    fn new(arc: Arc<T>) -> Arc<Self> {
        // `TaskJob<T>` has the same layout as `T`, so we can safely
        // transmute this `T` into a `TaskJob<T>`. This lets us write our
        // impls of `Job` for `TaskJob<T>`, making them more restricted.
        // Since `Job` is a private trait, this is not strictly necessary,
        // I don't think, but makes me feel better.
        unsafe { mem::transmute(arc) }
    pub fn into_task(this: Arc<TaskJob<T>>) -> Arc<T> {
        // Same logic as `new()`
        unsafe { mem::transmute(this) }

    unsafe fn into_job_ref(this: Arc<Self>) -> JobRef {
        let this: *const Self = mem::transmute(this);
impl<T: Task> Job for TaskJob<T> {
    unsafe fn execute(this: *const Self) {
        let this: Arc<Self> = mem::transmute(this);
        let task: Arc<T> = TaskJob::into_task(this);
    /// Push a job into the "external jobs" queue; it will be taken by
    /// whatever worker has nothing to do. Use this if you know that
    /// you are not on a worker of this registry.
    pub fn inject(&self, injected_jobs: &[JobRef]) {
        log!(InjectJobs { count: injected_jobs.len() });
        let state = self.state.lock().unwrap();

        // It should not be possible for `state.terminate` to be true
        // here. It is only set to true when the user creates (and
        // drops) a `ThreadPool`; and, in that case, they cannot be
        // calling `inject()` later, since they dropped their
        assert!(!self.terminate_latch.probe(), "inject() sees state.terminate as true");

        for &job_ref in injected_jobs {
            state.job_injector.push(job_ref);
        self.sleep.tickle(usize::MAX);
    fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
        match self.job_uninjector.steal() {
            Steal::Empty => return None,
                log!(UninjectedWork { worker: worker_index });
    /// If already in a worker-thread of this registry, just execute `op`.
    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
    /// completes and return its return value. If `op` panics, that panic will
    /// be propagated as well. The second argument indicates `true` if injection
    /// was performed, `false` if executed directly.
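    ///
    /// A minimal usage sketch (hypothetical caller, not code from this
    /// module); it relies only on the signature described above, where the
    /// closure receives the `&WorkerThread` and a `bool` indicating whether
    /// the call had to be injected:
    ///
    /// ```ignore
    /// // `registry` is assumed to be an `Arc<Registry>` obtained elsewhere.
    /// let sum = registry.in_worker(|worker_thread, injected| {
    ///     // Runs on some worker thread of `registry`; `injected` is true
    ///     // only if the closure was injected from outside the pool.
    ///     let _ = (worker_thread.index(), injected);
    ///     1 + 2
    /// });
    /// assert_eq!(sum, 3);
    /// ```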
    pub fn in_worker<OP, R>(&self, op: OP) -> R
        where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send
        let worker_thread = WorkerThread::current();
        if worker_thread.is_null() {
            self.in_worker_cold(op)
        } else if (*worker_thread).registry().id() != self.id() {
            self.in_worker_cross(&*worker_thread, op)
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*worker_thread, false)
    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
        where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send
        // This thread isn't a member of *any* thread pool, so just block.
        debug_assert!(WorkerThread::current().is_null());
        let job = StackJob::new(|injected| {
            let worker_thread = WorkerThread::current();
            assert!(injected && !worker_thread.is_null());
            op(&*worker_thread, true)
        }, LockLatch::new());
        self.inject(&[job.as_job_ref()]);
    unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
        where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send
        // This thread is a member of a different pool, so let it process
        // other work while waiting for this `op` to complete.
        debug_assert!(current_thread.registry().id() != self.id());
        let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
        let job = StackJob::new(|injected| {
            let worker_thread = WorkerThread::current();
            assert!(injected && !worker_thread.is_null());
            op(&*worker_thread, true)
        self.inject(&[job.as_job_ref()]);
        current_thread.wait_until(&job.latch);
    /// Increment the terminate counter. This increment should be
    /// balanced by a call to `terminate`, which will decrement. This
    /// is used when spawning asynchronous work, which needs to
    /// prevent the registry from terminating so long as it is active.
    ///
    /// Note that blocking functions such as `join` and `scope` do not
    /// need to concern themselves with this fn; their context is
    /// responsible for ensuring the current thread-pool will not
    /// terminate until they return.
    ///
    /// The global thread-pool always has an outstanding reference
    /// (the initial one). Custom thread-pools have one outstanding
    /// reference that is dropped when the `ThreadPool` is dropped:
    /// since installing the thread-pool blocks until any joins/scopes
    /// complete, this ensures that joins/scopes are covered.
    ///
    /// The exception is `::spawn()`, which can create a job outside
    /// of any blocking scope. In that case, the job itself holds a
    /// terminate count and is responsible for invoking `terminate()`
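    ///
    /// A rough sketch of the balancing described above (hypothetical
    /// asynchronous caller, not code from this module): the count is bumped
    /// before the job is created and released by the job when it is done.
    ///
    /// ```ignore
    /// registry.increment_terminate_count();   // keep the registry alive
    /// // ... hand the asynchronous job off to the pool; once it completes
    /// // (or panics), it is responsible for the matching call:
    /// registry.terminate();                    // balance the increment
    /// ```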
    pub fn increment_terminate_count(&self) {
        self.terminate_latch.increment();

    /// Signals that the thread-pool which owns this registry has been
    /// dropped. The worker threads will gradually terminate, once any
    /// extant work is completed.
    pub fn terminate(&self) {
        self.terminate_latch.set();
        self.sleep.tickle(usize::MAX);
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct RegistryId {

    pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
            job_injector: job_injector,
    /// Latch set once thread has started and we are entering into the
    /// main loop. Used to wait for worker threads to become primed,
    /// primarily of interest for benchmarking.

    /// Latch is set once worker thread has completed. Used to wait
    /// until workers have stopped; only used for tests.

    /// the "stealer" half of the worker's deque
    stealer: Stealer<JobRef>,

    fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
            primed: LockLatch::new(),
            stopped: LockLatch::new(),
/// ////////////////////////////////////////////////////////////////////////
/// WorkerThread identifiers

pub struct WorkerThread {
    /// the "worker" half of our local deque
    worker: Deque<JobRef>,

    /// are these workers configured to steal breadth-first or not?

    /// A weak random number generator.

    registry: Arc<Registry>,
// This is a bit sketchy, but basically: the WorkerThread is
// allocated on the stack of the worker on entry and stored into this
// thread local variable. So it will remain valid at least until the
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
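//
// A small sketch of the resulting access pattern (flow shown for
// illustration only; both functions are defined further down in this file):
//
//     // on entry, the worker's `main_loop` registers its stack-allocated
//     // `WorkerThread` in the thread-local:
//     WorkerThread::set_current(&worker_thread);
//     // afterwards, any code running on that thread can recover it;
//     // the pointer is null on non-worker threads:
//     let wt = WorkerThread::current();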
    static WORKER_THREAD_STATE: Cell<*const WorkerThread> =
        Cell::new(0 as *const WorkerThread)
    /// Gets the `WorkerThread` index for the current thread; returns
    /// NULL if this is not a worker thread. This pointer is valid
    /// anywhere on the current thread.
    pub fn current() -> *const WorkerThread {
        WORKER_THREAD_STATE.with(|t| t.get())

    /// Sets `self` as the worker thread index for the current thread.
    /// This is done during worker thread startup.
    unsafe fn set_current(thread: *const WorkerThread) {
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().is_null());
    /// Returns the registry that owns this worker thread.
    pub fn registry(&self) -> &Arc<Registry> {

    /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
    pub fn index(&self) -> usize {

    pub unsafe fn push(&self, job: JobRef) {
        self.worker.push(job);
        self.registry.sleep.tickle(self.index);

    pub fn local_deque_is_empty(&self) -> bool {
        self.worker.len() == 0
    /// Attempts to obtain a "local" job -- typically this means
    /// popping from the top of the stack, though if we are configured
    /// for breadth-first execution, it would mean dequeuing from the
    pub unsafe fn take_local_job(&self) -> Option<JobRef> {
        if !self.breadth_first {
            match self.worker.steal() {
                Steal::Empty => return None,
                Steal::Data(d) => return Some(d),
    /// Wait until the latch is set. Try to keep busy by popping and
    /// stealing tasks as necessary.
    pub unsafe fn wait_until<L: LatchProbe + ?Sized>(&self, latch: &L) {
        log!(WaitUntil { worker: self.index });
            self.wait_until_cold(latch);
    unsafe fn wait_until_cold<L: LatchProbe + ?Sized>(&self, latch: &L) {
        // the code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*
        let abort_guard = unwind::AbortIfPanic;

        while !latch.probe() {
            // Try to find some work to do. We give preference first
            // to things in our local deque, then in other workers'
            // deques, and finally to injected jobs from the
            // outside. The idea is to finish what we started before
            // we take on something new.
            if let Some(job) = self.take_local_job()
                .or_else(|| self.steal())
                .or_else(|| self.registry.pop_injected_job(self.index)) {
                yields = self.registry.sleep.work_found(self.index, yields);
                yields = self.registry.sleep.no_work_found(self.index, yields);

        // If we were sleepy, we are not anymore. We "found work" --
        // whatever the surrounding thread was doing before it had to
        self.registry.sleep.work_found(self.index, yields);

        log!(LatchSet { worker: self.index });
        mem::forget(abort_guard); // successful execution, do not abort
    pub unsafe fn execute(&self, job: JobRef) {
        // Subtle: executing this job will have `set()` some of its
        // latches. This may mean that a sleepy (or sleeping) worker
        // can now make progress. So we have to tickle them to let
        self.registry.sleep.tickle(self.index);
    /// Try to steal a single job and return it.
    ///
    /// This should only be done as a last resort, when there is no
    /// local work to do.
    unsafe fn steal(&self) -> Option<JobRef> {
        // we only steal when we don't have any work to do locally
        debug_assert!(self.worker.pop().is_none());

        // otherwise, try to steal
        let num_threads = self.registry.thread_infos.len();
        if num_threads <= 1 {

        let start = self.rng.next_usize(num_threads);
        (start .. num_threads)
            .filter(|&i| i != self.index)
            .filter_map(|victim_index| {
                let victim = &self.registry.thread_infos[victim_index];
                match victim.stealer.steal() {
                    Steal::Empty => return None,
/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Deque<JobRef>,
                    registry: Arc<Registry>,
                    index: usize,
                    breadth_first: bool) {
    let worker_thread = WorkerThread {
        breadth_first: breadth_first,
        rng: XorShift64Star::new(),
        registry: registry.clone(),
    WorkerThread::set_current(&worker_thread);

    // let registry know we are ready to do work
    registry.thread_infos[index].primed.set();
    // Worker threads should not panic. If they do, just abort, as the
    // internal state of the threadpool is corrupted. Note that if
    // **user code** panics, we should catch that and redirect.
    let abort_guard = unwind::AbortIfPanic;

    // Inform a user callback that we started a thread.
    if let Some(ref handler) = registry.start_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
                registry.handle_panic(err);

    worker_thread.wait_until(&registry.terminate_latch);

    // Should not be any work left in our queue.
    debug_assert!(worker_thread.take_local_job().is_none());

    // let registry know we are done
    registry.thread_infos[index].stopped.set();

    // Normal termination, do not abort.
    mem::forget(abort_guard);

    // Inform a user callback that we exited a thread.
    if let Some(ref handler) = registry.exit_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
                registry.handle_panic(err);
        // We're already exiting the thread, there's nothing else to do.

/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
/// panic will be propagated as well. The second argument indicates
/// `true` if injection was performed, `false` if executed directly.
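///
/// A minimal sketch of a call through this free function (hypothetical
/// caller, not code from this module); when invoked from a non-worker
/// thread it falls back to the global registry as described above:
///
/// ```ignore
/// let was_injected = in_worker(|_worker_thread, injected| {
///     // `injected` is true here if the calling thread was not already a
///     // worker and the closure had to be injected into the global pool.
///     injected
/// });
/// ```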
pub fn in_worker<OP, R>(op: OP) -> R
    where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send
    let owner_thread = WorkerThread::current();
    if !owner_thread.is_null() {
        // Perfectly valid to give them a `&T`: this is the
        // current thread, so we know the data structure won't be
        // invalidated until we return.
        op(&*owner_thread, false)
        global_registry().in_worker_cold(op)
/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {

impl XorShift64Star {
        // Any non-zero seed will do -- this uses the hash of a global counter.
            let mut hasher = DefaultHasher::new();
            static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
            state: Cell::new(seed),
    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        // xorshift* update, per the reference linked above
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    /// Return a value from `0..n`.
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize