]> git.proxmox.com Git - rustc.git/blob - vendor/rayon-core/src/registry.rs
New upstream version 1.63.0+dfsg1
[rustc.git] / vendor / rayon-core / src / registry.rs
1 use crate::job::{JobFifo, JobRef, StackJob};
2 use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
3 use crate::log::Event::*;
4 use crate::log::Logger;
5 use crate::sleep::Sleep;
6 use crate::unwind;
7 use crate::{
8 ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
9 };
10 use crossbeam_deque::{Injector, Steal, Stealer, Worker};
11 use std::any::Any;
12 use std::cell::Cell;
13 use std::collections::hash_map::DefaultHasher;
14 use std::fmt;
15 use std::hash::Hasher;
16 use std::io;
17 use std::mem;
18 use std::ptr;
19 #[allow(deprecated)]
20 use std::sync::atomic::ATOMIC_USIZE_INIT;
21 use std::sync::atomic::{AtomicUsize, Ordering};
22 use std::sync::{Arc, Once};
23 use std::thread;
24 use std::usize;
25
26 /// Thread builder used for customization via
27 /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
28 pub struct ThreadBuilder {
29 name: Option<String>,
30 stack_size: Option<usize>,
31 worker: Worker<JobRef>,
32 registry: Arc<Registry>,
33 index: usize,
34 }
35
36 impl ThreadBuilder {
37 /// Gets the index of this thread in the pool, within `0..num_threads`.
38 pub fn index(&self) -> usize {
39 self.index
40 }
41
42 /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
43 pub fn name(&self) -> Option<&str> {
44 self.name.as_ref().map(String::as_str)
45 }
46
47 /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
48 pub fn stack_size(&self) -> Option<usize> {
49 self.stack_size
50 }
51
52 /// Executes the main loop for this thread. This will not return until the
53 /// thread pool is dropped.
54 pub fn run(self) {
55 unsafe { main_loop(self.worker, self.registry, self.index) }
56 }
57 }
58
59 impl fmt::Debug for ThreadBuilder {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_struct("ThreadBuilder")
62 .field("pool", &self.registry.id())
63 .field("index", &self.index)
64 .field("name", &self.name)
65 .field("stack_size", &self.stack_size)
66 .finish()
67 }
68 }
69
70 /// Generalized trait for spawning a thread in the `Registry`.
71 ///
72 /// This trait is pub-in-private -- E0445 forces us to make it public,
73 /// but we don't actually want to expose these details in the API.
74 pub trait ThreadSpawn {
// Crate-internal macro; NOTE(review): presumably adds a private marker item so the
// trait cannot be implemented outside this crate -- confirm at the macro definition.
75 private_decl! {}
76
77 /// Spawn a thread with the `ThreadBuilder` parameters, and then
78 /// call `ThreadBuilder::run()`.
///
/// An `Err` returned here is wrapped into a `ThreadPoolBuildError` by
/// `Registry::new`, which then stops spawning further threads.
79 fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
80 }
81
82 /// Spawns a thread in the "normal" way with `std::thread::Builder`.
83 ///
84 /// This type is pub-in-private -- E0445 forces us to make it public,
85 /// but we don't actually want to expose these details in the API.
86 #[derive(Debug, Default)]
87 pub struct DefaultSpawn;
88
89 impl ThreadSpawn for DefaultSpawn {
90 private_impl! {}
91
92 fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
93 let mut b = thread::Builder::new();
94 if let Some(name) = thread.name() {
95 b = b.name(name.to_owned());
96 }
97 if let Some(stack_size) = thread.stack_size() {
98 b = b.stack_size(stack_size);
99 }
100 b.spawn(|| thread.run())?;
101 Ok(())
102 }
103 }
104
105 /// Spawns a thread with a user's custom callback.
106 ///
107 /// This type is pub-in-private -- E0445 forces us to make it public,
108 /// but we don't actually want to expose these details in the API.
109 #[derive(Debug)]
110 pub struct CustomSpawn<F>(F);
111
112 impl<F> CustomSpawn<F>
113 where
114 F: FnMut(ThreadBuilder) -> io::Result<()>,
115 {
116 pub(super) fn new(spawn: F) -> Self {
117 CustomSpawn(spawn)
118 }
119 }
120
121 impl<F> ThreadSpawn for CustomSpawn<F>
122 where
123 F: FnMut(ThreadBuilder) -> io::Result<()>,
124 {
125 private_impl! {}
126
127 #[inline]
128 fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
129 (self.0)(thread)
130 }
131 }
132
// State shared by all worker threads of one pool. `Registry::new` always
// returns it wrapped in an `Arc`.
133 pub(super) struct Registry {
134 logger: Logger,
// One entry per worker thread; `num_threads()` is this vector's length.
135 thread_infos: Vec<ThreadInfo>,
136 sleep: Sleep,
// Jobs pushed by `inject()`; workers take them via `pop_injected_job()`.
137 injected_jobs: Injector<JobRef>,
// Optional user callbacks taken from the `ThreadPoolBuilder` in `new()`.
138 panic_handler: Option<Box<PanicHandler>>,
139 start_handler: Option<Box<StartHandler>>,
140 exit_handler: Option<Box<ExitHandler>>,
141
142 // When this latch reaches 0, it means that all work on this
143 // registry must be complete. This is ensured in the following ways:
144 //
145 // - if this is the global registry, there is a ref-count that never
146 // gets released.
147 // - if this is a user-created thread-pool, then so long as the thread-pool
148 // exists, it holds a reference.
149 // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
150 // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
151 // return until the blocking job is complete, that ref will continue to be held.
152 // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
153 // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
154 // and that job will keep the pool alive.
// Starts at 1 in `new()`; `increment_terminate_count()` adds one and
// `terminate()` subtracts one.
155 terminate_count: AtomicUsize,
156 }
157
158 /// ////////////////////////////////////////////////////////////////////////
159 /// Initialization
160
// Slot for the global registry. In this file it is written only inside
// `THE_REGISTRY_SET.call_once` (see `set_global_registry`) and read by
// `global_registry` after that call has returned.
161 static mut THE_REGISTRY: Option<Arc<Registry>> = None;
// Guards the one-time initialisation of `THE_REGISTRY`.
162 static THE_REGISTRY_SET: Once = Once::new();
163
164 /// Starts the worker threads (if that has not already happened). If
165 /// initialization has not already occurred, use the default
166 /// configuration.
167 pub(super) fn global_registry() -> &'static Arc<Registry> {
168 set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
169 .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
170 .expect("The global thread pool has not been initialized.")
171 }
172
173 /// Starts the worker threads (if that has not already happened) with
174 /// the given builder.
175 pub(super) fn init_global_registry<S>(
176 builder: ThreadPoolBuilder<S>,
177 ) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
178 where
179 S: ThreadSpawn,
180 {
181 set_global_registry(|| Registry::new(builder))
182 }
183
184 /// Starts the worker threads (if that has not already happened)
185 /// by creating a registry with the given callback.
186 fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
187 where
188 F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
189 {
190 let mut result = Err(ThreadPoolBuildError::new(
191 ErrorKind::GlobalPoolAlreadyInitialized,
192 ));
193
194 THE_REGISTRY_SET.call_once(|| {
195 result = registry()
196 .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
197 });
198
199 result
200 }
201
202 struct Terminator<'a>(&'a Arc<Registry>);
203
204 impl<'a> Drop for Terminator<'a> {
205 fn drop(&mut self) {
206 self.0.terminate()
207 }
208 }
209
210 impl Registry {
211 pub(super) fn new<S>(
212 mut builder: ThreadPoolBuilder<S>,
213 ) -> Result<Arc<Self>, ThreadPoolBuildError>
214 where
215 S: ThreadSpawn,
216 {
217 // Soft-limit the number of threads that we can actually support.
218 let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads());
219
220 let breadth_first = builder.get_breadth_first();
221
222 let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
223 .map(|_| {
224 let worker = if breadth_first {
225 Worker::new_fifo()
226 } else {
227 Worker::new_lifo()
228 };
229
230 let stealer = worker.stealer();
231 (worker, stealer)
232 })
233 .unzip();
234
235 let logger = Logger::new(n_threads);
236 let registry = Arc::new(Registry {
237 logger: logger.clone(),
238 thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
239 sleep: Sleep::new(logger, n_threads),
240 injected_jobs: Injector::new(),
241 terminate_count: AtomicUsize::new(1),
242 panic_handler: builder.take_panic_handler(),
243 start_handler: builder.take_start_handler(),
244 exit_handler: builder.take_exit_handler(),
245 });
246
247 // If we return early or panic, make sure to terminate existing threads.
248 let t1000 = Terminator(&registry);
249
250 for (index, worker) in workers.into_iter().enumerate() {
251 let thread = ThreadBuilder {
252 name: builder.get_thread_name(index),
253 stack_size: builder.get_stack_size(),
254 registry: Arc::clone(&registry),
255 worker,
256 index,
257 };
258 if let Err(e) = builder.get_spawn_handler().spawn(thread) {
259 return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
260 }
261 }
262
263 // Returning normally now, without termination.
264 mem::forget(t1000);
265
266 Ok(registry)
267 }
268
269 pub(super) fn current() -> Arc<Registry> {
270 unsafe {
271 let worker_thread = WorkerThread::current();
272 let registry = if worker_thread.is_null() {
273 global_registry()
274 } else {
275 &(*worker_thread).registry
276 };
277 Arc::clone(registry)
278 }
279 }
280
281 /// Returns the number of threads in the current registry. This
282 /// is better than `Registry::current().num_threads()` because it
283 /// avoids incrementing the `Arc`.
284 pub(super) fn current_num_threads() -> usize {
285 unsafe {
286 let worker_thread = WorkerThread::current();
287 if worker_thread.is_null() {
288 global_registry().num_threads()
289 } else {
290 (*worker_thread).registry.num_threads()
291 }
292 }
293 }
294
295 /// Returns the current `WorkerThread` if it's part of this `Registry`.
296 pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
297 unsafe {
298 let worker = WorkerThread::current().as_ref()?;
299 if worker.registry().id() == self.id() {
300 Some(worker)
301 } else {
302 None
303 }
304 }
305 }
306
307 /// Returns an opaque identifier for this registry.
308 pub(super) fn id(&self) -> RegistryId {
309 // We can rely on `self` not to change since we only ever create
310 // registries that are boxed up in an `Arc` (see `new()` above).
311 RegistryId {
312 addr: self as *const Self as usize,
313 }
314 }
315
316 #[inline]
317 pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
318 self.logger.log(event)
319 }
320
321 pub(super) fn num_threads(&self) -> usize {
322 self.thread_infos.len()
323 }
324
325 pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
326 match self.panic_handler {
327 Some(ref handler) => {
328 // If the customizable panic handler itself panics,
329 // then we abort.
330 let abort_guard = unwind::AbortIfPanic;
331 handler(err);
332 mem::forget(abort_guard);
333 }
334 None => {
335 // Default panic handler aborts.
336 let _ = unwind::AbortIfPanic; // let this drop.
337 }
338 }
339 }
340
341 /// Waits for the worker threads to get up and running. This is
342 /// meant to be used for benchmarking purposes, primarily, so that
343 /// you can get more consistent numbers by having everything
344 /// "ready to go".
345 pub(super) fn wait_until_primed(&self) {
346 for info in &self.thread_infos {
347 info.primed.wait();
348 }
349 }
350
351 /// Waits for the worker threads to stop. This is used for testing
352 /// -- so we can check that termination actually works.
353 #[cfg(test)]
354 pub(super) fn wait_until_stopped(&self) {
355 for info in &self.thread_infos {
356 info.stopped.wait();
357 }
358 }
359
360 /// ////////////////////////////////////////////////////////////////////////
361 /// MAIN LOOP
362 ///
363 /// So long as all of the worker threads are hanging out in their
364 /// top-level loop, there is no work to be done.
365
366 /// Push a job into the given `registry`. If we are running on a
367 /// worker thread for the registry, this will push onto the
368 /// deque. Else, it will inject from the outside (which is slower).
369 pub(super) fn inject_or_push(&self, job_ref: JobRef) {
370 let worker_thread = WorkerThread::current();
371 unsafe {
372 if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
373 (*worker_thread).push(job_ref);
374 } else {
375 self.inject(&[job_ref]);
376 }
377 }
378 }
379
380 /// Push a job into the "external jobs" queue; it will be taken by
381 /// whatever worker has nothing to do. Use this is you know that
382 /// you are not on a worker of this registry.
383 pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
384 self.log(|| JobsInjected {
385 count: injected_jobs.len(),
386 });
387
388 // It should not be possible for `state.terminate` to be true
389 // here. It is only set to true when the user creates (and
390 // drops) a `ThreadPool`; and, in that case, they cannot be
391 // calling `inject()` later, since they dropped their
392 // `ThreadPool`.
393 debug_assert_ne!(
394 self.terminate_count.load(Ordering::Acquire),
395 0,
396 "inject() sees state.terminate as true"
397 );
398
399 let queue_was_empty = self.injected_jobs.is_empty();
400
401 for &job_ref in injected_jobs {
402 self.injected_jobs.push(job_ref);
403 }
404
405 self.sleep
406 .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
407 }
408
409 fn has_injected_job(&self) -> bool {
410 !self.injected_jobs.is_empty()
411 }
412
413 fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
414 loop {
415 match self.injected_jobs.steal() {
416 Steal::Success(job) => {
417 self.log(|| JobUninjected {
418 worker: worker_index,
419 });
420 return Some(job);
421 }
422 Steal::Empty => return None,
423 Steal::Retry => {}
424 }
425 }
426 }
427
428 /// If already in a worker-thread of this registry, just execute `op`.
429 /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
430 /// completes and return its return value. If `op` panics, that panic will
431 /// be propagated as well. The second argument indicates `true` if injection
432 /// was performed, `false` if executed directly.
433 pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
434 where
435 OP: FnOnce(&WorkerThread, bool) -> R + Send,
436 R: Send,
437 {
438 unsafe {
439 let worker_thread = WorkerThread::current();
440 if worker_thread.is_null() {
441 self.in_worker_cold(op)
442 } else if (*worker_thread).registry().id() != self.id() {
443 self.in_worker_cross(&*worker_thread, op)
444 } else {
445 // Perfectly valid to give them a `&T`: this is the
446 // current thread, so we know the data structure won't be
447 // invalidated until we return.
448 op(&*worker_thread, false)
449 }
450 }
451 }
452
453 #[cold]
454 unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
455 where
456 OP: FnOnce(&WorkerThread, bool) -> R + Send,
457 R: Send,
458 {
459 thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());
460
461 LOCK_LATCH.with(|l| {
462 // This thread isn't a member of *any* thread pool, so just block.
463 debug_assert!(WorkerThread::current().is_null());
464 let job = StackJob::new(
465 |injected| {
466 let worker_thread = WorkerThread::current();
467 assert!(injected && !worker_thread.is_null());
468 op(&*worker_thread, true)
469 },
470 l,
471 );
472 self.inject(&[job.as_job_ref()]);
473 job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
474
475 // flush accumulated logs as we exit the thread
476 self.logger.log(|| Flush);
477
478 job.into_result()
479 })
480 }
481
482 #[cold]
483 unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
484 where
485 OP: FnOnce(&WorkerThread, bool) -> R + Send,
486 R: Send,
487 {
488 // This thread is a member of a different pool, so let it process
489 // other work while waiting for this `op` to complete.
490 debug_assert!(current_thread.registry().id() != self.id());
491 let latch = SpinLatch::cross(current_thread);
492 let job = StackJob::new(
493 |injected| {
494 let worker_thread = WorkerThread::current();
495 assert!(injected && !worker_thread.is_null());
496 op(&*worker_thread, true)
497 },
498 latch,
499 );
500 self.inject(&[job.as_job_ref()]);
501 current_thread.wait_until(&job.latch);
502 job.into_result()
503 }
504
505 /// Increments the terminate counter. This increment should be
506 /// balanced by a call to `terminate`, which will decrement. This
507 /// is used when spawning asynchronous work, which needs to
508 /// prevent the registry from terminating so long as it is active.
509 ///
510 /// Note that blocking functions such as `join` and `scope` do not
511 /// need to concern themselves with this fn; their context is
512 /// responsible for ensuring the current thread-pool will not
513 /// terminate until they return.
514 ///
515 /// The global thread-pool always has an outstanding reference
516 /// (the initial one). Custom thread-pools have one outstanding
517 /// reference that is dropped when the `ThreadPool` is dropped:
518 /// since installing the thread-pool blocks until any joins/scopes
519 /// complete, this ensures that joins/scopes are covered.
520 ///
521 /// The exception is `::spawn()`, which can create a job outside
522 /// of any blocking scope. In that case, the job itself holds a
523 /// terminate count and is responsible for invoking `terminate()`
524 /// when finished.
525 pub(super) fn increment_terminate_count(&self) {
526 let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
527 debug_assert!(previous != 0, "registry ref count incremented from zero");
528 assert!(
529 previous != std::usize::MAX,
530 "overflow in registry ref count"
531 );
532 }
533
534 /// Signals that the thread-pool which owns this registry has been
535 /// dropped. The worker threads will gradually terminate, once any
536 /// extant work is completed.
537 pub(super) fn terminate(&self) {
538 if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
539 for (i, thread_info) in self.thread_infos.iter().enumerate() {
540 thread_info.terminate.set_and_tickle_one(self, i);
541 }
542 }
543 }
544
545 /// Notify the worker that the latch they are sleeping on has been "set".
546 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
547 self.sleep.notify_worker_latch_is_set(target_worker_index);
548 }
549 }
550
// Opaque identifier for a `Registry`. `Registry::id()` fills `addr` with the
// registry's own address, so two ids compare equal only for the same registry.
551 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
552 pub(super) struct RegistryId {
553 addr: usize,
554 }
555
556 struct ThreadInfo {
557 /// Latch set once thread has started and we are entering into the
558 /// main loop. Used to wait for worker threads to become primed,
559 /// primarily of interest for benchmarking.
560 primed: LockLatch,
561
562 /// Latch is set once worker thread has completed. Used to wait
563 /// until workers have stopped; only used for tests.
564 stopped: LockLatch,
565
566 /// The latch used to signal that terminated has been requested.
567 /// This latch is *set* by the `terminate` method on the
568 /// `Registry`, once the registry's main "terminate" counter
569 /// reaches zero.
570 ///
571 /// NB. We use a `CountLatch` here because it has no lifetimes and is
572 /// meant for async use, but the count never gets higher than one.
573 terminate: CountLatch,
574
575 /// the "stealer" half of the worker's deque
576 stealer: Stealer<JobRef>,
577 }
578
579 impl ThreadInfo {
580 fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
581 ThreadInfo {
582 primed: LockLatch::new(),
583 stopped: LockLatch::new(),
584 terminate: CountLatch::new(),
585 stealer,
586 }
587 }
588 }
589
590 /// ////////////////////////////////////////////////////////////////////////
591 /// WorkerThread identifiers
592
// State owned by one worker thread. `main_loop` creates it on the worker's
// stack and registers a pointer to it in `WORKER_THREAD_STATE`.
593 pub(super) struct WorkerThread {
594 /// the "worker" half of our local deque
595 worker: Worker<JobRef>,
596
597 /// local queue used for `spawn_fifo` indirection
598 fifo: JobFifo,
599
// Position of this thread within `registry.thread_infos`.
600 index: usize,
601
602 /// A weak random number generator.
// Used by `steal()` to pick the first victim to try.
603 rng: XorShift64Star,
604
// The pool this worker belongs to.
605 registry: Arc<Registry>,
606 }
607
608 // This is a bit sketchy, but basically: the WorkerThread is
609 // allocated on the stack of the worker on entry and stored into this
610 // thread local variable. So it will remain valid at least until the
611 // worker is fully unwound. Using an unsafe pointer avoids the need
612 // for a RefCell<T> etc.
//
// The pointer is null on threads that are not workers; `set_current` stores
// it and `WorkerThread`'s `Drop` impl resets it to null.
613 thread_local! {
614 static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
615 }
616
// Clears the thread-local pointer when the worker's state goes away, so that
// `WorkerThread::current()` returns null on this thread afterwards.
617 impl Drop for WorkerThread {
618 fn drop(&mut self) {
619 // Undo `set_current`
620 WORKER_THREAD_STATE.with(|t| {
// The registered pointer must be this very value; anything else means the
// thread-local was set from a different `WorkerThread`.
621 assert!(t.get().eq(&(self as *const _)));
622 t.set(ptr::null());
623 });
624 }
625 }
626
627 impl WorkerThread {
628 /// Gets the `WorkerThread` index for the current thread; returns
629 /// NULL if this is not a worker thread. This pointer is valid
630 /// anywhere on the current thread.
631 #[inline]
632 pub(super) fn current() -> *const WorkerThread {
633 WORKER_THREAD_STATE.with(Cell::get)
634 }
635
636 /// Sets `self` as the worker thread index for the current thread.
637 /// This is done during worker thread startup.
638 unsafe fn set_current(thread: *const WorkerThread) {
639 WORKER_THREAD_STATE.with(|t| {
640 assert!(t.get().is_null());
641 t.set(thread);
642 });
643 }
644
645 /// Returns the registry that owns this worker thread.
646 #[inline]
647 pub(super) fn registry(&self) -> &Arc<Registry> {
648 &self.registry
649 }
650
651 #[inline]
652 pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
653 self.registry.logger.log(event)
654 }
655
656 /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
657 #[inline]
658 pub(super) fn index(&self) -> usize {
659 self.index
660 }
661
662 #[inline]
663 pub(super) unsafe fn push(&self, job: JobRef) {
664 self.log(|| JobPushed { worker: self.index });
665 let queue_was_empty = self.worker.is_empty();
666 self.worker.push(job);
667 self.registry
668 .sleep
669 .new_internal_jobs(self.index, 1, queue_was_empty);
670 }
671
672 #[inline]
673 pub(super) unsafe fn push_fifo(&self, job: JobRef) {
674 self.push(self.fifo.push(job));
675 }
676
677 #[inline]
678 pub(super) fn local_deque_is_empty(&self) -> bool {
679 self.worker.is_empty()
680 }
681
682 /// Attempts to obtain a "local" job -- typically this means
683 /// popping from the top of the stack, though if we are configured
684 /// for breadth-first execution, it would mean dequeuing from the
685 /// bottom.
686 #[inline]
687 pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
688 let popped_job = self.worker.pop();
689
690 if popped_job.is_some() {
691 self.log(|| JobPopped { worker: self.index });
692 }
693
694 popped_job
695 }
696
697 /// Wait until the latch is set. Try to keep busy by popping and
698 /// stealing tasks as necessary.
699 #[inline]
700 pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
701 let latch = latch.as_core_latch();
702 if !latch.probe() {
703 self.wait_until_cold(latch);
704 }
705 }
706
707 #[cold]
708 unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
709 // the code below should swallow all panics and hence never
710 // unwind; but if something does wrong, we want to abort,
711 // because otherwise other code in rayon may assume that the
712 // latch has been signaled, and that can lead to random memory
713 // accesses, which would be *very bad*
714 let abort_guard = unwind::AbortIfPanic;
715
716 let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
717 while !latch.probe() {
718 // Try to find some work to do. We give preference first
719 // to things in our local deque, then in other workers
720 // deques, and finally to injected jobs from the
721 // outside. The idea is to finish what we started before
722 // we take on something new.
723 if let Some(job) = self
724 .take_local_job()
725 .or_else(|| self.steal())
726 .or_else(|| self.registry.pop_injected_job(self.index))
727 {
728 self.registry.sleep.work_found(idle_state);
729 self.execute(job);
730 idle_state = self.registry.sleep.start_looking(self.index, latch);
731 } else {
732 self.registry
733 .sleep
734 .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
735 }
736 }
737
738 // If we were sleepy, we are not anymore. We "found work" --
739 // whatever the surrounding thread was doing before it had to
740 // wait.
741 self.registry.sleep.work_found(idle_state);
742
743 self.log(|| ThreadSawLatchSet {
744 worker: self.index,
745 latch_addr: latch.addr(),
746 });
747 mem::forget(abort_guard); // successful execution, do not abort
748 }
749
750 #[inline]
751 pub(super) unsafe fn execute(&self, job: JobRef) {
752 job.execute();
753 }
754
755 /// Try to steal a single job and return it.
756 ///
757 /// This should only be done as a last resort, when there is no
758 /// local work to do.
759 unsafe fn steal(&self) -> Option<JobRef> {
760 // we only steal when we don't have any work to do locally
761 debug_assert!(self.local_deque_is_empty());
762
763 // otherwise, try to steal
764 let thread_infos = &self.registry.thread_infos.as_slice();
765 let num_threads = thread_infos.len();
766 if num_threads <= 1 {
767 return None;
768 }
769
770 loop {
771 let mut retry = false;
772 let start = self.rng.next_usize(num_threads);
773 let job = (start..num_threads)
774 .chain(0..start)
775 .filter(move |&i| i != self.index)
776 .find_map(|victim_index| {
777 let victim = &thread_infos[victim_index];
778 match victim.stealer.steal() {
779 Steal::Success(job) => {
780 self.log(|| JobStolen {
781 worker: self.index,
782 victim: victim_index,
783 });
784 Some(job)
785 }
786 Steal::Empty => None,
787 Steal::Retry => {
788 retry = true;
789 None
790 }
791 }
792 });
793 if job.is_some() || !retry {
794 return job;
795 }
796 }
797 }
798 }
799
800 /// ////////////////////////////////////////////////////////////////////////
801
802 unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
803 let worker_thread = &WorkerThread {
804 worker,
805 fifo: JobFifo::new(),
806 index,
807 rng: XorShift64Star::new(),
808 registry,
809 };
810 WorkerThread::set_current(worker_thread);
811 let registry = &*worker_thread.registry;
812
813 // let registry know we are ready to do work
814 registry.thread_infos[index].primed.set();
815
816 // Worker threads should not panic. If they do, just abort, as the
817 // internal state of the threadpool is corrupted. Note that if
818 // **user code** panics, we should catch that and redirect.
819 let abort_guard = unwind::AbortIfPanic;
820
821 // Inform a user callback that we started a thread.
822 if let Some(ref handler) = registry.start_handler {
823 match unwind::halt_unwinding(|| handler(index)) {
824 Ok(()) => {}
825 Err(err) => {
826 registry.handle_panic(err);
827 }
828 }
829 }
830
831 let my_terminate_latch = &registry.thread_infos[index].terminate;
832 worker_thread.log(|| ThreadStart {
833 worker: index,
834 terminate_addr: my_terminate_latch.as_core_latch().addr(),
835 });
836 worker_thread.wait_until(my_terminate_latch);
837
838 // Should not be any work left in our queue.
839 debug_assert!(worker_thread.take_local_job().is_none());
840
841 // let registry know we are done
842 registry.thread_infos[index].stopped.set();
843
844 // Normal termination, do not abort.
845 mem::forget(abort_guard);
846
847 worker_thread.log(|| ThreadTerminate { worker: index });
848
849 // Inform a user callback that we exited a thread.
850 if let Some(ref handler) = registry.exit_handler {
851 match unwind::halt_unwinding(|| handler(index)) {
852 Ok(()) => {}
853 Err(err) => {
854 registry.handle_panic(err);
855 }
856 }
857 // We're already exiting the thread, there's nothing else to do.
858 }
859 }
860
861 /// If already in a worker-thread, just execute `op`. Otherwise,
862 /// execute `op` in the default thread-pool. Either way, block until
863 /// `op` completes and return its return value. If `op` panics, that
864 /// panic will be propagated as well. The second argument indicates
865 /// `true` if injection was performed, `false` if executed directly.
866 pub(super) fn in_worker<OP, R>(op: OP) -> R
867 where
868 OP: FnOnce(&WorkerThread, bool) -> R + Send,
869 R: Send,
870 {
871 unsafe {
872 let owner_thread = WorkerThread::current();
873 if !owner_thread.is_null() {
874 // Perfectly valid to give them a `&T`: this is the
875 // current thread, so we know the data structure won't be
876 // invalidated until we return.
877 op(&*owner_thread, false)
878 } else {
879 global_registry().in_worker_cold(op)
880 }
881 }
882 }
883
/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// The state lives in a `Cell`, so numbers can be drawn through a shared
/// reference.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    /// Creates a generator with a fresh non-zero seed.
    ///
    /// Every call consumes at least one value of a process-wide counter, so
    /// different generators are seeded from different counter values.
    fn new() -> Self {
        // `AtomicUsize::new` is usable in a `static` initializer, so the
        // deprecated `ATOMIC_USIZE_INIT` constant is not needed here.
        static COUNTER: AtomicUsize = AtomicUsize::new(0);

        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    /// Advances the state by one xorshift step and returns the state
    /// multiplied (wrapping) by the xorshift* constant.
    fn next(&self) -> u64 {
        let mut x = self.state.get();
        // A zero state would stay zero forever.
        debug_assert_ne!(x, 0);
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
    ///
    /// # Panics
    ///
    /// Panics if `n` is zero (remainder by zero).
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}
923 }