// vendor/rustc-rayon-core/src/registry.rs
use crate::job::{JobFifo, JobRef, StackJob};
use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
use crate::{
    AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
    ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::Hasher;
use std::io;
use std::mem;
use std::ptr;
#[allow(deprecated)]
use std::sync::atomic::ATOMIC_USIZE_INIT;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Once};
use std::thread;
use std::usize;

/// Thread builder used for customization via
/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
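///
/// A minimal sketch of a custom spawn handler; apart from
/// `spawn_handler` and `ThreadBuilder::run`, the names here are
/// illustrative:
///
/// ```ignore
/// let pool = rayon_core::ThreadPoolBuilder::new()
///     .spawn_handler(|thread| {
///         // Hand the worker off to a plain OS thread; `run()` does
///         // not return until the pool is dropped.
///         std::thread::spawn(move || thread.run());
///         Ok(())
///     })
///     .build()?;
/// ```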
pub struct ThreadBuilder {
    name: Option<String>,
    stack_size: Option<usize>,
    worker: Worker<JobRef>,
    registry: Arc<Registry>,
    index: usize,
}

impl ThreadBuilder {
    /// Gets the index of this thread in the pool, within `0..num_threads`.
    pub fn index(&self) -> usize {
        self.index
    }

    /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
    pub fn name(&self) -> Option<&str> {
        self.name.as_ref().map(String::as_str)
    }

    /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
    pub fn stack_size(&self) -> Option<usize> {
        self.stack_size
    }

    /// Executes the main loop for this thread. This will not return until the
    /// thread pool is dropped.
    pub fn run(self) {
        unsafe { main_loop(self.worker, self.registry, self.index) }
    }
}

impl fmt::Debug for ThreadBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ThreadBuilder")
            .field("pool", &self.registry.id())
            .field("index", &self.index)
            .field("name", &self.name)
            .field("stack_size", &self.stack_size)
            .finish()
    }
}

/// Generalized trait for spawning a thread in the `Registry`.
///
/// This trait is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
pub trait ThreadSpawn {
    private_decl! {}

    /// Spawn a thread with the `ThreadBuilder` parameters, and then
    /// call `ThreadBuilder::run()`.
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
}

/// Spawns a thread in the "normal" way with `std::thread::Builder`.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug, Default)]
pub struct DefaultSpawn;

impl ThreadSpawn for DefaultSpawn {
    private_impl! {}

    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        let mut b = thread::Builder::new();
        if let Some(name) = thread.name() {
            b = b.name(name.to_owned());
        }
        if let Some(stack_size) = thread.stack_size() {
            b = b.stack_size(stack_size);
        }
        b.spawn(|| thread.run())?;
        Ok(())
    }
}

/// Spawns a thread with a user's custom callback.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);

impl<F> CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    pub(super) fn new(spawn: F) -> Self {
        CustomSpawn(spawn)
    }
}

impl<F> ThreadSpawn for CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    private_impl! {}

    #[inline]
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        (self.0)(thread)
    }
}

pub struct Registry {
    logger: Logger,
    thread_infos: Vec<ThreadInfo>,
    sleep: Sleep,
    injected_jobs: Injector<JobRef>,
    panic_handler: Option<Box<PanicHandler>>,
    pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
    start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,
    pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
    pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,

    // When this counter reaches 0, it means that all work on this
    // registry must be complete. This is ensured in the following ways:
    //
    // - if this is the global registry, there is a ref-count that never
    //   gets released.
    // - if this is a user-created thread-pool, then so long as the thread-pool
    //   exists, it holds a reference.
    // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
    //   no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
    //   return until the blocking job is complete, that ref will continue to be held.
    // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
    //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
    //   and that job will keep the pool alive.
    terminate_count: AtomicUsize,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization

static mut THE_REGISTRY: Option<Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = Once::new();

/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
    set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
        .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
        .expect("The global thread pool has not been initialized.")
}

/// Starts the worker threads (if that has not already happened) with
/// the given builder.
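///
/// Callers normally reach this through the builder; a minimal sketch,
/// assuming the usual `build_global` entry point:
///
/// ```ignore
/// rayon_core::ThreadPoolBuilder::new()
///     .num_threads(4)
///     .build_global()
///     .expect("the global thread pool can only be built once");
/// ```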
pub(super) fn init_global_registry<S>(
    builder: ThreadPoolBuilder<S>,
) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    S: ThreadSpawn,
{
    set_global_registry(|| Registry::new(builder))
}

/// Starts the worker threads (if that has not already happened)
/// by creating a registry with the given callback.
fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
{
    let mut result = Err(ThreadPoolBuildError::new(
        ErrorKind::GlobalPoolAlreadyInitialized,
    ));

    THE_REGISTRY_SET.call_once(|| {
        result = registry()
            .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
    });

    result
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
    fn drop(&mut self) {
        self.0.terminate()
    }
}

impl Registry {
    pub(super) fn new<S>(
        mut builder: ThreadPoolBuilder<S>,
    ) -> Result<Arc<Self>, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        // Soft-limit the number of threads that we can actually support.
        let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads());

        let breadth_first = builder.get_breadth_first();

        let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
            .map(|_| {
                let worker = if breadth_first {
                    Worker::new_fifo()
                } else {
                    Worker::new_lifo()
                };

                let stealer = worker.stealer();
                (worker, stealer)
            })
            .unzip();

        let logger = Logger::new(n_threads);
        let registry = Arc::new(Registry {
            logger: logger.clone(),
            thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
            sleep: Sleep::new(logger, n_threads),
            injected_jobs: Injector::new(),
            terminate_count: AtomicUsize::new(1),
            panic_handler: builder.take_panic_handler(),
            deadlock_handler: builder.take_deadlock_handler(),
            start_handler: builder.take_start_handler(),
            exit_handler: builder.take_exit_handler(),
            acquire_thread_handler: builder.take_acquire_thread_handler(),
            release_thread_handler: builder.take_release_thread_handler(),
        });

        // If we return early or panic, make sure to terminate existing threads.
        let t1000 = Terminator(&registry);

        for (index, worker) in workers.into_iter().enumerate() {
            let thread = ThreadBuilder {
                name: builder.get_thread_name(index),
                stack_size: builder.get_stack_size(),
                registry: Arc::clone(&registry),
                worker,
                index,
            };
            if let Err(e) = builder.get_spawn_handler().spawn(thread) {
                return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
            }
        }

        // Returning normally now, without termination.
        mem::forget(t1000);

        Ok(registry)
    }

    pub fn current() -> Arc<Registry> {
        unsafe {
            let worker_thread = WorkerThread::current();
            let registry = if worker_thread.is_null() {
                global_registry()
            } else {
                &(*worker_thread).registry
            };
            Arc::clone(registry)
        }
    }

    /// Returns the number of threads in the current registry. This
    /// is better than `Registry::current().num_threads()` because it
    /// avoids incrementing the `Arc`.
    pub(super) fn current_num_threads() -> usize {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().num_threads()
            } else {
                (*worker_thread).registry.num_threads()
            }
        }
    }

    /// Returns the current `WorkerThread` if it's part of this `Registry`.
    pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
        unsafe {
            let worker = WorkerThread::current().as_ref()?;
            if worker.registry().id() == self.id() {
                Some(worker)
            } else {
                None
            }
        }
    }

    /// Returns an opaque identifier for this registry.
    pub(super) fn id(&self) -> RegistryId {
        // We can rely on `self` not to change since we only ever create
        // registries that are boxed up in an `Arc` (see `new()` above).
        RegistryId {
            addr: self as *const Self as usize,
        }
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.logger.log(event)
    }

    pub(super) fn num_threads(&self) -> usize {
        self.thread_infos.len()
    }

    pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
        match self.panic_handler {
            Some(ref handler) => {
                // If the customizable panic handler itself panics,
                // then we abort.
                let abort_guard = unwind::AbortIfPanic;
                handler(err);
                mem::forget(abort_guard);
            }
            None => {
                // Default panic handler aborts.
                let _ = unwind::AbortIfPanic; // let this drop.
            }
        }
    }

    /// Waits for the worker threads to get up and running. This is
    /// meant to be used for benchmarking purposes, primarily, so that
    /// you can get more consistent numbers by having everything
    /// "ready to go".
    pub(super) fn wait_until_primed(&self) {
        for info in &self.thread_infos {
            info.primed.wait();
        }
    }

    /// Waits for the worker threads to stop. This is used for testing
    /// -- so we can check that termination actually works.
    pub(super) fn wait_until_stopped(&self) {
        self.release_thread();
        for info in &self.thread_infos {
            info.stopped.wait();
        }
        self.acquire_thread();
    }

    pub(crate) fn acquire_thread(&self) {
        if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
            acquire_thread_handler();
        }
    }

    pub(crate) fn release_thread(&self) {
        if let Some(ref release_thread_handler) = self.release_thread_handler {
            release_thread_handler();
        }
    }

    /// ////////////////////////////////////////////////////////////////////////
    /// MAIN LOOP
    ///
    /// So long as all of the worker threads are hanging out in their
    /// top-level loop, there is no work to be done.

    /// Push a job into the given `registry`. If we are running on a
    /// worker thread for the registry, this will push onto the
    /// deque. Else, it will inject from the outside (which is slower).
    pub(super) fn inject_or_push(&self, job_ref: JobRef) {
        let worker_thread = WorkerThread::current();
        unsafe {
            if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
                (*worker_thread).push(job_ref);
            } else {
                self.inject(&[job_ref]);
            }
        }
    }

    /// Push a job into the "external jobs" queue; it will be taken by
    /// whatever worker has nothing to do. Use this if you know that
    /// you are not on a worker of this registry.
    pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
        self.log(|| JobsInjected {
            count: injected_jobs.len(),
        });

        // It should not be possible for the terminate count to be zero
        // here. It only reaches zero when the user creates (and drops)
        // a `ThreadPool`; and, in that case, they cannot be calling
        // `inject()` later, since they dropped their `ThreadPool`.
        debug_assert_ne!(
            self.terminate_count.load(Ordering::Acquire),
            0,
            "inject() sees state.terminate as true"
        );

        let queue_was_empty = self.injected_jobs.is_empty();

        for &job_ref in injected_jobs {
            self.injected_jobs.push(job_ref);
        }

        self.sleep
            .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
    }


    pub(crate) fn has_injected_job(&self) -> bool {
        !self.injected_jobs.is_empty()
    }

    fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
        loop {
            match self.injected_jobs.steal() {
                Steal::Success(job) => {
                    self.log(|| JobUninjected {
                        worker: worker_index,
                    });
                    return Some(job);
                }
                Steal::Empty => return None,
                Steal::Retry => {}
            }
        }
    }

    /// If already in a worker-thread of this registry, just execute `op`.
    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
    /// completes and return its return value. If `op` panics, that panic will
    /// be propagated as well. The second argument to `op` is `true` if the job
    /// was injected and `false` if it executed directly on the current thread.
    pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                self.in_worker_cold(op)
            } else if (*worker_thread).registry().id() != self.id() {
                self.in_worker_cross(&*worker_thread, op)
            } else {
                // Perfectly valid to give them a `&T`: this is the
                // current thread, so we know the data structure won't be
                // invalidated until we return.
                op(&*worker_thread, false)
            }
        }
    }

    #[cold]
    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());

        LOCK_LATCH.with(|l| {
            // This thread isn't a member of *any* thread pool, so just block.
            debug_assert!(WorkerThread::current().is_null());
            let job = StackJob::new(
                0,
                |injected| {
                    let worker_thread = WorkerThread::current();
                    assert!(injected && !worker_thread.is_null());
                    op(&*worker_thread, true)
                },
                l,
            );
            self.inject(&[job.as_job_ref()]);
            self.release_thread();
            job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
            self.acquire_thread();

            // Flush accumulated logs as we exit the thread.
            self.logger.log(|| Flush);

            job.into_result()
        })
    }

    #[cold]
    unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        // This thread is a member of a different pool, so let it process
        // other work while waiting for this `op` to complete.
        debug_assert!(current_thread.registry().id() != self.id());
        let latch = SpinLatch::cross(current_thread);
        let job = StackJob::new(
            0,
            |injected| {
                let worker_thread = WorkerThread::current();
                assert!(injected && !worker_thread.is_null());
                op(&*worker_thread, true)
            },
            latch,
        );
        self.inject(&[job.as_job_ref()]);
        current_thread.wait_until(&job.latch);
        job.into_result()
    }

    /// Increments the terminate counter. This increment should be
    /// balanced by a call to `terminate`, which will decrement. This
    /// is used when spawning asynchronous work, which needs to
    /// prevent the registry from terminating so long as it is active.
    ///
    /// Note that blocking functions such as `join` and `scope` do not
    /// need to concern themselves with this fn; their context is
    /// responsible for ensuring the current thread-pool will not
    /// terminate until they return.
    ///
    /// The global thread-pool always has an outstanding reference
    /// (the initial one). Custom thread-pools have one outstanding
    /// reference that is dropped when the `ThreadPool` is dropped:
    /// since installing the thread-pool blocks until any joins/scopes
    /// complete, this ensures that joins/scopes are covered.
    ///
    /// The exception is `::spawn()`, which can create a job outside
    /// of any blocking scope. In that case, the job itself holds a
    /// terminate count and is responsible for invoking `terminate()`
    /// when finished.
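    ///
    /// A sketch of the pairing described above, mirroring what a
    /// detached `spawn()` job does (illustrative, not a verbatim
    /// excerpt):
    ///
    /// ```ignore
    /// let registry = Registry::current();
    /// registry.increment_terminate_count(); // keep the pool alive
    /// // ... execute the spawned job ...
    /// registry.terminate(); // balance the increment once done
    /// ```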
    pub(super) fn increment_terminate_count(&self) {
        let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
        debug_assert!(previous != 0, "registry ref count incremented from zero");
        assert!(
            previous != std::usize::MAX,
            "overflow in registry ref count"
        );
    }

    /// Signals that the thread-pool which owns this registry has been
    /// dropped. The worker threads will gradually terminate, once any
    /// extant work is completed.
    pub(super) fn terminate(&self) {
        if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            for (i, thread_info) in self.thread_infos.iter().enumerate() {
                thread_info.terminate.set_and_tickle_one(self, i);
            }
        }
    }

    /// Notify the worker that the latch they are sleeping on has been "set".
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.sleep.notify_worker_latch_is_set(target_worker_index);
    }
}

/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
/// if no other worker thread is active.
#[inline]
pub fn mark_blocked() {
    let worker_thread = WorkerThread::current();
    assert!(!worker_thread.is_null());
    unsafe {
        let registry = &(*worker_thread).registry;
        registry.sleep.mark_blocked(&registry.deadlock_handler)
    }
}

/// Mark a previously blocked Rayon worker thread as unblocked.
#[inline]
pub fn mark_unblocked(registry: &Registry) {
    registry.sleep.mark_unblocked()
}
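
// A minimal sketch of how the pair above is meant to be used; the
// blocking primitive is hypothetical, and in practice the unblocking
// side is typically invoked by whichever thread wakes the sleeper:
//
//     mark_blocked();                        // this worker is about to block
//     wait_for_external_event();             // hypothetical blocking wait
//     mark_unblocked(&Registry::current());  // runnable again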

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct RegistryId {
    addr: usize,
}

struct ThreadInfo {
    /// Latch set once thread has started and we are entering into the
    /// main loop. Used to wait for worker threads to become primed,
    /// primarily of interest for benchmarking.
    primed: LockLatch,

    /// Latch is set once worker thread has completed. Used to wait
    /// until workers have stopped; only used for tests.
    stopped: LockLatch,

    /// The latch used to signal that termination has been requested.
    /// This latch is *set* by the `terminate` method on the
    /// `Registry`, once the registry's main "terminate" counter
    /// reaches zero.
    ///
    /// NB. We use a `CountLatch` here because it has no lifetimes and is
    /// meant for async use, but the count never gets higher than one.
    terminate: CountLatch,

    /// the "stealer" half of the worker's deque
    stealer: Stealer<JobRef>,
}

impl ThreadInfo {
    fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
        ThreadInfo {
            primed: LockLatch::new(),
            stopped: LockLatch::new(),
            terminate: CountLatch::new(),
            stealer,
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////
/// WorkerThread identifiers

pub(super) struct WorkerThread {
    /// the "worker" half of our local deque
    worker: Worker<JobRef>,

    /// local queue used for `spawn_fifo` indirection
    fifo: JobFifo,

    pub(crate) index: usize,

    /// A weak random number generator.
    rng: XorShift64Star,

    pub(crate) registry: Arc<Registry>,
}

// This is a bit sketchy, but basically: the WorkerThread is
// allocated on the stack of the worker on entry and stored into this
// thread local variable. So it will remain valid at least until the
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
    static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
}

impl Drop for WorkerThread {
    fn drop(&mut self) {
        // Undo `set_current`
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().eq(&(self as *const _)));
            t.set(ptr::null());
        });
    }
}

impl WorkerThread {
    /// Gets a pointer to the `WorkerThread` for the current thread;
    /// returns null if this is not a worker thread. This pointer is
    /// valid anywhere on the current thread.
    #[inline]
    pub(super) fn current() -> *const WorkerThread {
        WORKER_THREAD_STATE.with(Cell::get)
    }

    /// Sets `thread` as the current thread's `WorkerThread`.
    /// This is done during worker thread startup.
    unsafe fn set_current(thread: *const WorkerThread) {
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().is_null());
            t.set(thread);
        });
    }

    /// Returns the registry that owns this worker thread.
    #[inline]
    pub(super) fn registry(&self) -> &Arc<Registry> {
        &self.registry
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.registry.logger.log(event)
    }

    /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
    #[inline]
    pub(super) fn index(&self) -> usize {
        self.index
    }

    #[inline]
    pub(super) unsafe fn push(&self, job: JobRef) {
        self.log(|| JobPushed { worker: self.index });
        let queue_was_empty = self.worker.is_empty();
        self.worker.push(job);
        self.registry
            .sleep
            .new_internal_jobs(self.index, 1, queue_was_empty);
    }

    #[inline]
    pub(super) unsafe fn push_fifo(&self, job: JobRef) {
        self.push(self.fifo.push(job));
    }

    #[inline]
    pub(super) fn local_deque_is_empty(&self) -> bool {
        self.worker.is_empty()
    }

    /// Attempts to obtain a "local" job -- typically this means
    /// popping from the top of the stack, though if we are configured
    /// for breadth-first execution, it would mean dequeuing from the
    /// bottom.
    #[inline]
    pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
        let popped_job = self.worker.pop();

        if popped_job.is_some() {
            self.log(|| JobPopped { worker: self.index });
        }

        popped_job
    }

    /// Wait until the latch is set. Try to keep busy by popping and
    /// stealing tasks as necessary.
    #[inline]
    pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
        let latch = latch.as_core_latch();
        if !latch.probe() {
            self.wait_until_cold(latch);
        }
    }

    #[cold]
    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
        // The code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*.
        let abort_guard = unwind::AbortIfPanic;

        let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
        while !latch.probe() {
            // Try to find some work to do. We give preference first
            // to things in our local deque, then in other workers'
            // deques, and finally to injected jobs from the
            // outside. The idea is to finish what we started before
            // we take on something new.
            if let Some(job) = self
                .take_local_job()
                .or_else(|| self.steal())
                .or_else(|| self.registry.pop_injected_job(self.index))
            {
                self.registry.sleep.work_found(idle_state);
                self.execute(job);
                idle_state = self.registry.sleep.start_looking(self.index, latch);
            } else {
                self.registry
                    .sleep
                    .no_work_found(&mut idle_state, latch, &self.registry)
            }
        }

        // If we were sleepy, we are not anymore. We "found work" --
        // whatever the surrounding thread was doing before it had to
        // wait.
        self.registry.sleep.work_found(idle_state);

        self.log(|| ThreadSawLatchSet {
            worker: self.index,
            latch_addr: latch.addr(),
        });
        mem::forget(abort_guard); // successful execution, do not abort
    }

    #[inline]
    pub(super) unsafe fn execute(&self, job: JobRef) {
        job.execute();
    }

    /// Try to steal a single job and return it.
    ///
    /// This should only be done as a last resort, when there is no
    /// local work to do.
    unsafe fn steal(&self) -> Option<JobRef> {
        // we only steal when we don't have any work to do locally
        debug_assert!(self.local_deque_is_empty());

        // otherwise, try to steal
        let thread_infos = &self.registry.thread_infos.as_slice();
        let num_threads = thread_infos.len();
        if num_threads <= 1 {
            return None;
        }

        loop {
            let mut retry = false;
            let start = self.rng.next_usize(num_threads);
            let job = (start..num_threads)
                .chain(0..start)
                .filter(move |&i| i != self.index)
                .find_map(|victim_index| {
                    let victim = &thread_infos[victim_index];
                    match victim.stealer.steal() {
                        Steal::Success(job) => {
                            self.log(|| JobStolen {
                                worker: self.index,
                                victim: victim_index,
                            });
                            Some(job)
                        }
                        Steal::Empty => None,
                        Steal::Retry => {
                            retry = true;
                            None
                        }
                    }
                });
            if job.is_some() || !retry {
                return job;
            }
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
    let worker_thread = &WorkerThread {
        worker,
        fifo: JobFifo::new(),
        index,
        rng: XorShift64Star::new(),
        registry,
    };
    WorkerThread::set_current(worker_thread);
    let registry = &*worker_thread.registry;

    // let registry know we are ready to do work
    registry.thread_infos[index].primed.set();

    // Worker threads should not panic. If they do, just abort, as the
    // internal state of the threadpool is corrupted. Note that if
    // **user code** panics, we should catch that and redirect.
    let abort_guard = unwind::AbortIfPanic;

    // Inform a user callback that we started a thread.
    if let Some(ref handler) = registry.start_handler {
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
    }

    let my_terminate_latch = &registry.thread_infos[index].terminate;
    worker_thread.log(|| ThreadStart {
        worker: index,
        terminate_addr: my_terminate_latch.as_core_latch().addr(),
    });
    registry.acquire_thread();
    worker_thread.wait_until(my_terminate_latch);

    // Should not be any work left in our queue.
    debug_assert!(worker_thread.take_local_job().is_none());

    // let registry know we are done
    registry.thread_infos[index].stopped.set();

    // Normal termination, do not abort.
    mem::forget(abort_guard);

    worker_thread.log(|| ThreadTerminate { worker: index });

    // Inform a user callback that we exited a thread.
    if let Some(ref handler) = registry.exit_handler {
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
        // We're already exiting the thread, there's nothing else to do.
    }

    registry.release_thread();
}

/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
/// panic will be propagated as well. The second argument to `op` is
/// `true` if the job was injected and `false` if it executed directly.
pub(super) fn in_worker<OP, R>(op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker_cold(op)
        }
    }
}

/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
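///
/// A usage sketch (internal to this file; `next_usize` is what the
/// stealing code above uses to pick a random victim):
///
/// ```ignore
/// let rng = XorShift64Star::new();
/// let victim = rng.next_usize(num_threads); // uniform-ish in 0..num_threads
/// ```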
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    fn new() -> Self {
        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            #[allow(deprecated)]
            static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}