use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
+use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
use crate::log::Event::*;
+use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
-use crate::util::leak;
use crate::{
AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
impl ThreadBuilder {
- /// Get the index of this thread in the pool, within `0..num_threads`.
+ /// Gets the index of this thread in the pool, within `0..num_threads`.
pub fn index(&self) -> usize {
self.index
}
- /// Get the string that was specified by `ThreadPoolBuilder::name()`.
+ /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
pub fn name(&self) -> Option<&str> {
self.name.as_ref().map(String::as_str)
}
- /// Get the value that was specified by `ThreadPoolBuilder::stack_size()`.
+ /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
pub fn stack_size(&self) -> Option<usize> {
self.stack_size
}
- /// Execute the main loop for this thread. This will not return until the
+ /// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
unsafe { main_loop(self.worker, self.registry, self.index) }
}
pub struct Registry {
+ logger: Logger,
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: Injector<JobRef>,
// - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
// These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
// and that job will keep the pool alive.
- terminate_latch: CountLatch,
+ terminate_count: AtomicUsize,
}
/// ////////////////////////////////////////////////////////////////////////
/// Initialization
-static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
+static mut THE_REGISTRY: Option<Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = Once::new();
/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
-fn global_registry() -> &'static Arc<Registry> {
+pub(super) fn global_registry() -> &'static Arc<Registry> {
set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
- .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
+ .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
let mut result = Err(ThreadPoolBuildError::new(
ErrorKind::GlobalPoolAlreadyInitialized,
));
+
THE_REGISTRY_SET.call_once(|| {
- result = registry().map(|registry| {
- let registry = leak(registry);
- unsafe {
- THE_REGISTRY = Some(registry);
- }
- registry
- });
+ result = registry()
+ .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
});
+
result
}
where
S: ThreadSpawn,
{
- let n_threads = builder.get_num_threads();
+ // Soft-limit the number of threads that we can actually support.
+ let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads());
+
let breadth_first = builder.get_breadth_first();
let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
})
.unzip();
+ let logger = Logger::new(n_threads);
let registry = Arc::new(Registry {
+ logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
- sleep: Sleep::new(n_threads),
+ sleep: Sleep::new(logger, n_threads),
injected_jobs: Injector::new(),
- terminate_latch: CountLatch::new(),
+ terminate_count: AtomicUsize::new(1),
panic_handler: builder.take_panic_handler(),
deadlock_handler: builder.take_deadlock_handler(),
start_handler: builder.take_start_handler(),
let thread = ThreadBuilder {
name: builder.get_thread_name(index),
stack_size: builder.get_stack_size(),
- registry: registry.clone(),
+ registry: Arc::clone(&registry),
worker,
index,
};
// Returning normally now, without termination.
mem::forget(t1000);
- Ok(registry.clone())
+ Ok(registry)
}
pub fn current() -> Arc<Registry> {
unsafe {
let worker_thread = WorkerThread::current();
- if worker_thread.is_null() {
- global_registry().clone()
+ let registry = if worker_thread.is_null() {
+ global_registry()
} else {
- (*worker_thread).registry.clone()
- }
+ &(*worker_thread).registry
+ };
+ Arc::clone(registry)
}
}
}
}
+ #[inline]
+ pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
+ self.logger.log(event)
+ }
+
pub(super) fn num_threads(&self) -> usize {
self.thread_infos.len()
}
/// whatever worker has nothing to do. Use this is you know that
/// you are not on a worker of this registry.
pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
- log!(InjectJobs {
- count: injected_jobs.len()
+ self.log(|| JobsInjected {
+ count: injected_jobs.len(),
});
// It should not be possible for `state.terminate` to be true
// drops) a `ThreadPool`; and, in that case, they cannot be
// calling `inject()` later, since they dropped their
// `ThreadPool`.
- assert!(
- !self.terminate_latch.probe(),
+ debug_assert_ne!(
+ self.terminate_count.load(Ordering::Acquire),
+ 0,
"inject() sees state.terminate as true"
);
+ let queue_was_empty = self.injected_jobs.is_empty();
+
for &job_ref in injected_jobs {
self.injected_jobs.push(job_ref);
}
- self.sleep.tickle(usize::MAX);
+
+ self.sleep
+ .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
+ }
+
+ pub(crate) fn has_injected_job(&self) -> bool {
+ !self.injected_jobs.is_empty()
}
fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
loop {
match self.injected_jobs.steal() {
Steal::Success(job) => {
- log!(UninjectedWork {
- worker: worker_index
+ self.log(|| JobUninjected {
+ worker: worker_index,
});
return Some(job);
}
self.release_thread();
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
self.acquire_thread();
+
+ // flush accumulated logs as we exit the thread
+ self.logger.log(|| Flush);
+
job.into_result()
})
}
// This thread is a member of a different pool, so let it process
// other work while waiting for this `op` to complete.
debug_assert!(current_thread.registry().id() != self.id());
- let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
+ let latch = SpinLatch::cross(current_thread);
let job = StackJob::new(
0,
|injected| {
job.into_result()
}
- /// Increment the terminate counter. This increment should be
+ /// Increments the terminate counter. This increment should be
/// balanced by a call to `terminate`, which will decrement. This
/// is used when spawning asynchronous work, which needs to
/// prevent the registry from terminating so long as it is active.
/// terminate count and is responsible for invoking `terminate()`
/// when finished.
pub(super) fn increment_terminate_count(&self) {
- self.terminate_latch.increment();
+ let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
+ debug_assert!(previous != 0, "registry ref count incremented from zero");
+ assert!(
+ previous != std::usize::MAX,
+ "overflow in registry ref count"
+ );
}
/// Signals that the thread-pool which owns this registry has been
/// dropped. The worker threads will gradually terminate, once any
/// extant work is completed.
pub(super) fn terminate(&self) {
- self.terminate_latch.set();
- self.sleep.tickle(usize::MAX);
+ if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
+ for (i, thread_info) in self.thread_infos.iter().enumerate() {
+ thread_info.terminate.set_and_tickle_one(self, i);
+ }
+ }
+ }
+
+ /// Notify the worker that the latch they are sleeping on has been "set".
+ pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
+ self.sleep.notify_worker_latch_is_set(target_worker_index);
}
}
/// until workers have stopped; only used for tests.
stopped: LockLatch,
+ /// The latch used to signal that terminated has been requested.
+ /// This latch is *set* by the `terminate` method on the
+ /// `Registry`, once the registry's main "terminate" counter
+ /// reaches zero.
+ ///
+ /// NB. We use a `CountLatch` here because it has no lifetimes and is
+ /// meant for async use, but the count never gets higher than one.
+ terminate: CountLatch,
+
/// the "stealer" half of the worker's deque
stealer: Stealer<JobRef>,
}
ThreadInfo {
primed: LockLatch::new(),
stopped: LockLatch::new(),
+ terminate: CountLatch::new(),
stealer,
}
}
}
/// Returns the registry that owns this worker thread.
+ #[inline]
pub(super) fn registry(&self) -> &Arc<Registry> {
&self.registry
}
+ #[inline]
+ pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
+ self.registry.logger.log(event)
+ }
+
/// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
#[inline]
pub(super) fn index(&self) -> usize {
#[inline]
pub(super) unsafe fn push(&self, job: JobRef) {
+ self.log(|| JobPushed { worker: self.index });
+ let queue_was_empty = self.worker.is_empty();
self.worker.push(job);
- self.registry.sleep.tickle(self.index);
+ self.registry
+ .sleep
+ .new_internal_jobs(self.index, 1, queue_was_empty);
}
#[inline]
/// bottom.
#[inline]
pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
- self.worker.pop()
+ let popped_job = self.worker.pop();
+
+ if popped_job.is_some() {
+ self.log(|| JobPopped { worker: self.index });
+ }
+
+ popped_job
}
/// Wait until the latch is set. Try to keep busy by popping and
/// stealing tasks as necessary.
#[inline]
- pub(super) unsafe fn wait_until<L: LatchProbe + ?Sized>(&self, latch: &L) {
- log!(WaitUntil { worker: self.index });
+ pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
+ let latch = latch.as_core_latch();
if !latch.probe() {
self.wait_until_cold(latch);
}
}
#[cold]
- unsafe fn wait_until_cold<L: LatchProbe + ?Sized>(&self, latch: &L) {
+ unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
// the code below should swallow all panics and hence never
// unwind; but if something does wrong, we want to abort,
// because otherwise other code in rayon may assume that the
// accesses, which would be *very bad*
let abort_guard = unwind::AbortIfPanic;
- let mut yields = 0;
+ let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
while !latch.probe() {
// Try to find some work to do. We give preference first
// to things in our local deque, then in other workers
.or_else(|| self.steal())
.or_else(|| self.registry.pop_injected_job(self.index))
{
- yields = self.registry.sleep.work_found(self.index, yields);
+ self.registry.sleep.work_found(idle_state);
self.execute(job);
+ idle_state = self.registry.sleep.start_looking(self.index, latch);
} else {
- yields = self
- .registry
+ self.registry
.sleep
- .no_work_found(self.index, yields, &self.registry);
+ .no_work_found(&mut idle_state, latch, &self.registry)
}
}
// If we were sleepy, we are not anymore. We "found work" --
// whatever the surrounding thread was doing before it had to
// wait.
- self.registry.sleep.work_found(self.index, yields);
+ self.registry.sleep.work_found(idle_state);
- log!(LatchSet { worker: self.index });
+ self.log(|| ThreadSawLatchSet {
+ worker: self.index,
+ latch_addr: latch.addr(),
+ });
mem::forget(abort_guard); // successful execution, do not abort
}
+ #[inline]
pub(super) unsafe fn execute(&self, job: JobRef) {
job.execute();
-
- // Subtle: executing this job will have `set()` some of its
- // latches. This may mean that a sleepy (or sleeping) worker
- // can now make progress. So we have to tickle them to let
- // them know.
- self.registry.sleep.tickle(self.index);
}
/// Try to steal a single job and return it.
debug_assert!(self.local_deque_is_empty());
// otherwise, try to steal
- let num_threads = self.registry.thread_infos.len();
+ let thread_infos = &self.registry.thread_infos.as_slice();
+ let num_threads = thread_infos.len();
if num_threads <= 1 {
return None;
}
- let start = self.rng.next_usize(num_threads);
- (start..num_threads)
- .chain(0..start)
- .filter(|&i| i != self.index)
- .filter_map(|victim_index| {
- let victim = &self.registry.thread_infos[victim_index];
- loop {
+ loop {
+ let mut retry = false;
+ let start = self.rng.next_usize(num_threads);
+ let job = (start..num_threads)
+ .chain(0..start)
+ .filter(move |&i| i != self.index)
+ .find_map(|victim_index| {
+ let victim = &thread_infos[victim_index];
match victim.stealer.steal() {
- Steal::Empty => return None,
- Steal::Success(d) => {
- log!(StoleWork {
+ Steal::Success(job) => {
+ self.log(|| JobStolen {
worker: self.index,
- victim: victim_index
+ victim: victim_index,
});
- return Some(d);
+ Some(job)
+ }
+ Steal::Empty => None,
+ Steal::Retry => {
+ retry = true;
+ None
}
- Steal::Retry => {}
}
- }
- })
- .next()
+ });
+ if job.is_some() || !retry {
+ return job;
+ }
+ }
}
}
fifo: JobFifo::new(),
index,
rng: XorShift64Star::new(),
- registry: registry.clone(),
+ registry,
};
WorkerThread::set_current(worker_thread);
+ let registry = &*worker_thread.registry;
// let registry know we are ready to do work
registry.thread_infos[index].primed.set();
// Inform a user callback that we started a thread.
if let Some(ref handler) = registry.start_handler {
- let registry = registry.clone();
match unwind::halt_unwinding(|| handler(index)) {
Ok(()) => {}
Err(err) => {
}
}
+ let my_terminate_latch = &registry.thread_infos[index].terminate;
+ worker_thread.log(|| ThreadStart {
+ worker: index,
+ terminate_addr: my_terminate_latch.as_core_latch().addr(),
+ });
registry.acquire_thread();
- worker_thread.wait_until(®istry.terminate_latch);
+ worker_thread.wait_until(my_terminate_latch);
// Should not be any work left in our queue.
debug_assert!(worker_thread.take_local_job().is_none());
// Normal termination, do not abort.
mem::forget(abort_guard);
+ worker_thread.log(|| ThreadTerminate { worker: index });
+
// Inform a user callback that we exited a thread.
if let Some(ref handler) = registry.exit_handler {
- let registry = registry.clone();
match unwind::halt_unwinding(|| handler(index)) {
Ok(()) => {}
Err(err) => {