]> git.proxmox.com Git - rustc.git/blame - vendor/rustc-rayon-core/src/registry.rs
New upstream version 1.52.0~beta.3+dfsg1
[rustc.git] / vendor / rustc-rayon-core / src / registry.rs
CommitLineData
6a06907d
XL
1use crate::job::{JobFifo, JobRef, StackJob};
2use crate::latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
3use crate::log::Event::*;
4use crate::sleep::Sleep;
5use crate::unwind;
6use crate::util::leak;
7use crate::{
8 AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
9 ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
10};
e74abb32
XL
11use crossbeam_deque::{Steal, Stealer, Worker};
12use crossbeam_queue::SegQueue;
94b46f34 13use std::any::Any;
532ac7d7
XL
14use std::cell::Cell;
15use std::collections::hash_map::DefaultHasher;
e74abb32 16use std::fmt;
532ac7d7 17use std::hash::Hasher;
e74abb32 18use std::io;
532ac7d7 19use std::mem;
e74abb32
XL
20use std::ptr;
21#[allow(deprecated)]
22use std::sync::atomic::ATOMIC_USIZE_INIT;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use std::sync::{Arc, Once};
94b46f34 25use std::thread;
94b46f34 26use std::usize;
e74abb32
XL
27
28/// Thread builder used for customization via
29/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
30pub struct ThreadBuilder {
31 name: Option<String>,
32 stack_size: Option<usize>,
33 worker: Worker<JobRef>,
34 registry: Arc<Registry>,
35 index: usize,
36}
37
38impl ThreadBuilder {
39 /// Get the index of this thread in the pool, within `0..num_threads`.
40 pub fn index(&self) -> usize {
41 self.index
42 }
43
44 /// Get the string that was specified by `ThreadPoolBuilder::name()`.
45 pub fn name(&self) -> Option<&str> {
46 self.name.as_ref().map(String::as_str)
47 }
48
49 /// Get the value that was specified by `ThreadPoolBuilder::stack_size()`.
50 pub fn stack_size(&self) -> Option<usize> {
51 self.stack_size
52 }
53
54 /// Execute the main loop for this thread. This will not return until the
55 /// thread pool is dropped.
56 pub fn run(self) {
57 unsafe { main_loop(self.worker, self.registry, self.index) }
58 }
59}
60
61impl fmt::Debug for ThreadBuilder {
6a06907d 62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
e74abb32
XL
63 f.debug_struct("ThreadBuilder")
64 .field("pool", &self.registry.id())
65 .field("index", &self.index)
66 .field("name", &self.name)
67 .field("stack_size", &self.stack_size)
68 .finish()
69 }
70}
71
72/// Generalized trait for spawning a thread in the `Registry`.
73///
74/// This trait is pub-in-private -- E0445 forces us to make it public,
75/// but we don't actually want to expose these details in the API.
76pub trait ThreadSpawn {
77 private_decl! {}
78
79 /// Spawn a thread with the `ThreadBuilder` parameters, and then
80 /// call `ThreadBuilder::run()`.
6a06907d 81 fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
e74abb32
XL
82}
83
/// Spawns a thread in the "normal" way with `std::thread::Builder`.
///
/// Public only because E0445 requires it; this is an implementation detail
/// that is not meant to be part of the API.
#[derive(Debug, Default)]
pub struct DefaultSpawn;
90
91impl ThreadSpawn for DefaultSpawn {
92 private_impl! {}
93
94 fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
95 let mut b = thread::Builder::new();
96 if let Some(name) = thread.name() {
97 b = b.name(name.to_owned());
98 }
99 if let Some(stack_size) = thread.stack_size() {
100 b = b.stack_size(stack_size);
101 }
102 b.spawn(|| thread.run())?;
103 Ok(())
104 }
105}
106
/// Spawns a thread with a user's custom callback.
///
/// Public only because E0445 requires it; this is an implementation detail
/// that is not meant to be part of the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);
113
114impl<F> CustomSpawn<F>
115where
116 F: FnMut(ThreadBuilder) -> io::Result<()>,
117{
118 pub(super) fn new(spawn: F) -> Self {
119 CustomSpawn(spawn)
120 }
121}
122
123impl<F> ThreadSpawn for CustomSpawn<F>
124where
125 F: FnMut(ThreadBuilder) -> io::Result<()>,
126{
127 private_impl! {}
128
129 #[inline]
130 fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
131 (self.0)(thread)
132 }
133}
94b46f34
XL
134
135pub struct Registry {
136 thread_infos: Vec<ThreadInfo>,
94b46f34 137 sleep: Sleep,
e74abb32 138 injected_jobs: SegQueue<JobRef>,
94b46f34 139 panic_handler: Option<Box<PanicHandler>>,
532ac7d7 140 pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
94b46f34
XL
141 start_handler: Option<Box<StartHandler>>,
142 exit_handler: Option<Box<ExitHandler>>,
532ac7d7
XL
143 pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
144 pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
94b46f34
XL
145
146 // When this latch reaches 0, it means that all work on this
147 // registry must be complete. This is ensured in the following ways:
148 //
149 // - if this is the global registry, there is a ref-count that never
150 // gets released.
151 // - if this is a user-created thread-pool, then so long as the thread-pool
152 // exists, it holds a reference.
153 // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
154 // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
155 // return until the blocking job is complete, that ref will continue to be held.
156 // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
157 // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
158 // and that job will keep the pool alive.
159 terminate_latch: CountLatch,
160}
161
94b46f34
XL
162/// ////////////////////////////////////////////////////////////////////////
163/// Initialization
164
165static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
e74abb32 166static THE_REGISTRY_SET: Once = Once::new();
94b46f34
XL
167
168/// Starts the worker threads (if that has not already happened). If
169/// initialization has not already occurred, use the default
170/// configuration.
171fn global_registry() -> &'static Arc<Registry> {
e74abb32
XL
172 set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
173 .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
174 .expect("The global thread pool has not been initialized.")
94b46f34
XL
175}
176
177/// Starts the worker threads (if that has not already happened) with
178/// the given builder.
e74abb32
XL
179pub(super) fn init_global_registry<S>(
180 builder: ThreadPoolBuilder<S>,
181) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
182where
183 S: ThreadSpawn,
184{
185 set_global_registry(|| Registry::new(builder))
94b46f34
XL
186}
187
e74abb32
XL
188/// Starts the worker threads (if that has not already happened)
189/// by creating a registry with the given callback.
190fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
191where
192 F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
193{
194 let mut result = Err(ThreadPoolBuildError::new(
195 ErrorKind::GlobalPoolAlreadyInitialized,
196 ));
197 THE_REGISTRY_SET.call_once(|| {
198 result = registry().map(|registry| {
199 let registry = leak(registry);
200 unsafe {
201 THE_REGISTRY = Some(registry);
202 }
203 registry
204 });
205 });
206 result
94b46f34
XL
207}
208
209struct Terminator<'a>(&'a Arc<Registry>);
210
211impl<'a> Drop for Terminator<'a> {
212 fn drop(&mut self) {
213 self.0.terminate()
214 }
215}
216
217impl Registry {
e74abb32
XL
218 pub(super) fn new<S>(
219 mut builder: ThreadPoolBuilder<S>,
220 ) -> Result<Arc<Self>, ThreadPoolBuildError>
221 where
222 S: ThreadSpawn,
223 {
94b46f34
XL
224 let n_threads = builder.get_num_threads();
225 let breadth_first = builder.get_breadth_first();
226
e74abb32
XL
227 let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
228 .map(|_| {
229 let worker = if breadth_first {
230 Worker::new_fifo()
231 } else {
232 Worker::new_lifo()
233 };
234
235 let stealer = worker.stealer();
236 (worker, stealer)
237 })
238 .unzip();
94b46f34
XL
239
240 let registry = Arc::new(Registry {
e74abb32 241 thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
94b46f34 242 sleep: Sleep::new(n_threads),
e74abb32 243 injected_jobs: SegQueue::new(),
94b46f34
XL
244 terminate_latch: CountLatch::new(),
245 panic_handler: builder.take_panic_handler(),
246 deadlock_handler: builder.take_deadlock_handler(),
247 start_handler: builder.take_start_handler(),
94b46f34 248 exit_handler: builder.take_exit_handler(),
532ac7d7
XL
249 acquire_thread_handler: builder.take_acquire_thread_handler(),
250 release_thread_handler: builder.take_release_thread_handler(),
94b46f34
XL
251 });
252
253 // If we return early or panic, make sure to terminate existing threads.
254 let t1000 = Terminator(&registry);
255
256 for (index, worker) in workers.into_iter().enumerate() {
e74abb32
XL
257 let thread = ThreadBuilder {
258 name: builder.get_thread_name(index),
259 stack_size: builder.get_stack_size(),
260 registry: registry.clone(),
261 worker,
262 index,
263 };
264 if let Err(e) = builder.get_spawn_handler().spawn(thread) {
532ac7d7 265 return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
94b46f34
XL
266 }
267 }
268
269 // Returning normally now, without termination.
270 mem::forget(t1000);
271
272 Ok(registry.clone())
273 }
274
94b46f34
XL
275 pub fn current() -> Arc<Registry> {
276 unsafe {
277 let worker_thread = WorkerThread::current();
278 if worker_thread.is_null() {
279 global_registry().clone()
280 } else {
281 (*worker_thread).registry.clone()
282 }
283 }
284 }
285
286 /// Returns the number of threads in the current registry. This
287 /// is better than `Registry::current().num_threads()` because it
288 /// avoids incrementing the `Arc`.
e74abb32 289 pub(super) fn current_num_threads() -> usize {
94b46f34
XL
290 unsafe {
291 let worker_thread = WorkerThread::current();
292 if worker_thread.is_null() {
293 global_registry().num_threads()
294 } else {
295 (*worker_thread).registry.num_threads()
296 }
297 }
298 }
299
e74abb32
XL
300 /// Returns the current `WorkerThread` if it's part of this `Registry`.
301 pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
302 unsafe {
303 let worker = WorkerThread::current().as_ref()?;
304 if worker.registry().id() == self.id() {
305 Some(worker)
306 } else {
307 None
308 }
309 }
310 }
311
94b46f34 312 /// Returns an opaque identifier for this registry.
e74abb32 313 pub(super) fn id(&self) -> RegistryId {
94b46f34
XL
314 // We can rely on `self` not to change since we only ever create
315 // registries that are boxed up in an `Arc` (see `new()` above).
532ac7d7
XL
316 RegistryId {
317 addr: self as *const Self as usize,
318 }
94b46f34
XL
319 }
320
e74abb32 321 pub(super) fn num_threads(&self) -> usize {
94b46f34
XL
322 self.thread_infos.len()
323 }
324
e74abb32 325 pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
94b46f34
XL
326 match self.panic_handler {
327 Some(ref handler) => {
328 // If the customizable panic handler itself panics,
329 // then we abort.
330 let abort_guard = unwind::AbortIfPanic;
331 handler(err);
332 mem::forget(abort_guard);
333 }
334 None => {
335 // Default panic handler aborts.
336 let _ = unwind::AbortIfPanic; // let this drop.
337 }
338 }
339 }
340
341 /// Waits for the worker threads to get up and running. This is
342 /// meant to be used for benchmarking purposes, primarily, so that
343 /// you can get more consistent numbers by having everything
344 /// "ready to go".
e74abb32 345 pub(super) fn wait_until_primed(&self) {
94b46f34
XL
346 for info in &self.thread_infos {
347 info.primed.wait();
348 }
349 }
350
351 /// Waits for the worker threads to stop. This is used for testing
352 /// -- so we can check that termination actually works.
e74abb32 353 pub(super) fn wait_until_stopped(&self) {
532ac7d7 354 self.release_thread();
94b46f34
XL
355 for info in &self.thread_infos {
356 info.stopped.wait();
357 }
532ac7d7
XL
358 self.acquire_thread();
359 }
360
361 pub(crate) fn acquire_thread(&self) {
362 if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
363 acquire_thread_handler();
364 }
365 }
366
367 pub(crate) fn release_thread(&self) {
368 if let Some(ref release_thread_handler) = self.release_thread_handler {
369 release_thread_handler();
370 }
94b46f34
XL
371 }
372
373 /// ////////////////////////////////////////////////////////////////////////
374 /// MAIN LOOP
375 ///
376 /// So long as all of the worker threads are hanging out in their
377 /// top-level loop, there is no work to be done.
378
379 /// Push a job into the given `registry`. If we are running on a
380 /// worker thread for the registry, this will push onto the
381 /// deque. Else, it will inject from the outside (which is slower).
e74abb32 382 pub(super) fn inject_or_push(&self, job_ref: JobRef) {
94b46f34
XL
383 let worker_thread = WorkerThread::current();
384 unsafe {
385 if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
386 (*worker_thread).push(job_ref);
387 } else {
388 self.inject(&[job_ref]);
389 }
390 }
391 }
392
94b46f34
XL
393 /// Push a job into the "external jobs" queue; it will be taken by
394 /// whatever worker has nothing to do. Use this is you know that
395 /// you are not on a worker of this registry.
e74abb32 396 pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
532ac7d7
XL
397 log!(InjectJobs {
398 count: injected_jobs.len()
399 });
94b46f34 400
e74abb32
XL
401 // It should not be possible for `state.terminate` to be true
402 // here. It is only set to true when the user creates (and
403 // drops) a `ThreadPool`; and, in that case, they cannot be
404 // calling `inject()` later, since they dropped their
405 // `ThreadPool`.
406 assert!(
407 !self.terminate_latch.probe(),
408 "inject() sees state.terminate as true"
409 );
410
411 for &job_ref in injected_jobs {
412 self.injected_jobs.push(job_ref);
94b46f34
XL
413 }
414 self.sleep.tickle(usize::MAX);
415 }
416
417 fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
e74abb32
XL
418 let job = self.injected_jobs.pop().ok();
419 if job.is_some() {
420 log!(UninjectedWork {
421 worker: worker_index
422 });
94b46f34 423 }
e74abb32 424 job
94b46f34
XL
425 }
426
427 /// If already in a worker-thread of this registry, just execute `op`.
428 /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
429 /// completes and return its return value. If `op` panics, that panic will
430 /// be propagated as well. The second argument indicates `true` if injection
431 /// was performed, `false` if executed directly.
e74abb32 432 pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
532ac7d7
XL
433 where
434 OP: FnOnce(&WorkerThread, bool) -> R + Send,
435 R: Send,
94b46f34
XL
436 {
437 unsafe {
438 let worker_thread = WorkerThread::current();
439 if worker_thread.is_null() {
440 self.in_worker_cold(op)
441 } else if (*worker_thread).registry().id() != self.id() {
442 self.in_worker_cross(&*worker_thread, op)
443 } else {
444 // Perfectly valid to give them a `&T`: this is the
445 // current thread, so we know the data structure won't be
446 // invalidated until we return.
447 op(&*worker_thread, false)
448 }
449 }
450 }
451
452 #[cold]
453 unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
532ac7d7
XL
454 where
455 OP: FnOnce(&WorkerThread, bool) -> R + Send,
456 R: Send,
94b46f34 457 {
e74abb32
XL
458 thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());
459
460 LOCK_LATCH.with(|l| {
461 // This thread isn't a member of *any* thread pool, so just block.
462 debug_assert!(WorkerThread::current().is_null());
463 let job = StackJob::new(
464 0,
465 |injected| {
466 let worker_thread = WorkerThread::current();
467 assert!(injected && !worker_thread.is_null());
468 op(&*worker_thread, true)
469 },
470 l,
471 );
472 self.inject(&[job.as_job_ref()]);
473 self.release_thread();
474 job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
475 self.acquire_thread();
476 job.into_result()
477 })
94b46f34
XL
478 }
479
480 #[cold]
481 unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
532ac7d7
XL
482 where
483 OP: FnOnce(&WorkerThread, bool) -> R + Send,
484 R: Send,
94b46f34
XL
485 {
486 // This thread is a member of a different pool, so let it process
487 // other work while waiting for this `op` to complete.
488 debug_assert!(current_thread.registry().id() != self.id());
489 let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
532ac7d7 490 let job = StackJob::new(
48663c56 491 0,
532ac7d7
XL
492 |injected| {
493 let worker_thread = WorkerThread::current();
494 assert!(injected && !worker_thread.is_null());
495 op(&*worker_thread, true)
496 },
497 latch,
498 );
94b46f34
XL
499 self.inject(&[job.as_job_ref()]);
500 current_thread.wait_until(&job.latch);
501 job.into_result()
502 }
503
504 /// Increment the terminate counter. This increment should be
505 /// balanced by a call to `terminate`, which will decrement. This
506 /// is used when spawning asynchronous work, which needs to
507 /// prevent the registry from terminating so long as it is active.
508 ///
509 /// Note that blocking functions such as `join` and `scope` do not
510 /// need to concern themselves with this fn; their context is
511 /// responsible for ensuring the current thread-pool will not
512 /// terminate until they return.
513 ///
514 /// The global thread-pool always has an outstanding reference
515 /// (the initial one). Custom thread-pools have one outstanding
516 /// reference that is dropped when the `ThreadPool` is dropped:
517 /// since installing the thread-pool blocks until any joins/scopes
518 /// complete, this ensures that joins/scopes are covered.
519 ///
520 /// The exception is `::spawn()`, which can create a job outside
521 /// of any blocking scope. In that case, the job itself holds a
522 /// terminate count and is responsible for invoking `terminate()`
523 /// when finished.
e74abb32 524 pub(super) fn increment_terminate_count(&self) {
94b46f34
XL
525 self.terminate_latch.increment();
526 }
527
528 /// Signals that the thread-pool which owns this registry has been
529 /// dropped. The worker threads will gradually terminate, once any
530 /// extant work is completed.
e74abb32 531 pub(super) fn terminate(&self) {
94b46f34
XL
532 self.terminate_latch.set();
533 self.sleep.tickle(usize::MAX);
534 }
535}
536
537/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
538/// if no other worker thread is active
539#[inline]
540pub fn mark_blocked() {
541 let worker_thread = WorkerThread::current();
542 assert!(!worker_thread.is_null());
543 unsafe {
544 let registry = &(*worker_thread).registry;
545 registry.sleep.mark_blocked(&registry.deadlock_handler)
546 }
547}
548
549/// Mark a previously blocked Rayon worker thread as unblocked
550#[inline]
551pub fn mark_unblocked(registry: &Registry) {
552 registry.sleep.mark_unblocked()
553}
554
555#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
e74abb32 556pub(super) struct RegistryId {
532ac7d7 557 addr: usize,
94b46f34
XL
558}
559
94b46f34
XL
560struct ThreadInfo {
561 /// Latch set once thread has started and we are entering into the
562 /// main loop. Used to wait for worker threads to become primed,
563 /// primarily of interest for benchmarking.
564 primed: LockLatch,
565
566 /// Latch is set once worker thread has completed. Used to wait
567 /// until workers have stopped; only used for tests.
568 stopped: LockLatch,
569
570 /// the "stealer" half of the worker's deque
571 stealer: Stealer<JobRef>,
572}
573
574impl ThreadInfo {
575 fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
576 ThreadInfo {
577 primed: LockLatch::new(),
578 stopped: LockLatch::new(),
e74abb32 579 stealer,
94b46f34
XL
580 }
581 }
582}
583
584/// ////////////////////////////////////////////////////////////////////////
585/// WorkerThread identifiers
586
e74abb32 587pub(super) struct WorkerThread {
94b46f34 588 /// the "worker" half of our local deque
e74abb32 589 worker: Worker<JobRef>,
94b46f34 590
e74abb32
XL
591 /// local queue used for `spawn_fifo` indirection
592 fifo: JobFifo,
94b46f34 593
e74abb32 594 pub(crate) index: usize,
94b46f34
XL
595
596 /// A weak random number generator.
532ac7d7 597 rng: XorShift64Star,
94b46f34
XL
598
599 pub(crate) registry: Arc<Registry>,
600}
601
602// This is a bit sketchy, but basically: the WorkerThread is
603// allocated on the stack of the worker on entry and stored into this
604// thread local variable. So it will remain valid at least until the
605// worker is fully unwound. Using an unsafe pointer avoids the need
606// for a RefCell<T> etc.
607thread_local! {
e74abb32
XL
608 static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
609}
610
611impl Drop for WorkerThread {
612 fn drop(&mut self) {
613 // Undo `set_current`
614 WORKER_THREAD_STATE.with(|t| {
615 assert!(t.get().eq(&(self as *const _)));
616 t.set(ptr::null());
617 });
618 }
94b46f34
XL
619}
620
621impl WorkerThread {
622 /// Gets the `WorkerThread` index for the current thread; returns
623 /// NULL if this is not a worker thread. This pointer is valid
624 /// anywhere on the current thread.
625 #[inline]
e74abb32
XL
626 pub(super) fn current() -> *const WorkerThread {
627 WORKER_THREAD_STATE.with(Cell::get)
94b46f34
XL
628 }
629
630 /// Sets `self` as the worker thread index for the current thread.
631 /// This is done during worker thread startup.
632 unsafe fn set_current(thread: *const WorkerThread) {
633 WORKER_THREAD_STATE.with(|t| {
634 assert!(t.get().is_null());
635 t.set(thread);
636 });
637 }
638
639 /// Returns the registry that owns this worker thread.
e74abb32 640 pub(super) fn registry(&self) -> &Arc<Registry> {
94b46f34
XL
641 &self.registry
642 }
643
644 /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
645 #[inline]
e74abb32 646 pub(super) fn index(&self) -> usize {
94b46f34
XL
647 self.index
648 }
649
650 #[inline]
e74abb32 651 pub(super) unsafe fn push(&self, job: JobRef) {
94b46f34
XL
652 self.worker.push(job);
653 self.registry.sleep.tickle(self.index);
654 }
655
656 #[inline]
e74abb32
XL
657 pub(super) unsafe fn push_fifo(&self, job: JobRef) {
658 self.push(self.fifo.push(job));
659 }
660
661 #[inline]
662 pub(super) fn local_deque_is_empty(&self) -> bool {
663 self.worker.is_empty()
94b46f34
XL
664 }
665
666 /// Attempts to obtain a "local" job -- typically this means
667 /// popping from the top of the stack, though if we are configured
668 /// for breadth-first execution, it would mean dequeuing from the
669 /// bottom.
670 #[inline]
e74abb32
XL
671 pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
672 self.worker.pop()
94b46f34
XL
673 }
674
675 /// Wait until the latch is set. Try to keep busy by popping and
676 /// stealing tasks as necessary.
677 #[inline]
e74abb32 678 pub(super) unsafe fn wait_until<L: LatchProbe + ?Sized>(&self, latch: &L) {
94b46f34
XL
679 log!(WaitUntil { worker: self.index });
680 if !latch.probe() {
681 self.wait_until_cold(latch);
682 }
683 }
684
685 #[cold]
686 unsafe fn wait_until_cold<L: LatchProbe + ?Sized>(&self, latch: &L) {
687 // the code below should swallow all panics and hence never
688 // unwind; but if something does wrong, we want to abort,
689 // because otherwise other code in rayon may assume that the
690 // latch has been signaled, and that can lead to random memory
691 // accesses, which would be *very bad*
692 let abort_guard = unwind::AbortIfPanic;
693
694 let mut yields = 0;
695 while !latch.probe() {
696 // Try to find some work to do. We give preference first
697 // to things in our local deque, then in other workers
698 // deques, and finally to injected jobs from the
699 // outside. The idea is to finish what we started before
700 // we take on something new.
532ac7d7
XL
701 if let Some(job) = self
702 .take_local_job()
703 .or_else(|| self.steal())
704 .or_else(|| self.registry.pop_injected_job(self.index))
705 {
94b46f34
XL
706 yields = self.registry.sleep.work_found(self.index, yields);
707 self.execute(job);
708 } else {
e74abb32
XL
709 yields = self
710 .registry
711 .sleep
712 .no_work_found(self.index, yields, &self.registry);
94b46f34
XL
713 }
714 }
715
716 // If we were sleepy, we are not anymore. We "found work" --
717 // whatever the surrounding thread was doing before it had to
718 // wait.
719 self.registry.sleep.work_found(self.index, yields);
720
721 log!(LatchSet { worker: self.index });
722 mem::forget(abort_guard); // successful execution, do not abort
723 }
724
e74abb32 725 pub(super) unsafe fn execute(&self, job: JobRef) {
94b46f34
XL
726 job.execute();
727
728 // Subtle: executing this job will have `set()` some of its
729 // latches. This may mean that a sleepy (or sleeping) worker
730 // can now make progress. So we have to tickle them to let
731 // them know.
732 self.registry.sleep.tickle(self.index);
733 }
734
735 /// Try to steal a single job and return it.
736 ///
737 /// This should only be done as a last resort, when there is no
738 /// local work to do.
739 unsafe fn steal(&self) -> Option<JobRef> {
740 // we only steal when we don't have any work to do locally
e74abb32 741 debug_assert!(self.local_deque_is_empty());
94b46f34
XL
742
743 // otherwise, try to steal
744 let num_threads = self.registry.thread_infos.len();
745 if num_threads <= 1 {
746 return None;
747 }
532ac7d7
XL
748
749 let start = self.rng.next_usize(num_threads);
750 (start..num_threads)
751 .chain(0..start)
94b46f34
XL
752 .filter(|&i| i != self.index)
753 .filter_map(|victim_index| {
754 let victim = &self.registry.thread_infos[victim_index];
755 loop {
756 match victim.stealer.steal() {
757 Steal::Empty => return None,
e74abb32 758 Steal::Success(d) => {
94b46f34
XL
759 log!(StoleWork {
760 worker: self.index,
761 victim: victim_index
762 });
763 return Some(d);
532ac7d7
XL
764 }
765 Steal::Retry => {}
94b46f34
XL
766 }
767 }
768 })
769 .next()
770 }
771}
772
773/// ////////////////////////////////////////////////////////////////////////
774
e74abb32
XL
775unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
776 let worker_thread = &WorkerThread {
777 worker,
778 fifo: JobFifo::new(),
779 index,
532ac7d7 780 rng: XorShift64Star::new(),
94b46f34
XL
781 registry: registry.clone(),
782 };
e74abb32 783 WorkerThread::set_current(worker_thread);
94b46f34
XL
784
785 // let registry know we are ready to do work
786 registry.thread_infos[index].primed.set();
787
788 // Worker threads should not panic. If they do, just abort, as the
789 // internal state of the threadpool is corrupted. Note that if
790 // **user code** panics, we should catch that and redirect.
791 let abort_guard = unwind::AbortIfPanic;
792
793 // Inform a user callback that we started a thread.
794 if let Some(ref handler) = registry.start_handler {
795 let registry = registry.clone();
796 match unwind::halt_unwinding(|| handler(index)) {
532ac7d7 797 Ok(()) => {}
94b46f34
XL
798 Err(err) => {
799 registry.handle_panic(err);
800 }
801 }
802 }
803
532ac7d7 804 registry.acquire_thread();
e74abb32 805 worker_thread.wait_until(&registry.terminate_latch);
94b46f34
XL
806
807 // Should not be any work left in our queue.
808 debug_assert!(worker_thread.take_local_job().is_none());
809
810 // let registry know we are done
811 registry.thread_infos[index].stopped.set();
812
813 // Normal termination, do not abort.
814 mem::forget(abort_guard);
815
816 // Inform a user callback that we exited a thread.
817 if let Some(ref handler) = registry.exit_handler {
818 let registry = registry.clone();
819 match unwind::halt_unwinding(|| handler(index)) {
532ac7d7 820 Ok(()) => {}
94b46f34
XL
821 Err(err) => {
822 registry.handle_panic(err);
823 }
824 }
825 // We're already exiting the thread, there's nothing else to do.
826 }
532ac7d7
XL
827
828 registry.release_thread();
94b46f34
XL
829}
830
831/// If already in a worker-thread, just execute `op`. Otherwise,
832/// execute `op` in the default thread-pool. Either way, block until
833/// `op` completes and return its return value. If `op` panics, that
834/// panic will be propagated as well. The second argument indicates
835/// `true` if injection was performed, `false` if executed directly.
e74abb32 836pub(super) fn in_worker<OP, R>(op: OP) -> R
532ac7d7
XL
837where
838 OP: FnOnce(&WorkerThread, bool) -> R + Send,
839 R: Send,
94b46f34
XL
840{
841 unsafe {
842 let owner_thread = WorkerThread::current();
843 if !owner_thread.is_null() {
844 // Perfectly valid to give them a `&T`: this is the
845 // current thread, so we know the data structure won't be
846 // invalidated until we return.
847 op(&*owner_thread, false)
848 } else {
849 global_registry().in_worker_cold(op)
850 }
851 }
852}
532ac7d7
XL
853
/// [xorshift*] is a fast pseudorandom number generator. It tolerates even
/// weak seeding, as long as the seed is not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {
    /// Current generator state; never zero.
    state: Cell<u64>,
}

impl XorShift64Star {
    /// Creates a generator seeded from the hash of a process-wide counter.
    /// A zero hash is skipped, because a zero state would stay zero forever.
    fn new() -> Self {
        #[allow(deprecated)]
        static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;

        let seed = loop {
            let mut hasher = DefaultHasher::new();
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            let candidate = hasher.finish();
            if candidate != 0 {
                break candidate;
            }
        };

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    /// Advances the state by one xorshift step and returns the scrambled output.
    fn next(&self) -> u64 {
        let mut s = self.state.get();
        debug_assert_ne!(s, 0);
        s ^= s >> 12;
        s ^= s << 25;
        s ^= s >> 27;
        self.state.set(s);
        s.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`. `n` must be non-zero.
    fn next_usize(&self, n: usize) -> usize {
        let raw = self.next();
        (raw % n as u64) as usize
    }
}