use crate::job::{JobFifo, JobRef, StackJob};
use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
use crate::util::leak;
use crate::{
    ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::Hasher;
use std::io;
use std::mem;
use std::ptr;
#[allow(deprecated)]
use std::sync::atomic::ATOMIC_USIZE_INIT;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Once};
use std::thread;
use std::usize;

/// Thread builder used for customization via
/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
pub struct ThreadBuilder {
    name: Option<String>,
    stack_size: Option<usize>,
    worker: Worker<JobRef>,
    registry: Arc<Registry>,
    index: usize,
}

impl ThreadBuilder {
    /// Gets the index of this thread in the pool, within `0..num_threads`.
    pub fn index(&self) -> usize {
        self.index
    }

    /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
    pub fn name(&self) -> Option<&str> {
        self.name.as_ref().map(String::as_str)
    }

    /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
    pub fn stack_size(&self) -> Option<usize> {
        self.stack_size
    }

    /// Executes the main loop for this thread. This will not return until the
    /// thread pool is dropped.
    pub fn run(self) {
        unsafe { main_loop(self.worker, self.registry, self.index) }
    }
}
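
// Illustrative sketch (not part of the original source): a user of the
// public `ThreadPoolBuilder::spawn_handler` API receives one `ThreadBuilder`
// per worker and must arrange for `run()` to be called on a dedicated
// thread, for example by spawning through `std::thread` by hand:
//
//     let pool = rayon::ThreadPoolBuilder::new()
//         .num_threads(2)
//         .spawn_handler(|thread| {
//             let mut b = std::thread::Builder::new();
//             if let Some(name) = thread.name() {
//                 b = b.name(name.to_owned());
//             }
//             b.spawn(|| thread.run())?;
//             Ok(())
//         })
//         .build()?;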

impl fmt::Debug for ThreadBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ThreadBuilder")
            .field("pool", &self.registry.id())
            .field("index", &self.index)
            .field("name", &self.name)
            .field("stack_size", &self.stack_size)
            .finish()
    }
}

/// Generalized trait for spawning a thread in the `Registry`.
///
/// This trait is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
pub trait ThreadSpawn {
    private_decl! {}

    /// Spawn a thread with the `ThreadBuilder` parameters, and then
    /// call `ThreadBuilder::run()`.
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
}

/// Spawns a thread in the "normal" way with `std::thread::Builder`.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug, Default)]
pub struct DefaultSpawn;

impl ThreadSpawn for DefaultSpawn {
    private_impl! {}

    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        let mut b = thread::Builder::new();
        if let Some(name) = thread.name() {
            b = b.name(name.to_owned());
        }
        if let Some(stack_size) = thread.stack_size() {
            b = b.stack_size(stack_size);
        }
        b.spawn(|| thread.run())?;
        Ok(())
    }
}

/// Spawns a thread with a user's custom callback.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);

impl<F> CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    pub(super) fn new(spawn: F) -> Self {
        CustomSpawn(spawn)
    }
}

impl<F> ThreadSpawn for CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    private_impl! {}

    #[inline]
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        (self.0)(thread)
    }
}

pub(super) struct Registry {
    logger: Logger,
    thread_infos: Vec<ThreadInfo>,
    sleep: Sleep,
    injected_jobs: Injector<JobRef>,
    panic_handler: Option<Box<PanicHandler>>,
    start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,

    // When this latch reaches 0, it means that all work on this
    // registry must be complete. This is ensured in the following ways:
    //
    // - if this is the global registry, there is a ref-count that never
    //   gets released.
    // - if this is a user-created thread-pool, then so long as the thread-pool
    //   exists, it holds a reference.
    // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
    //   no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
    //   return until the blocking job is complete, that ref will continue to be held.
    // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
    //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
    //   and that job will keep the pool alive.
    terminate_count: AtomicUsize,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization

static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = Once::new();

/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
fn global_registry() -> &'static Arc<Registry> {
    set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
        .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
        .expect("The global thread pool has not been initialized.")
}

/// Starts the worker threads (if that has not already happened) with
/// the given builder.
pub(super) fn init_global_registry<S>(
    builder: ThreadPoolBuilder<S>,
) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    S: ThreadSpawn,
{
    set_global_registry(|| Registry::new(builder))
}
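
// Public-facing usage (illustrative sketch, not part of the original source):
// this is the path taken by `rayon::ThreadPoolBuilder::build_global()`:
//
//     rayon::ThreadPoolBuilder::new().num_threads(8).build_global()?;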

/// Starts the worker threads (if that has not already happened)
/// by creating a registry with the given callback.
fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
{
    let mut result = Err(ThreadPoolBuildError::new(
        ErrorKind::GlobalPoolAlreadyInitialized,
    ));
    THE_REGISTRY_SET.call_once(|| {
        result = registry().map(|registry| {
            let registry = leak(registry);
            unsafe {
                THE_REGISTRY = Some(registry);
            }
            registry
        });
    });
    result
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
    fn drop(&mut self) {
        self.0.terminate()
    }
}

impl Registry {
    pub(super) fn new<S>(
        mut builder: ThreadPoolBuilder<S>,
    ) -> Result<Arc<Self>, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        let n_threads = builder.get_num_threads();
        let breadth_first = builder.get_breadth_first();

        let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
            .map(|_| {
                let worker = if breadth_first {
                    Worker::new_fifo()
                } else {
                    Worker::new_lifo()
                };

                let stealer = worker.stealer();
                (worker, stealer)
            })
            .unzip();

        let logger = Logger::new(n_threads);
        let registry = Arc::new(Registry {
            logger: logger.clone(),
            thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
            sleep: Sleep::new(logger, n_threads),
            injected_jobs: Injector::new(),
            terminate_count: AtomicUsize::new(1),
            panic_handler: builder.take_panic_handler(),
            start_handler: builder.take_start_handler(),
            exit_handler: builder.take_exit_handler(),
        });

        // If we return early or panic, make sure to terminate existing threads.
        let t1000 = Terminator(&registry);

        for (index, worker) in workers.into_iter().enumerate() {
            let thread = ThreadBuilder {
                name: builder.get_thread_name(index),
                stack_size: builder.get_stack_size(),
                registry: registry.clone(),
                worker,
                index,
            };
            if let Err(e) = builder.get_spawn_handler().spawn(thread) {
                return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
            }
        }

        // Returning normally now, without termination.
        mem::forget(t1000);

        Ok(registry.clone())
    }
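
    // Typical entry point (illustrative sketch, not part of the original
    // source): the public `rayon::ThreadPoolBuilder::build()` ends up in
    // `Registry::new` above and wraps the result in a `ThreadPool`:
    //
    //     let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
    //     assert_eq!(pool.current_num_threads(), 4);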

    pub(super) fn current() -> Arc<Registry> {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().clone()
            } else {
                (*worker_thread).registry.clone()
            }
        }
    }

    /// Returns the number of threads in the current registry. This
    /// is better than `Registry::current().num_threads()` because it
    /// avoids incrementing the `Arc` reference count.
    pub(super) fn current_num_threads() -> usize {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().num_threads()
            } else {
                (*worker_thread).registry.num_threads()
            }
        }
    }

    /// Returns the current `WorkerThread` if it's part of this `Registry`.
    pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
        unsafe {
            let worker = WorkerThread::current().as_ref()?;
            if worker.registry().id() == self.id() {
                Some(worker)
            } else {
                None
            }
        }
    }

    /// Returns an opaque identifier for this registry.
    pub(super) fn id(&self) -> RegistryId {
        // We can rely on `self` not to change since we only ever create
        // registries that are boxed up in an `Arc` (see `new()` above).
        RegistryId {
            addr: self as *const Self as usize,
        }
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.logger.log(event)
    }

    pub(super) fn num_threads(&self) -> usize {
        self.thread_infos.len()
    }

    pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
        match self.panic_handler {
            Some(ref handler) => {
                // If the customizable panic handler itself panics,
                // then we abort.
                let abort_guard = unwind::AbortIfPanic;
                handler(err);
                mem::forget(abort_guard);
            }
            None => {
                // Default panic handler aborts.
                let _ = unwind::AbortIfPanic; // let this drop.
            }
        }
    }
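
    // The handler invoked above is whatever was installed through the public
    // `ThreadPoolBuilder::panic_handler` API (illustrative sketch, not part
    // of the original source):
    //
    //     let pool = rayon::ThreadPoolBuilder::new()
    //         .panic_handler(|_err| eprintln!("a worker thread panicked"))
    //         .build()?;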

    /// Waits for the worker threads to get up and running. This is
    /// meant to be used for benchmarking purposes, primarily, so that
    /// you can get more consistent numbers by having everything
    /// "ready to go".
    pub(super) fn wait_until_primed(&self) {
        for info in &self.thread_infos {
            info.primed.wait();
        }
    }

    /// Waits for the worker threads to stop. This is used for testing
    /// -- so we can check that termination actually works.
    #[cfg(test)]
    pub(super) fn wait_until_stopped(&self) {
        for info in &self.thread_infos {
            info.stopped.wait();
        }
    }

    /// ////////////////////////////////////////////////////////////////////////
    /// MAIN LOOP
    ///
    /// So long as all of the worker threads are hanging out in their
    /// top-level loop, there is no work to be done.

    /// Push a job into the given `registry`. If we are running on a
    /// worker thread for the registry, this will push onto the
    /// deque. Else, it will inject from the outside (which is slower).
    pub(super) fn inject_or_push(&self, job_ref: JobRef) {
        let worker_thread = WorkerThread::current();
        unsafe {
            if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
                (*worker_thread).push(job_ref);
            } else {
                self.inject(&[job_ref]);
            }
        }
    }
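
    // For example (illustrative, not part of the original source):
    // `rayon::spawn` called from outside any pool reaches the slower
    // `inject()` path below, while the same call made from one of the
    // global pool's worker threads pushes onto that worker's local deque:
    //
    //     rayon::spawn(|| println!("runs on the global pool"));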

    /// Push a job into the "external jobs" queue; it will be taken by
    /// whatever worker has nothing to do. Use this if you know that
    /// you are not on a worker of this registry.
    pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
        self.log(|| JobsInjected {
            count: injected_jobs.len(),
        });

        // It should not be possible for `state.terminate` to be true
        // here. It is only set to true when the user creates (and
        // drops) a `ThreadPool`; and, in that case, they cannot be
        // calling `inject()` later, since they dropped their
        // `ThreadPool`.
        debug_assert_ne!(
            self.terminate_count.load(Ordering::Acquire),
            0,
            "inject() sees state.terminate as true"
        );

        let queue_was_empty = self.injected_jobs.is_empty();

        for &job_ref in injected_jobs {
            self.injected_jobs.push(job_ref);
        }

        self.sleep
            .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
    }

    fn has_injected_job(&self) -> bool {
        !self.injected_jobs.is_empty()
    }

    fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
        loop {
            match self.injected_jobs.steal() {
                Steal::Success(job) => {
                    self.log(|| JobUninjected {
                        worker: worker_index,
                    });
                    return Some(job);
                }
                Steal::Empty => return None,
                Steal::Retry => {}
            }
        }
    }

    /// If already in a worker-thread of this registry, just execute `op`.
    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
    /// completes and return its return value. If `op` panics, that panic will
    /// be propagated as well. The second argument indicates `true` if injection
    /// was performed, `false` if executed directly.
    pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                self.in_worker_cold(op)
            } else if (*worker_thread).registry().id() != self.id() {
                self.in_worker_cross(&*worker_thread, op)
            } else {
                // Perfectly valid to give them a `&T`: this is the
                // current thread, so we know the data structure won't be
                // invalidated until we return.
                op(&*worker_thread, false)
            }
        }
    }
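
    // This three-way dispatch is the mechanism behind the public
    // `ThreadPool::install()` API (illustrative sketch, not part of the
    // original source):
    //
    //     use rayon::prelude::*;
    //
    //     let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
    //     let sum: i32 = pool.install(|| (0..100).into_par_iter().sum());
    //
    // Called from outside any pool, this takes `in_worker_cold` and blocks
    // on a `LockLatch`; called from a *different* pool's worker, it takes
    // `in_worker_cross` so that thread can keep processing its own jobs
    // while it waits.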

    #[cold]
    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());

        LOCK_LATCH.with(|l| {
            // This thread isn't a member of *any* thread pool, so just block.
            debug_assert!(WorkerThread::current().is_null());
            let job = StackJob::new(
                |injected| {
                    let worker_thread = WorkerThread::current();
                    assert!(injected && !worker_thread.is_null());
                    op(&*worker_thread, true)
                },
                l,
            );
            self.inject(&[job.as_job_ref()]);
            job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.

            // flush accumulated logs as we exit the thread
            self.logger.log(|| Flush);

            job.into_result()
        })
    }

    #[cold]
    unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        // This thread is a member of a different pool, so let it process
        // other work while waiting for this `op` to complete.
        debug_assert!(current_thread.registry().id() != self.id());
        let latch = SpinLatch::cross(current_thread);
        let job = StackJob::new(
            |injected| {
                let worker_thread = WorkerThread::current();
                assert!(injected && !worker_thread.is_null());
                op(&*worker_thread, true)
            },
            latch,
        );
        self.inject(&[job.as_job_ref()]);
        current_thread.wait_until(&job.latch);
        job.into_result()
    }

    /// Increments the terminate counter. This increment should be
    /// balanced by a call to `terminate`, which will decrement. This
    /// is used when spawning asynchronous work, which needs to
    /// prevent the registry from terminating so long as it is active.
    ///
    /// Note that blocking functions such as `join` and `scope` do not
    /// need to concern themselves with this fn; their context is
    /// responsible for ensuring the current thread-pool will not
    /// terminate until they return.
    ///
    /// The global thread-pool always has an outstanding reference
    /// (the initial one). Custom thread-pools have one outstanding
    /// reference that is dropped when the `ThreadPool` is dropped:
    /// since installing the thread-pool blocks until any joins/scopes
    /// complete, this ensures that joins/scopes are covered.
    ///
    /// The exception is `::spawn()`, which can create a job outside
    /// of any blocking scope. In that case, the job itself holds a
    /// terminate count and is responsible for invoking `terminate()`
    /// when finished.
    pub(super) fn increment_terminate_count(&self) {
        let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
        debug_assert!(previous != 0, "registry ref count incremented from zero");
        assert!(
            previous != std::usize::MAX,
            "overflow in registry ref count"
        );
    }
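
    // Sketch of the pairing described above (illustrative, not part of the
    // original source): code that spawns an asynchronous job does, in effect,
    //
    //     registry.increment_terminate_count();
    //     // ... enqueue the job; when the job has finished executing,
    //     // its cleanup balances the increment with:
    //     registry.terminate();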

    /// Signals that the thread-pool which owns this registry has been
    /// dropped. The worker threads will gradually terminate, once any
    /// extant work is completed.
    pub(super) fn terminate(&self) {
        if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            for (i, thread_info) in self.thread_infos.iter().enumerate() {
                thread_info.terminate.set_and_tickle_one(self, i);
            }
        }
    }

    /// Notify the worker that the latch they are sleeping on has been "set".
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.sleep.notify_worker_latch_is_set(target_worker_index);
    }
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct RegistryId {
    addr: usize,
}

struct ThreadInfo {
    /// Latch set once thread has started and we are entering into the
    /// main loop. Used to wait for worker threads to become primed,
    /// primarily of interest for benchmarking.
    primed: LockLatch,

    /// Latch is set once worker thread has completed. Used to wait
    /// until workers have stopped; only used for tests.
    stopped: LockLatch,

    /// The latch used to signal that termination has been requested.
    /// This latch is *set* by the `terminate` method on the
    /// `Registry`, once the registry's main "terminate" counter
    /// reaches zero.
    ///
    /// NB. We use a `CountLatch` here because it has no lifetimes and is
    /// meant for async use, but the count never gets higher than one.
    terminate: CountLatch,

    /// the "stealer" half of the worker's deque
    stealer: Stealer<JobRef>,
}

impl ThreadInfo {
    fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
        ThreadInfo {
            primed: LockLatch::new(),
            stopped: LockLatch::new(),
            terminate: CountLatch::new(),
            stealer,
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////
/// WorkerThread identifiers

pub(super) struct WorkerThread {
    /// the "worker" half of our local deque
    worker: Worker<JobRef>,

    /// local queue used for `spawn_fifo` indirection
    fifo: JobFifo,

    index: usize,

    /// A weak random number generator.
    rng: XorShift64Star,

    registry: Arc<Registry>,
}

// This is a bit sketchy, but basically: the WorkerThread is
// allocated on the stack of the worker on entry and stored into this
// thread local variable. So it will remain valid at least until the
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
    static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
}

impl Drop for WorkerThread {
    fn drop(&mut self) {
        // Undo `set_current`
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().eq(&(self as *const _)));
            t.set(ptr::null());
        });
    }
}

impl WorkerThread {
    /// Gets a pointer to the `WorkerThread` for the current thread;
    /// returns null if this is not a worker thread. This pointer is
    /// valid anywhere on the current thread.
    #[inline]
    pub(super) fn current() -> *const WorkerThread {
        WORKER_THREAD_STATE.with(Cell::get)
    }

    /// Sets `thread` as the `WorkerThread` for the current thread.
    /// This is done during worker thread startup.
    unsafe fn set_current(thread: *const WorkerThread) {
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().is_null());
            t.set(thread);
        });
    }

    /// Returns the registry that owns this worker thread.
    #[inline]
    pub(super) fn registry(&self) -> &Arc<Registry> {
        &self.registry
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.registry.logger.log(event)
    }

    /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
    #[inline]
    pub(super) fn index(&self) -> usize {
        self.index
    }

    #[inline]
    pub(super) unsafe fn push(&self, job: JobRef) {
        self.log(|| JobPushed { worker: self.index });
        let queue_was_empty = self.worker.is_empty();
        self.worker.push(job);
        self.registry
            .sleep
            .new_internal_jobs(self.index, 1, queue_was_empty);
    }

    #[inline]
    pub(super) unsafe fn push_fifo(&self, job: JobRef) {
        self.push(self.fifo.push(job));
    }

    #[inline]
    pub(super) fn local_deque_is_empty(&self) -> bool {
        self.worker.is_empty()
    }

    /// Attempts to obtain a "local" job -- typically this means
    /// popping from the top of the stack, though if we are configured
    /// for breadth-first execution, it would mean dequeuing from the
    /// bottom.
    #[inline]
    pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
        let popped_job = self.worker.pop();

        if popped_job.is_some() {
            self.log(|| JobPopped { worker: self.index });
        }

        popped_job
    }

    /// Wait until the latch is set. Try to keep busy by popping and
    /// stealing tasks as necessary.
    #[inline]
    pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
        let latch = latch.as_core_latch();
        if !latch.probe() {
            self.wait_until_cold(latch);
        }
    }

    #[cold]
    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
        // The code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*.
        let abort_guard = unwind::AbortIfPanic;

        let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
        while !latch.probe() {
            // Try to find some work to do. We give preference first
            // to things in our local deque, then to other workers'
            // deques, and finally to injected jobs from the
            // outside. The idea is to finish what we started before
            // we take on something new.
            if let Some(job) = self
                .take_local_job()
                .or_else(|| self.steal())
                .or_else(|| self.registry.pop_injected_job(self.index))
            {
                self.registry.sleep.work_found(idle_state);
                self.execute(job);
                idle_state = self.registry.sleep.start_looking(self.index, latch);
            } else {
                self.registry
                    .sleep
                    .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
            }
        }

        // If we were sleepy, we are not anymore. We "found work" --
        // whatever the surrounding thread was doing before it had to
        // wait.
        self.registry.sleep.work_found(idle_state);

        self.log(|| ThreadSawLatchSet {
            worker: self.index,
            latch_addr: latch.addr(),
        });
        mem::forget(abort_guard); // successful execution, do not abort
    }

    #[inline]
    pub(super) unsafe fn execute(&self, job: JobRef) {
        job.execute();
    }

    /// Try to steal a single job and return it.
    ///
    /// This should only be done as a last resort, when there is no
    /// local work to do.
    unsafe fn steal(&self) -> Option<JobRef> {
        // we only steal when we don't have any work to do locally
        debug_assert!(self.local_deque_is_empty());

        // otherwise, try to steal
        let thread_infos = &self.registry.thread_infos.as_slice();
        let num_threads = thread_infos.len();
        if num_threads <= 1 {
            return None;
        }

        loop {
            let mut retry = false;
            let start = self.rng.next_usize(num_threads);
            let job = (start..num_threads)
                .chain(0..start)
                .filter(move |&i| i != self.index)
                .find_map(|victim_index| {
                    let victim = &thread_infos[victim_index];
                    match victim.stealer.steal() {
                        Steal::Success(job) => {
                            self.log(|| JobStolen {
                                worker: self.index,
                                victim: victim_index,
                            });
                            Some(job)
                        }
                        Steal::Empty => None,
                        Steal::Retry => {
                            retry = true;
                            None
                        }
                    }
                });
            if job.is_some() || !retry {
                return job;
            }
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
    let worker_thread = &WorkerThread {
        worker,
        fifo: JobFifo::new(),
        index,
        rng: XorShift64Star::new(),
        registry: registry.clone(),
    };
    WorkerThread::set_current(worker_thread);

    // let registry know we are ready to do work
    registry.thread_infos[index].primed.set();

    // Worker threads should not panic. If they do, just abort, as the
    // internal state of the threadpool is corrupted. Note that if
    // **user code** panics, we should catch that and redirect.
    let abort_guard = unwind::AbortIfPanic;

    // Inform a user callback that we started a thread.
    if let Some(ref handler) = registry.start_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
    }

    let my_terminate_latch = &registry.thread_infos[index].terminate;
    worker_thread.log(|| ThreadStart {
        worker: index,
        terminate_addr: my_terminate_latch.as_core_latch().addr(),
    });
    worker_thread.wait_until(my_terminate_latch);

    // Should not be any work left in our queue.
    debug_assert!(worker_thread.take_local_job().is_none());

    // let registry know we are done
    registry.thread_infos[index].stopped.set();

    // Normal termination, do not abort.
    mem::forget(abort_guard);

    worker_thread.log(|| ThreadTerminate { worker: index });

    // Inform a user callback that we exited a thread.
    if let Some(ref handler) = registry.exit_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
        // We're already exiting the thread, there's nothing else to do.
    }
}

/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
/// panic will be propagated as well. The second argument indicates
/// `true` if injection was performed, `false` if executed directly.
pub(super) fn in_worker<OP, R>(op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker_cold(op)
        }
    }
}
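
// This free function is the entry point used by blocking operations such as
// `rayon::join` (illustrative sketch; `left` and `right` stand for arbitrary
// user closures and are not part of this crate):
//
//     let (a, b) = rayon::join(left, right);
//
// Called from a worker thread, `op` runs inline; called from anywhere else,
// the global registry is created on demand and the call is injected via
// `in_worker_cold`.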

/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    fn new() -> Self {
        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            #[allow(deprecated)]
            static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}
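
// Illustrative sketch (not part of the original source): a minimal check of
// the RNG's range guarantee. `next_usize(n)` reduces `next()` with `%`, so
// every result lies in `0..n` (with a small modulo bias that is harmless for
// victim selection).
#[cfg(test)]
mod xorshift64star_sketch {
    use super::XorShift64Star;

    #[test]
    fn next_usize_stays_in_range() {
        let rng = XorShift64Star::new();
        for _ in 0..1_000 {
            assert!(rng.next_usize(10) < 10);
        }
    }
}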