use ::{ExitHandler, PanicHandler, StartHandler, ThreadPoolBuilder, ThreadPoolBuildError, ErrorKind};
use crossbeam_deque::{Deque, Steal, Stealer};
use job::{JobRef, StackJob};
#[cfg(rayon_unstable)]
use job::Job;
#[cfg(rayon_unstable)]
use internal::task::Task;
use latch::{LatchProbe, Latch, CountLatch, LockLatch, SpinLatch, TickleLatch};
use log::Event::*;
use sleep::Sleep;
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::thread;
use std::mem;
use std::usize;
use unwind;
use util::leak;

pub struct Registry {
    thread_infos: Vec<ThreadInfo>,
    state: Mutex<RegistryState>,
    sleep: Sleep,
    job_uninjector: Stealer<JobRef>,
    panic_handler: Option<Box<PanicHandler>>,
    start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,

    // When this latch reaches 0, it means that all work on this
    // registry must be complete. This is ensured in the following ways:
    //
    // - if this is the global registry, there is a ref-count that never
    //   gets released.
    // - if this is a user-created thread-pool, then so long as the thread-pool
    //   exists, it holds a reference.
    // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
    //   no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
    //   return until the blocking job is complete, that ref will continue to be held.
    // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
    //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
    //   and that job will keep the pool alive.
    terminate_latch: CountLatch,
}

struct RegistryState {
    job_injector: Deque<JobRef>,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization

static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = ONCE_INIT;

/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
fn global_registry() -> &'static Arc<Registry> {
    THE_REGISTRY_SET.call_once(|| unsafe { init_registry(ThreadPoolBuilder::new()).unwrap() });
    unsafe { THE_REGISTRY.expect("The global thread pool has not been initialized.") }
}

/// Starts the worker threads (if that has not already happened) with
/// the given builder.
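///
/// # Example (sketch)
///
/// This is normally reached through the public builder API rather than
/// called directly; a minimal sketch, assuming the usual `rayon_core`
/// re-exports:
///
/// ```ignore
/// use rayon_core::ThreadPoolBuilder;
///
/// // Install the global pool exactly once; a second attempt yields
/// // `GlobalPoolAlreadyInitialized`.
/// ThreadPoolBuilder::new().num_threads(4).build_global().unwrap();
/// ```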
pub fn init_global_registry(builder: ThreadPoolBuilder) -> Result<&'static Registry, ThreadPoolBuildError> {
    let mut called = false;
    let mut init_result = Ok(());
    THE_REGISTRY_SET.call_once(|| unsafe {
        init_result = init_registry(builder);
        called = true;
    });
    if called {
        init_result.map(|()| &**global_registry())
    } else {
        Err(ThreadPoolBuildError::new(ErrorKind::GlobalPoolAlreadyInitialized))
    }
}

/// Initializes the global registry with the given builder.
/// Meant to be called from within the `THE_REGISTRY_SET` once
/// function. Declared `unsafe` because it writes to `THE_REGISTRY` in
/// an unsynchronized fashion.
unsafe fn init_registry(builder: ThreadPoolBuilder) -> Result<(), ThreadPoolBuildError> {
    Registry::new(builder).map(|registry| THE_REGISTRY = Some(leak(registry)))
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
    fn drop(&mut self) {
        self.0.terminate()
    }
}

impl Registry {
    pub fn new(mut builder: ThreadPoolBuilder) -> Result<Arc<Registry>, ThreadPoolBuildError> {
        let n_threads = builder.get_num_threads();
        let breadth_first = builder.get_breadth_first();

        let inj_worker = Deque::new();
        let inj_stealer = inj_worker.stealer();
        let workers: Vec<_> = (0..n_threads)
            .map(|_| Deque::new())
            .collect();
        let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();

        let registry = Arc::new(Registry {
            thread_infos: stealers.into_iter()
                .map(|s| ThreadInfo::new(s))
                .collect(),
            state: Mutex::new(RegistryState::new(inj_worker)),
            sleep: Sleep::new(),
            job_uninjector: inj_stealer,
            terminate_latch: CountLatch::new(),
            panic_handler: builder.take_panic_handler(),
            start_handler: builder.take_start_handler(),
            exit_handler: builder.take_exit_handler(),
        });

        // If we return early or panic, make sure to terminate existing threads.
        let t1000 = Terminator(&registry);

        for (index, worker) in workers.into_iter().enumerate() {
            let registry = registry.clone();
            let mut b = thread::Builder::new();
            if let Some(name) = builder.get_thread_name(index) {
                b = b.name(name);
            }
            if let Some(stack_size) = builder.get_stack_size() {
                b = b.stack_size(stack_size);
            }
            if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) }) {
                return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
            }
        }

        // Returning normally now, without termination.
        mem::forget(t1000);

        Ok(registry.clone())
    }

    #[cfg(rayon_unstable)]
    pub fn global() -> Arc<Registry> {
        global_registry().clone()
    }

    pub fn current() -> Arc<Registry> {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().clone()
            } else {
                (*worker_thread).registry.clone()
            }
        }
    }

    /// Returns the number of threads in the current registry. This
    /// is better than `Registry::current().num_threads()` because it
    /// avoids incrementing the `Arc` reference count.
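    ///
    /// # Example (sketch)
    ///
    /// ```ignore
    /// // Via the public wrapper, which is assumed to delegate here:
    /// let n = rayon_core::current_num_threads();
    /// assert!(n >= 1);
    /// ```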
    pub fn current_num_threads() -> usize {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().num_threads()
            } else {
                (*worker_thread).registry.num_threads()
            }
        }
    }

    /// Returns an opaque identifier for this registry.
    pub fn id(&self) -> RegistryId {
        // We can rely on `self` not to change since we only ever create
        // registries that are boxed up in an `Arc` (see `new()` above).
        RegistryId { addr: self as *const Self as usize }
    }

    pub fn num_threads(&self) -> usize {
        self.thread_infos.len()
    }

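    /// Routes `err` to the registry's custom panic handler, if one was
    /// installed; with no handler, the process aborts.
    ///
    /// # Example (sketch)
    ///
    /// A handler is installed through the builder; a minimal sketch:
    ///
    /// ```ignore
    /// use rayon_core::ThreadPoolBuilder;
    ///
    /// let pool = ThreadPoolBuilder::new()
    ///     .panic_handler(|_err| eprintln!("a rayon worker panicked"))
    ///     .build()
    ///     .unwrap();
    /// ```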
    pub fn handle_panic(&self, err: Box<Any + Send>) {
        match self.panic_handler {
            Some(ref handler) => {
                // If the customizable panic handler itself panics,
                // then we abort.
                let abort_guard = unwind::AbortIfPanic;
                handler(err);
                mem::forget(abort_guard);
            }
            None => {
                // Default panic handler aborts.
                let _ = unwind::AbortIfPanic; // let this drop.
            }
        }
    }

    /// Waits for the worker threads to get up and running. This is
    /// meant to be used for benchmarking purposes, primarily, so that
    /// you can get more consistent numbers by having everything
    /// "ready to go".
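    ///
    /// # Example (sketch)
    ///
    /// ```ignore
    /// // Crate-internal benchmark setup (hypothetical usage):
    /// let registry = Registry::new(ThreadPoolBuilder::new()).unwrap();
    /// registry.wait_until_primed(); // every worker has entered its main loop
    /// ```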
    pub fn wait_until_primed(&self) {
        for info in &self.thread_infos {
            info.primed.wait();
        }
    }

    /// Waits for the worker threads to stop. This is used for testing
    /// -- so we can check that termination actually works.
    #[cfg(test)]
    pub fn wait_until_stopped(&self) {
        for info in &self.thread_infos {
            info.stopped.wait();
        }
    }

    /// ////////////////////////////////////////////////////////////////////////
    /// MAIN LOOP
    ///
    /// So long as all of the worker threads are hanging out in their
    /// top-level loop, there is no work to be done.

    /// Push a job into the given `registry`. If we are running on a
    /// worker thread for the registry, this will push onto the
    /// deque. Else, it will inject from the outside (which is slower).
    pub fn inject_or_push(&self, job_ref: JobRef) {
        let worker_thread = WorkerThread::current();
        unsafe {
            if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
                (*worker_thread).push(job_ref);
            } else {
                self.inject(&[job_ref]);
            }
        }
    }

    /// Unsafe: the caller must guarantee that `task` will stay valid
    /// until it executes.
    #[cfg(rayon_unstable)]
    pub unsafe fn submit_task<T>(&self, task: Arc<T>)
        where T: Task
    {
        let task_job = TaskJob::new(task);
        let task_job_ref = TaskJob::into_job_ref(task_job);
        return self.inject_or_push(task_job_ref);

        /// A little newtype wrapper for `T`, just because I did not
        /// want to implement `Job` for all `T: Task`.
        struct TaskJob<T: Task> {
            _data: T
        }

        impl<T: Task> TaskJob<T> {
            fn new(arc: Arc<T>) -> Arc<Self> {
                // `TaskJob<T>` has the same layout as `T`, so we can safely
                // transmute this `T` into a `TaskJob<T>`. This lets us write our
                // impls of `Job` for `TaskJob<T>`, making them more restricted.
                // Since `Job` is a private trait, this is not strictly necessary,
                // I don't think, but makes me feel better.
                unsafe { mem::transmute(arc) }
            }

            pub fn into_task(this: Arc<TaskJob<T>>) -> Arc<T> {
                // Same logic as `new()`
                unsafe { mem::transmute(this) }
            }

            unsafe fn into_job_ref(this: Arc<Self>) -> JobRef {
                let this: *const Self = mem::transmute(this);
                JobRef::new(this)
            }
        }

        impl<T: Task> Job for TaskJob<T> {
            unsafe fn execute(this: *const Self) {
                let this: Arc<Self> = mem::transmute(this);
                let task: Arc<T> = TaskJob::into_task(this);
                Task::execute(task);
            }
        }
    }

    /// Push a job into the "external jobs" queue; it will be taken by
    /// whatever worker has nothing to do. Use this if you know that
    /// you are not on a worker of this registry.
    pub fn inject(&self, injected_jobs: &[JobRef]) {
        log!(InjectJobs { count: injected_jobs.len() });
        {
            let state = self.state.lock().unwrap();

            // It should not be possible for the terminate latch to be
            // set at this point. It is only set when the user creates
            // (and drops) a `ThreadPool`; and, in that case, they cannot
            // be calling `inject()` later, since they dropped their
            // `ThreadPool`.
            assert!(!self.terminate_latch.probe(), "inject() sees state.terminate as true");

            for &job_ref in injected_jobs {
                state.job_injector.push(job_ref);
            }
        }
        self.sleep.tickle(usize::MAX);
    }

    fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
        loop {
            match self.job_uninjector.steal() {
                Steal::Empty => return None,
                Steal::Data(d) => {
                    log!(UninjectedWork { worker: worker_index });
                    return Some(d);
                },
                Steal::Retry => {},
            }
        }
    }

    /// If already in a worker-thread of this registry, just execute `op`.
    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
    /// completes and return its return value. If `op` panics, that panic will
    /// be propagated as well. The second argument to `op` is `true` if the job
    /// was injected, `false` if it executed directly.
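    ///
    /// # Example (sketch)
    ///
    /// ```ignore
    /// // The public `ThreadPool::install` is assumed to bottom out here:
    /// let pool = rayon_core::ThreadPoolBuilder::new().build().unwrap();
    /// let sum = pool.install(|| (1..101).sum::<u32>()); // blocks until done
    /// assert_eq!(sum, 5050);
    /// ```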
    pub fn in_worker<OP, R>(&self, op: OP) -> R
        where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send
    {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                self.in_worker_cold(op)
            } else if (*worker_thread).registry().id() != self.id() {
                self.in_worker_cross(&*worker_thread, op)
            } else {
                // Perfectly valid to give them a `&T`: this is the
                // current thread, so we know the data structure won't be
                // invalidated until we return.
                op(&*worker_thread, false)
            }
        }
    }

    #[cold]
    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
        where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send
    {
        // This thread isn't a member of *any* thread pool, so just block.
        debug_assert!(WorkerThread::current().is_null());
        let job = StackJob::new(|injected| {
            let worker_thread = WorkerThread::current();
            assert!(injected && !worker_thread.is_null());
            op(&*worker_thread, true)
        }, LockLatch::new());
        self.inject(&[job.as_job_ref()]);
        job.latch.wait();
        job.into_result()
    }

    #[cold]
    unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
        where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send
    {
        // This thread is a member of a different pool, so let it process
        // other work while waiting for this `op` to complete.
        debug_assert!(current_thread.registry().id() != self.id());
        let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
        let job = StackJob::new(|injected| {
            let worker_thread = WorkerThread::current();
            assert!(injected && !worker_thread.is_null());
            op(&*worker_thread, true)
        }, latch);
        self.inject(&[job.as_job_ref()]);
        current_thread.wait_until(&job.latch);
        job.into_result()
    }

    /// Increment the terminate counter. This increment should be
    /// balanced by a call to `terminate`, which will decrement. This
    /// is used when spawning asynchronous work, which needs to
    /// prevent the registry from terminating so long as it is active.
    ///
    /// Note that blocking functions such as `join` and `scope` do not
    /// need to concern themselves with this fn; their context is
    /// responsible for ensuring the current thread-pool will not
    /// terminate until they return.
    ///
    /// The global thread-pool always has an outstanding reference
    /// (the initial one). Custom thread-pools have one outstanding
    /// reference that is dropped when the `ThreadPool` is dropped:
    /// since installing the thread-pool blocks until any joins/scopes
    /// complete, this ensures that joins/scopes are covered.
    ///
    /// The exception is `::spawn()`, which can create a job outside
    /// of any blocking scope. In that case, the job itself holds a
    /// terminate count and is responsible for invoking `terminate()`
    /// when finished.
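    ///
    /// # Protocol sketch
    ///
    /// ```ignore
    /// // Schematically, what a caller spawning asynchronous work does
    /// // (hypothetical pseudocode of the contract described above):
    /// registry.increment_terminate_count(); // keep the pool alive
    /// /* ... run the spawned job ... */
    /// registry.terminate();                 // balance the increment
    /// ```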
    pub fn increment_terminate_count(&self) {
        self.terminate_latch.increment();
    }

    /// Signals that the thread-pool which owns this registry has been
    /// dropped. The worker threads will gradually terminate, once any
    /// extant work is completed.
    pub fn terminate(&self) {
        self.terminate_latch.set();
        self.sleep.tickle(usize::MAX);
    }
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct RegistryId {
    addr: usize
}

impl RegistryState {
    pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
        RegistryState {
            job_injector: job_injector,
        }
    }
}

struct ThreadInfo {
    /// Latch set once thread has started and we are entering into the
    /// main loop. Used to wait for worker threads to become primed,
    /// primarily of interest for benchmarking.
    primed: LockLatch,

    /// Latch is set once worker thread has completed. Used to wait
    /// until workers have stopped; only used for tests.
    stopped: LockLatch,

    /// the "stealer" half of the worker's deque
    stealer: Stealer<JobRef>,
}

impl ThreadInfo {
    fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
        ThreadInfo {
            primed: LockLatch::new(),
            stopped: LockLatch::new(),
            stealer: stealer,
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////
/// WorkerThread identifiers

pub struct WorkerThread {
    /// the "worker" half of our local deque
    worker: Deque<JobRef>,

    index: usize,

    /// are these workers configured to steal breadth-first or not?
    breadth_first: bool,

    /// A weak random number generator.
    rng: XorShift64Star,

    registry: Arc<Registry>,
}

// This is a bit sketchy, but basically: the WorkerThread is
// allocated on the stack of the worker on entry and stored into this
// thread local variable. So it will remain valid at least until the
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
    static WORKER_THREAD_STATE: Cell<*const WorkerThread> =
        Cell::new(0 as *const WorkerThread)
}

impl WorkerThread {
    /// Gets a pointer to the `WorkerThread` for the current thread;
    /// returns null if this is not a worker thread. The pointer is
    /// valid anywhere on the current thread.
    #[inline]
    pub fn current() -> *const WorkerThread {
        WORKER_THREAD_STATE.with(|t| t.get())
    }

    /// Sets `thread` as the `WorkerThread` for the current thread.
    /// This is done during worker thread startup.
    unsafe fn set_current(thread: *const WorkerThread) {
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().is_null());
            t.set(thread);
        });
    }

    /// Returns the registry that owns this worker thread.
    pub fn registry(&self) -> &Arc<Registry> {
        &self.registry
    }

    /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
    #[inline]
    pub fn index(&self) -> usize {
        self.index
    }

    #[inline]
    pub unsafe fn push(&self, job: JobRef) {
        self.worker.push(job);
        self.registry.sleep.tickle(self.index);
    }

    #[inline]
    pub fn local_deque_is_empty(&self) -> bool {
        self.worker.len() == 0
    }

    /// Attempts to obtain a "local" job -- typically this means
    /// popping from the top of the stack, though if we are configured
    /// for breadth-first execution, it would mean dequeuing from the
    /// bottom.
    #[inline]
    pub unsafe fn take_local_job(&self) -> Option<JobRef> {
        if !self.breadth_first {
            self.worker.pop()
        } else {
            loop {
                match self.worker.steal() {
                    Steal::Empty => return None,
                    Steal::Data(d) => return Some(d),
                    Steal::Retry => {},
                }
            }
        }
    }

    /// Wait until the latch is set. Try to keep busy by popping and
    /// stealing tasks as necessary.
    #[inline]
    pub unsafe fn wait_until<L: LatchProbe + ?Sized>(&self, latch: &L) {
        log!(WaitUntil { worker: self.index });
        if !latch.probe() {
            self.wait_until_cold(latch);
        }
    }

    #[cold]
    unsafe fn wait_until_cold<L: LatchProbe + ?Sized>(&self, latch: &L) {
        // The code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*.
        let abort_guard = unwind::AbortIfPanic;

        let mut yields = 0;
        while !latch.probe() {
            // Try to find some work to do. We give preference first
            // to things in our local deque, then in other workers'
            // deques, and finally to injected jobs from the
            // outside. The idea is to finish what we started before
            // we take on something new.
            if let Some(job) = self.take_local_job()
                .or_else(|| self.steal())
                .or_else(|| self.registry.pop_injected_job(self.index)) {
                yields = self.registry.sleep.work_found(self.index, yields);
                self.execute(job);
            } else {
                yields = self.registry.sleep.no_work_found(self.index, yields);
            }
        }

        // If we were sleepy, we are not anymore. We "found work" --
        // whatever the surrounding thread was doing before it had to
        // wait.
        self.registry.sleep.work_found(self.index, yields);

        log!(LatchSet { worker: self.index });
        mem::forget(abort_guard); // successful execution, do not abort
    }

    pub unsafe fn execute(&self, job: JobRef) {
        job.execute();

        // Subtle: executing this job will have `set()` some of its
        // latches. This may mean that a sleepy (or sleeping) worker
        // can now make progress. So we have to tickle them to let
        // them know.
        self.registry.sleep.tickle(self.index);
    }

    /// Try to steal a single job and return it.
    ///
    /// This should only be done as a last resort, when there is no
    /// local work to do.
    unsafe fn steal(&self) -> Option<JobRef> {
        // we only steal when we don't have any work to do locally
        debug_assert!(self.worker.pop().is_none());

        // otherwise, try to steal
        let num_threads = self.registry.thread_infos.len();
        if num_threads <= 1 {
            return None;
        }

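        // Start from a random victim so that concurrent thieves spread
        // out instead of all probing the same deques in the same order.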
        let start = self.rng.next_usize(num_threads);
        (start .. num_threads)
            .chain(0 .. start)
            .filter(|&i| i != self.index)
            .filter_map(|victim_index| {
                let victim = &self.registry.thread_infos[victim_index];
                loop {
                    match victim.stealer.steal() {
                        Steal::Empty => return None,
                        Steal::Data(d) => {
                            log!(StoleWork {
                                worker: self.index,
                                victim: victim_index
                            });
                            return Some(d);
                        },
                        Steal::Retry => {},
                    }
                }
            })
            .next()
    }
}

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Deque<JobRef>,
                    registry: Arc<Registry>,
                    index: usize,
                    breadth_first: bool) {
    let worker_thread = WorkerThread {
        worker: worker,
        breadth_first: breadth_first,
        index: index,
        rng: XorShift64Star::new(),
        registry: registry.clone(),
    };
    WorkerThread::set_current(&worker_thread);

    // let registry know we are ready to do work
    registry.thread_infos[index].primed.set();

    // Worker threads should not panic. If they do, just abort, as the
    // internal state of the threadpool is corrupted. Note that if
    // **user code** panics, we should catch that and redirect.
    let abort_guard = unwind::AbortIfPanic;

    // Inform a user callback that we started a thread.
    if let Some(ref handler) = registry.start_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
    }

    worker_thread.wait_until(&registry.terminate_latch);

    // Should not be any work left in our queue.
    debug_assert!(worker_thread.take_local_job().is_none());

    // let registry know we are done
    registry.thread_infos[index].stopped.set();

    // Normal termination, do not abort.
    mem::forget(abort_guard);

    // Inform a user callback that we exited a thread.
    if let Some(ref handler) = registry.exit_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
        // We're already exiting the thread, there's nothing else to do.
    }
}

/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
/// panic will be propagated as well. The second argument to `op` is
/// `true` if the job was injected, `false` if it executed directly.
pub fn in_worker<OP, R>(op: OP) -> R
    where OP: FnOnce(&WorkerThread, bool) -> R + Send, R: Send
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker_cold(op)
        }
    }
}

/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    fn new() -> Self {
        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
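    /// (A plain modulo is used, so there is a slight bias when `n`
    /// does not evenly divide `2^64`; that is fine for picking steal
    /// victims.)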
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}
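
// An illustrative sanity check for the generator above -- a minimal
// sketch (hypothetical test, not exhaustive statistical testing).
#[cfg(test)]
mod xorshift64star_sketch {
    use super::XorShift64Star;

    #[test]
    fn next_usize_stays_in_range() {
        let rng = XorShift64Star::new();
        for n in 1..100 {
            // `next_usize(n)` reduces a 64-bit sample modulo `n`,
            // so it must always fall in `0..n`.
            assert!(rng.next_usize(n) < n);
        }
    }
}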