]> git.proxmox.com Git - rustc.git/blobdiff - vendor/rustc-rayon-core/src/registry.rs
New upstream version 1.63.0+dfsg1
[rustc.git] / vendor / rustc-rayon-core / src / registry.rs
index 0ad6bd3257c2fee042c961bd6d622fdb3980b9cc..24f73f31973d99d8bd116867f3c1164e54b906d9 100644 (file)
@@ -1,9 +1,9 @@
 use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
+use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
 use crate::log::Event::*;
+use crate::log::Logger;
 use crate::sleep::Sleep;
 use crate::unwind;
-use crate::util::leak;
 use crate::{
     AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
     ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
@@ -35,22 +35,22 @@ pub struct ThreadBuilder {
 }
 
 impl ThreadBuilder {
-    /// Get the index of this thread in the pool, within `0..num_threads`.
+    /// Gets the index of this thread in the pool, within `0..num_threads`.
     pub fn index(&self) -> usize {
         self.index
     }
 
-    /// Get the string that was specified by `ThreadPoolBuilder::name()`.
+    /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
     pub fn name(&self) -> Option<&str> {
         self.name.as_ref().map(String::as_str)
     }
 
-    /// Get the value that was specified by `ThreadPoolBuilder::stack_size()`.
+    /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
     pub fn stack_size(&self) -> Option<usize> {
         self.stack_size
     }
 
-    /// Execute the main loop for this thread. This will not return until the
+    /// Executes the main loop for this thread. This will not return until the
     /// thread pool is dropped.
     pub fn run(self) {
         unsafe { main_loop(self.worker, self.registry, self.index) }
@@ -132,6 +132,7 @@ where
 }
 
 pub struct Registry {
+    logger: Logger,
     thread_infos: Vec<ThreadInfo>,
     sleep: Sleep,
     injected_jobs: Injector<JobRef>,
@@ -155,21 +156,21 @@ pub struct Registry {
     // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
     //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
     //   and that job will keep the pool alive.
-    terminate_latch: CountLatch,
+    terminate_count: AtomicUsize,
 }
 
 /// ////////////////////////////////////////////////////////////////////////
 /// Initialization
 
-static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
+static mut THE_REGISTRY: Option<Arc<Registry>> = None;
 static THE_REGISTRY_SET: Once = Once::new();
 
 /// Starts the worker threads (if that has not already happened). If
 /// initialization has not already occurred, use the default
 /// configuration.
-fn global_registry() -> &'static Arc<Registry> {
+pub(super) fn global_registry() -> &'static Arc<Registry> {
     set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
-        .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
+        .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
         .expect("The global thread pool has not been initialized.")
 }
 
@@ -193,15 +194,12 @@ where
     let mut result = Err(ThreadPoolBuildError::new(
         ErrorKind::GlobalPoolAlreadyInitialized,
     ));
+
     THE_REGISTRY_SET.call_once(|| {
-        result = registry().map(|registry| {
-            let registry = leak(registry);
-            unsafe {
-                THE_REGISTRY = Some(registry);
-            }
-            registry
-        });
+        result = registry()
+            .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
     });
+
     result
 }
 
@@ -220,7 +218,9 @@ impl Registry {
     where
         S: ThreadSpawn,
     {
-        let n_threads = builder.get_num_threads();
+        // Soft-limit the number of threads that we can actually support.
+        let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads());
+
         let breadth_first = builder.get_breadth_first();
 
         let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
@@ -236,11 +236,13 @@ impl Registry {
             })
             .unzip();
 
+        let logger = Logger::new(n_threads);
         let registry = Arc::new(Registry {
+            logger: logger.clone(),
             thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
-            sleep: Sleep::new(n_threads),
+            sleep: Sleep::new(logger, n_threads),
             injected_jobs: Injector::new(),
-            terminate_latch: CountLatch::new(),
+            terminate_count: AtomicUsize::new(1),
             panic_handler: builder.take_panic_handler(),
             deadlock_handler: builder.take_deadlock_handler(),
             start_handler: builder.take_start_handler(),
@@ -256,7 +258,7 @@ impl Registry {
             let thread = ThreadBuilder {
                 name: builder.get_thread_name(index),
                 stack_size: builder.get_stack_size(),
-                registry: registry.clone(),
+                registry: Arc::clone(&registry),
                 worker,
                 index,
             };
@@ -268,17 +270,18 @@ impl Registry {
         // Returning normally now, without termination.
         mem::forget(t1000);
 
-        Ok(registry.clone())
+        Ok(registry)
     }
 
     pub fn current() -> Arc<Registry> {
         unsafe {
             let worker_thread = WorkerThread::current();
-            if worker_thread.is_null() {
-                global_registry().clone()
+            let registry = if worker_thread.is_null() {
+                global_registry()
             } else {
-                (*worker_thread).registry.clone()
-            }
+                &(*worker_thread).registry
+            };
+            Arc::clone(registry)
         }
     }
 
@@ -317,6 +320,11 @@ impl Registry {
         }
     }
 
+    #[inline]
+    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
+        self.logger.log(event)
+    }
+
     pub(super) fn num_threads(&self) -> usize {
         self.thread_infos.len()
     }
@@ -393,8 +401,8 @@ impl Registry {
     /// whatever worker has nothing to do. Use this is you know that
     /// you are not on a worker of this registry.
     pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
-        log!(InjectJobs {
-            count: injected_jobs.len()
+        self.log(|| JobsInjected {
+            count: injected_jobs.len(),
         });
 
         // It should not be possible for `state.terminate` to be true
@@ -402,23 +410,32 @@ impl Registry {
         // drops) a `ThreadPool`; and, in that case, they cannot be
         // calling `inject()` later, since they dropped their
         // `ThreadPool`.
-        assert!(
-            !self.terminate_latch.probe(),
+        debug_assert_ne!(
+            self.terminate_count.load(Ordering::Acquire),
+            0,
             "inject() sees state.terminate as true"
         );
 
+        let queue_was_empty = self.injected_jobs.is_empty();
+
         for &job_ref in injected_jobs {
             self.injected_jobs.push(job_ref);
         }
-        self.sleep.tickle(usize::MAX);
+
+        self.sleep
+            .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
+    }
+
+    pub(crate) fn has_injected_job(&self) -> bool {
+        !self.injected_jobs.is_empty()
     }
 
     fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
         loop {
             match self.injected_jobs.steal() {
                 Steal::Success(job) => {
-                    log!(UninjectedWork {
-                        worker: worker_index
+                    self.log(|| JobUninjected {
+                        worker: worker_index,
                     });
                     return Some(job);
                 }
@@ -477,6 +494,10 @@ impl Registry {
             self.release_thread();
             job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
             self.acquire_thread();
+
+            // flush accumulated logs as we exit the thread
+            self.logger.log(|| Flush);
+
             job.into_result()
         })
     }
@@ -490,7 +511,7 @@ impl Registry {
         // This thread is a member of a different pool, so let it process
         // other work while waiting for this `op` to complete.
         debug_assert!(current_thread.registry().id() != self.id());
-        let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
+        let latch = SpinLatch::cross(current_thread);
         let job = StackJob::new(
             0,
             |injected| {
@@ -505,7 +526,7 @@ impl Registry {
         job.into_result()
     }
 
-    /// Increment the terminate counter. This increment should be
+    /// Increments the terminate counter. This increment should be
     /// balanced by a call to `terminate`, which will decrement. This
     /// is used when spawning asynchronous work, which needs to
     /// prevent the registry from terminating so long as it is active.
@@ -526,15 +547,28 @@ impl Registry {
     /// terminate count and is responsible for invoking `terminate()`
     /// when finished.
     pub(super) fn increment_terminate_count(&self) {
-        self.terminate_latch.increment();
+        let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
+        debug_assert!(previous != 0, "registry ref count incremented from zero");
+        assert!(
+            previous != std::usize::MAX,
+            "overflow in registry ref count"
+        );
     }
 
     /// Signals that the thread-pool which owns this registry has been
     /// dropped. The worker threads will gradually terminate, once any
     /// extant work is completed.
     pub(super) fn terminate(&self) {
-        self.terminate_latch.set();
-        self.sleep.tickle(usize::MAX);
+        if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
+            for (i, thread_info) in self.thread_infos.iter().enumerate() {
+                thread_info.terminate.set_and_tickle_one(self, i);
+            }
+        }
+    }
+
+    /// Notify the worker that the latch they are sleeping on has been "set".
+    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
+        self.sleep.notify_worker_latch_is_set(target_worker_index);
     }
 }
 
@@ -571,6 +605,15 @@ struct ThreadInfo {
     /// until workers have stopped; only used for tests.
     stopped: LockLatch,
 
+    /// The latch used to signal that terminated has been requested.
+    /// This latch is *set* by the `terminate` method on the
+    /// `Registry`, once the registry's main "terminate" counter
+    /// reaches zero.
+    ///
+    /// NB. We use a `CountLatch` here because it has no lifetimes and is
+    /// meant for async use, but the count never gets higher than one.
+    terminate: CountLatch,
+
     /// the "stealer" half of the worker's deque
     stealer: Stealer<JobRef>,
 }
@@ -580,6 +623,7 @@ impl ThreadInfo {
         ThreadInfo {
             primed: LockLatch::new(),
             stopped: LockLatch::new(),
+            terminate: CountLatch::new(),
             stealer,
         }
     }
@@ -641,10 +685,16 @@ impl WorkerThread {
     }
 
     /// Returns the registry that owns this worker thread.
+    #[inline]
     pub(super) fn registry(&self) -> &Arc<Registry> {
         &self.registry
     }
 
+    #[inline]
+    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
+        self.registry.logger.log(event)
+    }
+
     /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
     #[inline]
     pub(super) fn index(&self) -> usize {
@@ -653,8 +703,12 @@ impl WorkerThread {
 
     #[inline]
     pub(super) unsafe fn push(&self, job: JobRef) {
+        self.log(|| JobPushed { worker: self.index });
+        let queue_was_empty = self.worker.is_empty();
         self.worker.push(job);
-        self.registry.sleep.tickle(self.index);
+        self.registry
+            .sleep
+            .new_internal_jobs(self.index, 1, queue_was_empty);
     }
 
     #[inline]
@@ -673,21 +727,27 @@ impl WorkerThread {
     /// bottom.
     #[inline]
     pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
-        self.worker.pop()
+        let popped_job = self.worker.pop();
+
+        if popped_job.is_some() {
+            self.log(|| JobPopped { worker: self.index });
+        }
+
+        popped_job
     }
 
     /// Wait until the latch is set. Try to keep busy by popping and
     /// stealing tasks as necessary.
     #[inline]
-    pub(super) unsafe fn wait_until<L: LatchProbe + ?Sized>(&self, latch: &L) {
-        log!(WaitUntil { worker: self.index });
+    pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
+        let latch = latch.as_core_latch();
         if !latch.probe() {
             self.wait_until_cold(latch);
         }
     }
 
     #[cold]
-    unsafe fn wait_until_cold<L: LatchProbe + ?Sized>(&self, latch: &L) {
+    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
         // the code below should swallow all panics and hence never
         // unwind; but if something does wrong, we want to abort,
         // because otherwise other code in rayon may assume that the
@@ -695,7 +755,7 @@ impl WorkerThread {
         // accesses, which would be *very bad*
         let abort_guard = unwind::AbortIfPanic;
 
-        let mut yields = 0;
+        let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
         while !latch.probe() {
             // Try to find some work to do. We give preference first
             // to things in our local deque, then in other workers
@@ -707,33 +767,31 @@ impl WorkerThread {
                 .or_else(|| self.steal())
                 .or_else(|| self.registry.pop_injected_job(self.index))
             {
-                yields = self.registry.sleep.work_found(self.index, yields);
+                self.registry.sleep.work_found(idle_state);
                 self.execute(job);
+                idle_state = self.registry.sleep.start_looking(self.index, latch);
             } else {
-                yields = self
-                    .registry
+                self.registry
                     .sleep
-                    .no_work_found(self.index, yields, &self.registry);
+                    .no_work_found(&mut idle_state, latch, &self.registry)
             }
         }
 
         // If we were sleepy, we are not anymore. We "found work" --
         // whatever the surrounding thread was doing before it had to
         // wait.
-        self.registry.sleep.work_found(self.index, yields);
+        self.registry.sleep.work_found(idle_state);
 
-        log!(LatchSet { worker: self.index });
+        self.log(|| ThreadSawLatchSet {
+            worker: self.index,
+            latch_addr: latch.addr(),
+        });
         mem::forget(abort_guard); // successful execution, do not abort
     }
 
+    #[inline]
     pub(super) unsafe fn execute(&self, job: JobRef) {
         job.execute();
-
-        // Subtle: executing this job will have `set()` some of its
-        // latches.  This may mean that a sleepy (or sleeping) worker
-        // can now make progress. So we have to tickle them to let
-        // them know.
-        self.registry.sleep.tickle(self.index);
     }
 
     /// Try to steal a single job and return it.
@@ -745,32 +803,39 @@ impl WorkerThread {
         debug_assert!(self.local_deque_is_empty());
 
         // otherwise, try to steal
-        let num_threads = self.registry.thread_infos.len();
+        let thread_infos = &self.registry.thread_infos.as_slice();
+        let num_threads = thread_infos.len();
         if num_threads <= 1 {
             return None;
         }
 
-        let start = self.rng.next_usize(num_threads);
-        (start..num_threads)
-            .chain(0..start)
-            .filter(|&i| i != self.index)
-            .filter_map(|victim_index| {
-                let victim = &self.registry.thread_infos[victim_index];
-                loop {
+        loop {
+            let mut retry = false;
+            let start = self.rng.next_usize(num_threads);
+            let job = (start..num_threads)
+                .chain(0..start)
+                .filter(move |&i| i != self.index)
+                .find_map(|victim_index| {
+                    let victim = &thread_infos[victim_index];
                     match victim.stealer.steal() {
-                        Steal::Empty => return None,
-                        Steal::Success(d) => {
-                            log!(StoleWork {
+                        Steal::Success(job) => {
+                            self.log(|| JobStolen {
                                 worker: self.index,
-                                victim: victim_index
+                                victim: victim_index,
                             });
-                            return Some(d);
+                            Some(job)
+                        }
+                        Steal::Empty => None,
+                        Steal::Retry => {
+                            retry = true;
+                            None
                         }
-                        Steal::Retry => {}
                     }
-                }
-            })
-            .next()
+                });
+            if job.is_some() || !retry {
+                return job;
+            }
+        }
     }
 }
 
@@ -782,9 +847,10 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
         fifo: JobFifo::new(),
         index,
         rng: XorShift64Star::new(),
-        registry: registry.clone(),
+        registry,
     };
     WorkerThread::set_current(worker_thread);
+    let registry = &*worker_thread.registry;
 
     // let registry know we are ready to do work
     registry.thread_infos[index].primed.set();
@@ -796,7 +862,6 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
 
     // Inform a user callback that we started a thread.
     if let Some(ref handler) = registry.start_handler {
-        let registry = registry.clone();
         match unwind::halt_unwinding(|| handler(index)) {
             Ok(()) => {}
             Err(err) => {
@@ -805,8 +870,13 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
         }
     }
 
+    let my_terminate_latch = &registry.thread_infos[index].terminate;
+    worker_thread.log(|| ThreadStart {
+        worker: index,
+        terminate_addr: my_terminate_latch.as_core_latch().addr(),
+    });
     registry.acquire_thread();
-    worker_thread.wait_until(&registry.terminate_latch);
+    worker_thread.wait_until(my_terminate_latch);
 
     // Should not be any work left in our queue.
     debug_assert!(worker_thread.take_local_job().is_none());
@@ -817,9 +887,10 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
     // Normal termination, do not abort.
     mem::forget(abort_guard);
 
+    worker_thread.log(|| ThreadTerminate { worker: index });
+
     // Inform a user callback that we exited a thread.
     if let Some(ref handler) = registry.exit_handler {
-        let registry = registry.clone();
         match unwind::halt_unwinding(|| handler(index)) {
             Ok(()) => {}
             Err(err) => {