]>
Commit | Line | Data |
---|---|---|
f035d41b | 1 | use crate::job::{JobFifo, JobRef, StackJob}; |
1b1a35ee | 2 | use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch}; |
f035d41b | 3 | use crate::log::Event::*; |
1b1a35ee | 4 | use crate::log::Logger; |
f035d41b XL |
5 | use crate::sleep::Sleep; |
6 | use crate::unwind; | |
7 | use crate::util::leak; | |
8 | use crate::{ | |
9 | ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, | |
10 | }; | |
1b1a35ee | 11 | use crossbeam_deque::{Injector, Steal, Stealer, Worker}; |
2c00a5a8 | 12 | use std::any::Any; |
dc9dc135 XL |
13 | use std::cell::Cell; |
14 | use std::collections::hash_map::DefaultHasher; | |
416331ca | 15 | use std::fmt; |
dc9dc135 | 16 | use std::hash::Hasher; |
416331ca | 17 | use std::io; |
2c00a5a8 | 18 | use std::mem; |
416331ca XL |
19 | use std::ptr; |
20 | #[allow(deprecated)] | |
21 | use std::sync::atomic::ATOMIC_USIZE_INIT; | |
22 | use std::sync::atomic::{AtomicUsize, Ordering}; | |
e74abb32 | 23 | use std::sync::{Arc, Once}; |
416331ca | 24 | use std::thread; |
2c00a5a8 | 25 | use std::usize; |
416331ca XL |
26 | |
27 | /// Thread builder used for customization via | |
28 | /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). | |
29 | pub struct ThreadBuilder { | |
30 | name: Option<String>, | |
31 | stack_size: Option<usize>, | |
32 | worker: Worker<JobRef>, | |
33 | registry: Arc<Registry>, | |
34 | index: usize, | |
35 | } | |
36 | ||
37 | impl ThreadBuilder { | |
f035d41b | 38 | /// Gets the index of this thread in the pool, within `0..num_threads`. |
416331ca XL |
39 | pub fn index(&self) -> usize { |
40 | self.index | |
41 | } | |
42 | ||
f035d41b | 43 | /// Gets the string that was specified by `ThreadPoolBuilder::name()`. |
416331ca XL |
44 | pub fn name(&self) -> Option<&str> { |
45 | self.name.as_ref().map(String::as_str) | |
46 | } | |
47 | ||
f035d41b | 48 | /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`. |
416331ca XL |
49 | pub fn stack_size(&self) -> Option<usize> { |
50 | self.stack_size | |
51 | } | |
52 | ||
f035d41b | 53 | /// Executes the main loop for this thread. This will not return until the |
416331ca XL |
54 | /// thread pool is dropped. |
55 | pub fn run(self) { | |
56 | unsafe { main_loop(self.worker, self.registry, self.index) } | |
57 | } | |
58 | } | |
59 | ||
60 | impl fmt::Debug for ThreadBuilder { | |
f035d41b | 61 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
416331ca XL |
62 | f.debug_struct("ThreadBuilder") |
63 | .field("pool", &self.registry.id()) | |
64 | .field("index", &self.index) | |
65 | .field("name", &self.name) | |
66 | .field("stack_size", &self.stack_size) | |
67 | .finish() | |
68 | } | |
69 | } | |
2c00a5a8 | 70 | |
416331ca XL |
71 | /// Generalized trait for spawning a thread in the `Registry`. |
72 | /// | |
73 | /// This trait is pub-in-private -- E0445 forces us to make it public, | |
74 | /// but we don't actually want to expose these details in the API. | |
75 | pub trait ThreadSpawn { | |
76 | private_decl! {} | |
77 | ||
78 | /// Spawn a thread with the `ThreadBuilder` parameters, and then | |
79 | /// call `ThreadBuilder::run()`. | |
f035d41b | 80 | fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>; |
416331ca XL |
81 | } |
82 | ||
/// The stock spawner: starts each worker with `std::thread::Builder`,
/// applying the configured name and stack size.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Default, Debug)]
pub struct DefaultSpawn;
89 | ||
90 | impl ThreadSpawn for DefaultSpawn { | |
91 | private_impl! {} | |
92 | ||
93 | fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { | |
94 | let mut b = thread::Builder::new(); | |
95 | if let Some(name) = thread.name() { | |
96 | b = b.name(name.to_owned()); | |
97 | } | |
98 | if let Some(stack_size) = thread.stack_size() { | |
99 | b = b.stack_size(stack_size); | |
100 | } | |
101 | b.spawn(|| thread.run())?; | |
102 | Ok(()) | |
103 | } | |
104 | } | |
105 | ||
/// Spawner that hands each worker to a user-supplied callback.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);
112 | ||
113 | impl<F> CustomSpawn<F> | |
114 | where | |
115 | F: FnMut(ThreadBuilder) -> io::Result<()>, | |
116 | { | |
117 | pub(super) fn new(spawn: F) -> Self { | |
118 | CustomSpawn(spawn) | |
119 | } | |
120 | } | |
121 | ||
122 | impl<F> ThreadSpawn for CustomSpawn<F> | |
123 | where | |
124 | F: FnMut(ThreadBuilder) -> io::Result<()>, | |
125 | { | |
126 | private_impl! {} | |
127 | ||
128 | #[inline] | |
129 | fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { | |
130 | (self.0)(thread) | |
131 | } | |
132 | } | |
133 | ||
134 | pub(super) struct Registry { | |
1b1a35ee | 135 | logger: Logger, |
2c00a5a8 | 136 | thread_infos: Vec<ThreadInfo>, |
2c00a5a8 | 137 | sleep: Sleep, |
1b1a35ee | 138 | injected_jobs: Injector<JobRef>, |
2c00a5a8 XL |
139 | panic_handler: Option<Box<PanicHandler>>, |
140 | start_handler: Option<Box<StartHandler>>, | |
141 | exit_handler: Option<Box<ExitHandler>>, | |
142 | ||
143 | // When this latch reaches 0, it means that all work on this | |
144 | // registry must be complete. This is ensured in the following ways: | |
145 | // | |
146 | // - if this is the global registry, there is a ref-count that never | |
147 | // gets released. | |
148 | // - if this is a user-created thread-pool, then so long as the thread-pool | |
149 | // exists, it holds a reference. | |
150 | // - when we inject a "blocking job" into the registry with `ThreadPool::install()`, | |
151 | // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't | |
152 | // return until the blocking job is complete, that ref will continue to be held. | |
153 | // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed. | |
154 | // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`) | |
155 | // and that job will keep the pool alive. | |
1b1a35ee | 156 | terminate_count: AtomicUsize, |
2c00a5a8 XL |
157 | } |
158 | ||
2c00a5a8 XL |
159 | /// //////////////////////////////////////////////////////////////////////// |
160 | /// Initialization | |
161 | ||
162 | static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None; | |
e74abb32 | 163 | static THE_REGISTRY_SET: Once = Once::new(); |
2c00a5a8 XL |
164 | |
165 | /// Starts the worker threads (if that has not already happened). If | |
166 | /// initialization has not already occurred, use the default | |
167 | /// configuration. | |
168 | fn global_registry() -> &'static Arc<Registry> { | |
416331ca XL |
169 | set_global_registry(|| Registry::new(ThreadPoolBuilder::new())) |
170 | .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) }) | |
171 | .expect("The global thread pool has not been initialized.") | |
2c00a5a8 XL |
172 | } |
173 | ||
174 | /// Starts the worker threads (if that has not already happened) with | |
175 | /// the given builder. | |
416331ca XL |
176 | pub(super) fn init_global_registry<S>( |
177 | builder: ThreadPoolBuilder<S>, | |
178 | ) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> | |
179 | where | |
180 | S: ThreadSpawn, | |
181 | { | |
182 | set_global_registry(|| Registry::new(builder)) | |
2c00a5a8 XL |
183 | } |
184 | ||
416331ca XL |
185 | /// Starts the worker threads (if that has not already happened) |
186 | /// by creating a registry with the given callback. | |
187 | fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> | |
188 | where | |
189 | F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>, | |
190 | { | |
191 | let mut result = Err(ThreadPoolBuildError::new( | |
192 | ErrorKind::GlobalPoolAlreadyInitialized, | |
193 | )); | |
194 | THE_REGISTRY_SET.call_once(|| { | |
195 | result = registry().map(|registry| { | |
196 | let registry = leak(registry); | |
197 | unsafe { | |
198 | THE_REGISTRY = Some(registry); | |
199 | } | |
200 | registry | |
201 | }); | |
202 | }); | |
203 | result | |
2c00a5a8 XL |
204 | } |
205 | ||
206 | struct Terminator<'a>(&'a Arc<Registry>); | |
207 | ||
208 | impl<'a> Drop for Terminator<'a> { | |
209 | fn drop(&mut self) { | |
210 | self.0.terminate() | |
211 | } | |
212 | } | |
213 | ||
214 | impl Registry { | |
416331ca XL |
215 | pub(super) fn new<S>( |
216 | mut builder: ThreadPoolBuilder<S>, | |
217 | ) -> Result<Arc<Self>, ThreadPoolBuildError> | |
218 | where | |
219 | S: ThreadSpawn, | |
220 | { | |
2c00a5a8 XL |
221 | let n_threads = builder.get_num_threads(); |
222 | let breadth_first = builder.get_breadth_first(); | |
223 | ||
416331ca XL |
224 | let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) |
225 | .map(|_| { | |
e74abb32 XL |
226 | let worker = if breadth_first { |
227 | Worker::new_fifo() | |
416331ca | 228 | } else { |
e74abb32 XL |
229 | Worker::new_lifo() |
230 | }; | |
231 | ||
232 | let stealer = worker.stealer(); | |
233 | (worker, stealer) | |
416331ca XL |
234 | }) |
235 | .unzip(); | |
2c00a5a8 | 236 | |
1b1a35ee | 237 | let logger = Logger::new(n_threads); |
2c00a5a8 | 238 | let registry = Arc::new(Registry { |
1b1a35ee | 239 | logger: logger.clone(), |
416331ca | 240 | thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), |
1b1a35ee XL |
241 | sleep: Sleep::new(logger, n_threads), |
242 | injected_jobs: Injector::new(), | |
243 | terminate_count: AtomicUsize::new(1), | |
2c00a5a8 XL |
244 | panic_handler: builder.take_panic_handler(), |
245 | start_handler: builder.take_start_handler(), | |
246 | exit_handler: builder.take_exit_handler(), | |
247 | }); | |
248 | ||
249 | // If we return early or panic, make sure to terminate existing threads. | |
250 | let t1000 = Terminator(®istry); | |
251 | ||
252 | for (index, worker) in workers.into_iter().enumerate() { | |
416331ca XL |
253 | let thread = ThreadBuilder { |
254 | name: builder.get_thread_name(index), | |
255 | stack_size: builder.get_stack_size(), | |
256 | registry: registry.clone(), | |
257 | worker, | |
258 | index, | |
259 | }; | |
260 | if let Err(e) = builder.get_spawn_handler().spawn(thread) { | |
261 | return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); | |
2c00a5a8 XL |
262 | } |
263 | } | |
264 | ||
265 | // Returning normally now, without termination. | |
266 | mem::forget(t1000); | |
267 | ||
268 | Ok(registry.clone()) | |
269 | } | |
270 | ||
416331ca | 271 | pub(super) fn current() -> Arc<Registry> { |
2c00a5a8 XL |
272 | unsafe { |
273 | let worker_thread = WorkerThread::current(); | |
274 | if worker_thread.is_null() { | |
275 | global_registry().clone() | |
276 | } else { | |
277 | (*worker_thread).registry.clone() | |
278 | } | |
279 | } | |
280 | } | |
281 | ||
282 | /// Returns the number of threads in the current registry. This | |
283 | /// is better than `Registry::current().num_threads()` because it | |
284 | /// avoids incrementing the `Arc`. | |
416331ca | 285 | pub(super) fn current_num_threads() -> usize { |
2c00a5a8 XL |
286 | unsafe { |
287 | let worker_thread = WorkerThread::current(); | |
288 | if worker_thread.is_null() { | |
289 | global_registry().num_threads() | |
290 | } else { | |
291 | (*worker_thread).registry.num_threads() | |
292 | } | |
293 | } | |
294 | } | |
295 | ||
416331ca XL |
296 | /// Returns the current `WorkerThread` if it's part of this `Registry`. |
297 | pub(super) fn current_thread(&self) -> Option<&WorkerThread> { | |
298 | unsafe { | |
299 | let worker = WorkerThread::current().as_ref()?; | |
300 | if worker.registry().id() == self.id() { | |
301 | Some(worker) | |
302 | } else { | |
303 | None | |
304 | } | |
305 | } | |
306 | } | |
2c00a5a8 XL |
307 | |
308 | /// Returns an opaque identifier for this registry. | |
416331ca | 309 | pub(super) fn id(&self) -> RegistryId { |
2c00a5a8 XL |
310 | // We can rely on `self` not to change since we only ever create |
311 | // registries that are boxed up in an `Arc` (see `new()` above). | |
416331ca XL |
312 | RegistryId { |
313 | addr: self as *const Self as usize, | |
314 | } | |
2c00a5a8 XL |
315 | } |
316 | ||
1b1a35ee XL |
317 | #[inline] |
318 | pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { | |
319 | self.logger.log(event) | |
320 | } | |
321 | ||
416331ca | 322 | pub(super) fn num_threads(&self) -> usize { |
2c00a5a8 XL |
323 | self.thread_infos.len() |
324 | } | |
325 | ||
e74abb32 | 326 | pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) { |
2c00a5a8 XL |
327 | match self.panic_handler { |
328 | Some(ref handler) => { | |
329 | // If the customizable panic handler itself panics, | |
330 | // then we abort. | |
331 | let abort_guard = unwind::AbortIfPanic; | |
332 | handler(err); | |
333 | mem::forget(abort_guard); | |
334 | } | |
335 | None => { | |
336 | // Default panic handler aborts. | |
337 | let _ = unwind::AbortIfPanic; // let this drop. | |
338 | } | |
339 | } | |
340 | } | |
341 | ||
342 | /// Waits for the worker threads to get up and running. This is | |
343 | /// meant to be used for benchmarking purposes, primarily, so that | |
344 | /// you can get more consistent numbers by having everything | |
345 | /// "ready to go". | |
416331ca | 346 | pub(super) fn wait_until_primed(&self) { |
2c00a5a8 XL |
347 | for info in &self.thread_infos { |
348 | info.primed.wait(); | |
349 | } | |
350 | } | |
351 | ||
352 | /// Waits for the worker threads to stop. This is used for testing | |
353 | /// -- so we can check that termination actually works. | |
354 | #[cfg(test)] | |
416331ca | 355 | pub(super) fn wait_until_stopped(&self) { |
2c00a5a8 XL |
356 | for info in &self.thread_infos { |
357 | info.stopped.wait(); | |
358 | } | |
359 | } | |
360 | ||
361 | /// //////////////////////////////////////////////////////////////////////// | |
362 | /// MAIN LOOP | |
363 | /// | |
364 | /// So long as all of the worker threads are hanging out in their | |
365 | /// top-level loop, there is no work to be done. | |
366 | ||
367 | /// Push a job into the given `registry`. If we are running on a | |
368 | /// worker thread for the registry, this will push onto the | |
369 | /// deque. Else, it will inject from the outside (which is slower). | |
416331ca | 370 | pub(super) fn inject_or_push(&self, job_ref: JobRef) { |
2c00a5a8 XL |
371 | let worker_thread = WorkerThread::current(); |
372 | unsafe { | |
373 | if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { | |
374 | (*worker_thread).push(job_ref); | |
375 | } else { | |
376 | self.inject(&[job_ref]); | |
377 | } | |
378 | } | |
379 | } | |
380 | ||
2c00a5a8 XL |
381 | /// Push a job into the "external jobs" queue; it will be taken by |
382 | /// whatever worker has nothing to do. Use this is you know that | |
383 | /// you are not on a worker of this registry. | |
416331ca | 384 | pub(super) fn inject(&self, injected_jobs: &[JobRef]) { |
1b1a35ee XL |
385 | self.log(|| JobsInjected { |
386 | count: injected_jobs.len(), | |
416331ca XL |
387 | }); |
388 | ||
389 | // It should not be possible for `state.terminate` to be true | |
390 | // here. It is only set to true when the user creates (and | |
391 | // drops) a `ThreadPool`; and, in that case, they cannot be | |
392 | // calling `inject()` later, since they dropped their | |
393 | // `ThreadPool`. | |
1b1a35ee XL |
394 | debug_assert_ne!( |
395 | self.terminate_count.load(Ordering::Acquire), | |
396 | 0, | |
416331ca XL |
397 | "inject() sees state.terminate as true" |
398 | ); | |
399 | ||
1b1a35ee XL |
400 | let queue_was_empty = self.injected_jobs.is_empty(); |
401 | ||
416331ca XL |
402 | for &job_ref in injected_jobs { |
403 | self.injected_jobs.push(job_ref); | |
2c00a5a8 | 404 | } |
1b1a35ee XL |
405 | |
406 | self.sleep | |
407 | .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty); | |
408 | } | |
409 | ||
410 | fn has_injected_job(&self) -> bool { | |
411 | !self.injected_jobs.is_empty() | |
2c00a5a8 XL |
412 | } |
413 | ||
414 | fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> { | |
1b1a35ee XL |
415 | loop { |
416 | match self.injected_jobs.steal() { | |
417 | Steal::Success(job) => { | |
418 | self.log(|| JobUninjected { | |
419 | worker: worker_index, | |
420 | }); | |
421 | return Some(job); | |
422 | } | |
423 | Steal::Empty => return None, | |
424 | Steal::Retry => {} | |
425 | } | |
2c00a5a8 XL |
426 | } |
427 | } | |
428 | ||
429 | /// If already in a worker-thread of this registry, just execute `op`. | |
430 | /// Otherwise, inject `op` in this thread-pool. Either way, block until `op` | |
431 | /// completes and return its return value. If `op` panics, that panic will | |
432 | /// be propagated as well. The second argument indicates `true` if injection | |
433 | /// was performed, `false` if executed directly. | |
416331ca XL |
434 | pub(super) fn in_worker<OP, R>(&self, op: OP) -> R |
435 | where | |
436 | OP: FnOnce(&WorkerThread, bool) -> R + Send, | |
437 | R: Send, | |
2c00a5a8 XL |
438 | { |
439 | unsafe { | |
440 | let worker_thread = WorkerThread::current(); | |
441 | if worker_thread.is_null() { | |
442 | self.in_worker_cold(op) | |
443 | } else if (*worker_thread).registry().id() != self.id() { | |
444 | self.in_worker_cross(&*worker_thread, op) | |
445 | } else { | |
446 | // Perfectly valid to give them a `&T`: this is the | |
447 | // current thread, so we know the data structure won't be | |
448 | // invalidated until we return. | |
449 | op(&*worker_thread, false) | |
450 | } | |
451 | } | |
452 | } | |
453 | ||
454 | #[cold] | |
455 | unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R | |
416331ca XL |
456 | where |
457 | OP: FnOnce(&WorkerThread, bool) -> R + Send, | |
458 | R: Send, | |
2c00a5a8 | 459 | { |
e74abb32 XL |
460 | thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new()); |
461 | ||
462 | LOCK_LATCH.with(|l| { | |
463 | // This thread isn't a member of *any* thread pool, so just block. | |
464 | debug_assert!(WorkerThread::current().is_null()); | |
465 | let job = StackJob::new( | |
466 | |injected| { | |
467 | let worker_thread = WorkerThread::current(); | |
468 | assert!(injected && !worker_thread.is_null()); | |
469 | op(&*worker_thread, true) | |
470 | }, | |
471 | l, | |
472 | ); | |
473 | self.inject(&[job.as_job_ref()]); | |
474 | job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. | |
1b1a35ee XL |
475 | |
476 | // flush accumulated logs as we exit the thread | |
477 | self.logger.log(|| Flush); | |
478 | ||
e74abb32 XL |
479 | job.into_result() |
480 | }) | |
2c00a5a8 XL |
481 | } |
482 | ||
483 | #[cold] | |
484 | unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R | |
416331ca XL |
485 | where |
486 | OP: FnOnce(&WorkerThread, bool) -> R + Send, | |
487 | R: Send, | |
2c00a5a8 XL |
488 | { |
489 | // This thread is a member of a different pool, so let it process | |
490 | // other work while waiting for this `op` to complete. | |
491 | debug_assert!(current_thread.registry().id() != self.id()); | |
1b1a35ee | 492 | let latch = SpinLatch::cross(current_thread); |
416331ca XL |
493 | let job = StackJob::new( |
494 | |injected| { | |
495 | let worker_thread = WorkerThread::current(); | |
496 | assert!(injected && !worker_thread.is_null()); | |
497 | op(&*worker_thread, true) | |
498 | }, | |
499 | latch, | |
500 | ); | |
2c00a5a8 XL |
501 | self.inject(&[job.as_job_ref()]); |
502 | current_thread.wait_until(&job.latch); | |
503 | job.into_result() | |
504 | } | |
505 | ||
f035d41b | 506 | /// Increments the terminate counter. This increment should be |
2c00a5a8 XL |
507 | /// balanced by a call to `terminate`, which will decrement. This |
508 | /// is used when spawning asynchronous work, which needs to | |
509 | /// prevent the registry from terminating so long as it is active. | |
510 | /// | |
511 | /// Note that blocking functions such as `join` and `scope` do not | |
512 | /// need to concern themselves with this fn; their context is | |
513 | /// responsible for ensuring the current thread-pool will not | |
514 | /// terminate until they return. | |
515 | /// | |
516 | /// The global thread-pool always has an outstanding reference | |
517 | /// (the initial one). Custom thread-pools have one outstanding | |
518 | /// reference that is dropped when the `ThreadPool` is dropped: | |
519 | /// since installing the thread-pool blocks until any joins/scopes | |
520 | /// complete, this ensures that joins/scopes are covered. | |
521 | /// | |
522 | /// The exception is `::spawn()`, which can create a job outside | |
523 | /// of any blocking scope. In that case, the job itself holds a | |
524 | /// terminate count and is responsible for invoking `terminate()` | |
525 | /// when finished. | |
416331ca | 526 | pub(super) fn increment_terminate_count(&self) { |
1b1a35ee XL |
527 | let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel); |
528 | debug_assert!(previous != 0, "registry ref count incremented from zero"); | |
529 | assert!( | |
530 | previous != std::usize::MAX, | |
531 | "overflow in registry ref count" | |
532 | ); | |
2c00a5a8 XL |
533 | } |
534 | ||
535 | /// Signals that the thread-pool which owns this registry has been | |
536 | /// dropped. The worker threads will gradually terminate, once any | |
537 | /// extant work is completed. | |
416331ca | 538 | pub(super) fn terminate(&self) { |
1b1a35ee XL |
539 | if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { |
540 | for (i, thread_info) in self.thread_infos.iter().enumerate() { | |
541 | thread_info.terminate.set_and_tickle_one(self, i); | |
542 | } | |
543 | } | |
2c00a5a8 | 544 | } |
f035d41b | 545 | |
1b1a35ee XL |
546 | /// Notify the worker that the latch they are sleeping on has been "set". |
547 | pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { | |
548 | self.sleep.notify_worker_latch_is_set(target_worker_index); | |
f035d41b | 549 | } |
2c00a5a8 XL |
550 | } |
551 | ||
552 | #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] | |
416331ca XL |
553 | pub(super) struct RegistryId { |
554 | addr: usize, | |
2c00a5a8 XL |
555 | } |
556 | ||
557 | struct ThreadInfo { | |
558 | /// Latch set once thread has started and we are entering into the | |
559 | /// main loop. Used to wait for worker threads to become primed, | |
560 | /// primarily of interest for benchmarking. | |
561 | primed: LockLatch, | |
562 | ||
563 | /// Latch is set once worker thread has completed. Used to wait | |
564 | /// until workers have stopped; only used for tests. | |
565 | stopped: LockLatch, | |
566 | ||
1b1a35ee XL |
567 | /// The latch used to signal that terminated has been requested. |
568 | /// This latch is *set* by the `terminate` method on the | |
569 | /// `Registry`, once the registry's main "terminate" counter | |
570 | /// reaches zero. | |
571 | /// | |
572 | /// NB. We use a `CountLatch` here because it has no lifetimes and is | |
573 | /// meant for async use, but the count never gets higher than one. | |
574 | terminate: CountLatch, | |
575 | ||
2c00a5a8 XL |
576 | /// the "stealer" half of the worker's deque |
577 | stealer: Stealer<JobRef>, | |
578 | } | |
579 | ||
580 | impl ThreadInfo { | |
581 | fn new(stealer: Stealer<JobRef>) -> ThreadInfo { | |
582 | ThreadInfo { | |
583 | primed: LockLatch::new(), | |
584 | stopped: LockLatch::new(), | |
1b1a35ee | 585 | terminate: CountLatch::new(), |
416331ca | 586 | stealer, |
2c00a5a8 XL |
587 | } |
588 | } | |
589 | } | |
590 | ||
591 | /// //////////////////////////////////////////////////////////////////////// | |
592 | /// WorkerThread identifiers | |
593 | ||
416331ca | 594 | pub(super) struct WorkerThread { |
2c00a5a8 | 595 | /// the "worker" half of our local deque |
416331ca | 596 | worker: Worker<JobRef>, |
2c00a5a8 | 597 | |
416331ca XL |
598 | /// local queue used for `spawn_fifo` indirection |
599 | fifo: JobFifo, | |
2c00a5a8 | 600 | |
416331ca | 601 | index: usize, |
2c00a5a8 XL |
602 | |
603 | /// A weak random number generator. | |
dc9dc135 | 604 | rng: XorShift64Star, |
2c00a5a8 XL |
605 | |
606 | registry: Arc<Registry>, | |
607 | } | |
608 | ||
609 | // This is a bit sketchy, but basically: the WorkerThread is | |
610 | // allocated on the stack of the worker on entry and stored into this | |
611 | // thread local variable. So it will remain valid at least until the | |
612 | // worker is fully unwound. Using an unsafe pointer avoids the need | |
613 | // for a RefCell<T> etc. | |
614 | thread_local! { | |
416331ca XL |
615 | static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null()); |
616 | } | |
617 | ||
618 | impl Drop for WorkerThread { | |
619 | fn drop(&mut self) { | |
620 | // Undo `set_current` | |
621 | WORKER_THREAD_STATE.with(|t| { | |
622 | assert!(t.get().eq(&(self as *const _))); | |
623 | t.set(ptr::null()); | |
624 | }); | |
625 | } | |
2c00a5a8 XL |
626 | } |
627 | ||
628 | impl WorkerThread { | |
629 | /// Gets the `WorkerThread` index for the current thread; returns | |
630 | /// NULL if this is not a worker thread. This pointer is valid | |
631 | /// anywhere on the current thread. | |
632 | #[inline] | |
416331ca XL |
633 | pub(super) fn current() -> *const WorkerThread { |
634 | WORKER_THREAD_STATE.with(Cell::get) | |
2c00a5a8 XL |
635 | } |
636 | ||
637 | /// Sets `self` as the worker thread index for the current thread. | |
638 | /// This is done during worker thread startup. | |
639 | unsafe fn set_current(thread: *const WorkerThread) { | |
640 | WORKER_THREAD_STATE.with(|t| { | |
641 | assert!(t.get().is_null()); | |
642 | t.set(thread); | |
643 | }); | |
644 | } | |
645 | ||
646 | /// Returns the registry that owns this worker thread. | |
1b1a35ee | 647 | #[inline] |
416331ca | 648 | pub(super) fn registry(&self) -> &Arc<Registry> { |
2c00a5a8 XL |
649 | &self.registry |
650 | } | |
651 | ||
1b1a35ee XL |
652 | #[inline] |
653 | pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { | |
654 | self.registry.logger.log(event) | |
655 | } | |
656 | ||
2c00a5a8 XL |
657 | /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). |
658 | #[inline] | |
416331ca | 659 | pub(super) fn index(&self) -> usize { |
2c00a5a8 XL |
660 | self.index |
661 | } | |
662 | ||
663 | #[inline] | |
416331ca | 664 | pub(super) unsafe fn push(&self, job: JobRef) { |
1b1a35ee XL |
665 | self.log(|| JobPushed { worker: self.index }); |
666 | let queue_was_empty = self.worker.is_empty(); | |
2c00a5a8 | 667 | self.worker.push(job); |
1b1a35ee XL |
668 | self.registry |
669 | .sleep | |
670 | .new_internal_jobs(self.index, 1, queue_was_empty); | |
2c00a5a8 XL |
671 | } |
672 | ||
673 | #[inline] | |
416331ca XL |
674 | pub(super) unsafe fn push_fifo(&self, job: JobRef) { |
675 | self.push(self.fifo.push(job)); | |
676 | } | |
677 | ||
678 | #[inline] | |
679 | pub(super) fn local_deque_is_empty(&self) -> bool { | |
680 | self.worker.is_empty() | |
2c00a5a8 XL |
681 | } |
682 | ||
683 | /// Attempts to obtain a "local" job -- typically this means | |
684 | /// popping from the top of the stack, though if we are configured | |
685 | /// for breadth-first execution, it would mean dequeuing from the | |
686 | /// bottom. | |
687 | #[inline] | |
416331ca | 688 | pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> { |
1b1a35ee XL |
689 | let popped_job = self.worker.pop(); |
690 | ||
691 | if popped_job.is_some() { | |
692 | self.log(|| JobPopped { worker: self.index }); | |
693 | } | |
694 | ||
695 | popped_job | |
2c00a5a8 XL |
696 | } |
697 | ||
698 | /// Wait until the latch is set. Try to keep busy by popping and | |
699 | /// stealing tasks as necessary. | |
700 | #[inline] | |
1b1a35ee XL |
701 | pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) { |
702 | let latch = latch.as_core_latch(); | |
2c00a5a8 XL |
703 | if !latch.probe() { |
704 | self.wait_until_cold(latch); | |
705 | } | |
706 | } | |
707 | ||
708 | #[cold] | |
1b1a35ee | 709 | unsafe fn wait_until_cold(&self, latch: &CoreLatch) { |
2c00a5a8 XL |
710 | // the code below should swallow all panics and hence never |
711 | // unwind; but if something does wrong, we want to abort, | |
712 | // because otherwise other code in rayon may assume that the | |
713 | // latch has been signaled, and that can lead to random memory | |
714 | // accesses, which would be *very bad* | |
715 | let abort_guard = unwind::AbortIfPanic; | |
716 | ||
1b1a35ee | 717 | let mut idle_state = self.registry.sleep.start_looking(self.index, latch); |
2c00a5a8 XL |
718 | while !latch.probe() { |
719 | // Try to find some work to do. We give preference first | |
720 | // to things in our local deque, then in other workers | |
721 | // deques, and finally to injected jobs from the | |
722 | // outside. The idea is to finish what we started before | |
723 | // we take on something new. | |
416331ca XL |
724 | if let Some(job) = self |
725 | .take_local_job() | |
726 | .or_else(|| self.steal()) | |
727 | .or_else(|| self.registry.pop_injected_job(self.index)) | |
728 | { | |
1b1a35ee | 729 | self.registry.sleep.work_found(idle_state); |
2c00a5a8 | 730 | self.execute(job); |
1b1a35ee | 731 | idle_state = self.registry.sleep.start_looking(self.index, latch); |
2c00a5a8 | 732 | } else { |
1b1a35ee XL |
733 | self.registry |
734 | .sleep | |
735 | .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job()) | |
2c00a5a8 XL |
736 | } |
737 | } | |
738 | ||
739 | // If we were sleepy, we are not anymore. We "found work" -- | |
740 | // whatever the surrounding thread was doing before it had to | |
741 | // wait. | |
1b1a35ee | 742 | self.registry.sleep.work_found(idle_state); |
2c00a5a8 | 743 | |
1b1a35ee XL |
744 | self.log(|| ThreadSawLatchSet { |
745 | worker: self.index, | |
746 | latch_addr: latch.addr(), | |
747 | }); | |
2c00a5a8 XL |
748 | mem::forget(abort_guard); // successful execution, do not abort |
749 | } | |
750 | ||
1b1a35ee | 751 | #[inline] |
416331ca | 752 | pub(super) unsafe fn execute(&self, job: JobRef) { |
2c00a5a8 | 753 | job.execute(); |
2c00a5a8 XL |
754 | } |
755 | ||
756 | /// Try to steal a single job and return it. | |
757 | /// | |
758 | /// This should only be done as a last resort, when there is no | |
759 | /// local work to do. | |
760 | unsafe fn steal(&self) -> Option<JobRef> { | |
761 | // we only steal when we don't have any work to do locally | |
416331ca | 762 | debug_assert!(self.local_deque_is_empty()); |
2c00a5a8 XL |
763 | |
764 | // otherwise, try to steal | |
1b1a35ee XL |
765 | let thread_infos = &self.registry.thread_infos.as_slice(); |
766 | let num_threads = thread_infos.len(); | |
2c00a5a8 XL |
767 | if num_threads <= 1 { |
768 | return None; | |
769 | } | |
dc9dc135 | 770 | |
1b1a35ee XL |
771 | loop { |
772 | let mut retry = false; | |
773 | let start = self.rng.next_usize(num_threads); | |
774 | let job = (start..num_threads) | |
775 | .chain(0..start) | |
776 | .filter(move |&i| i != self.index) | |
777 | .find_map(|victim_index| { | |
778 | let victim = &thread_infos[victim_index]; | |
2c00a5a8 | 779 | match victim.stealer.steal() { |
1b1a35ee XL |
780 | Steal::Success(job) => { |
781 | self.log(|| JobStolen { | |
2c00a5a8 | 782 | worker: self.index, |
1b1a35ee | 783 | victim: victim_index, |
2c00a5a8 | 784 | }); |
1b1a35ee XL |
785 | Some(job) |
786 | } | |
787 | Steal::Empty => None, | |
788 | Steal::Retry => { | |
789 | retry = true; | |
790 | None | |
416331ca | 791 | } |
2c00a5a8 | 792 | } |
1b1a35ee XL |
793 | }); |
794 | if job.is_some() || !retry { | |
795 | return job; | |
796 | } | |
797 | } | |
2c00a5a8 XL |
798 | } |
799 | } | |
800 | ||
801 | /// //////////////////////////////////////////////////////////////////////// | |
802 | ||
416331ca XL |
803 | unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) { |
804 | let worker_thread = &WorkerThread { | |
805 | worker, | |
806 | fifo: JobFifo::new(), | |
807 | index, | |
dc9dc135 | 808 | rng: XorShift64Star::new(), |
2c00a5a8 XL |
809 | registry: registry.clone(), |
810 | }; | |
416331ca | 811 | WorkerThread::set_current(worker_thread); |
2c00a5a8 XL |
812 | |
813 | // let registry know we are ready to do work | |
814 | registry.thread_infos[index].primed.set(); | |
815 | ||
816 | // Worker threads should not panic. If they do, just abort, as the | |
817 | // internal state of the threadpool is corrupted. Note that if | |
818 | // **user code** panics, we should catch that and redirect. | |
819 | let abort_guard = unwind::AbortIfPanic; | |
820 | ||
821 | // Inform a user callback that we started a thread. | |
822 | if let Some(ref handler) = registry.start_handler { | |
823 | let registry = registry.clone(); | |
824 | match unwind::halt_unwinding(|| handler(index)) { | |
416331ca | 825 | Ok(()) => {} |
2c00a5a8 XL |
826 | Err(err) => { |
827 | registry.handle_panic(err); | |
828 | } | |
829 | } | |
830 | } | |
831 | ||
1b1a35ee XL |
832 | let my_terminate_latch = ®istry.thread_infos[index].terminate; |
833 | worker_thread.log(|| ThreadStart { | |
834 | worker: index, | |
835 | terminate_addr: my_terminate_latch.as_core_latch().addr(), | |
836 | }); | |
837 | worker_thread.wait_until(my_terminate_latch); | |
2c00a5a8 XL |
838 | |
839 | // Should not be any work left in our queue. | |
840 | debug_assert!(worker_thread.take_local_job().is_none()); | |
841 | ||
842 | // let registry know we are done | |
843 | registry.thread_infos[index].stopped.set(); | |
844 | ||
845 | // Normal termination, do not abort. | |
846 | mem::forget(abort_guard); | |
847 | ||
1b1a35ee XL |
848 | worker_thread.log(|| ThreadTerminate { worker: index }); |
849 | ||
2c00a5a8 XL |
850 | // Inform a user callback that we exited a thread. |
851 | if let Some(ref handler) = registry.exit_handler { | |
852 | let registry = registry.clone(); | |
853 | match unwind::halt_unwinding(|| handler(index)) { | |
416331ca | 854 | Ok(()) => {} |
2c00a5a8 XL |
855 | Err(err) => { |
856 | registry.handle_panic(err); | |
857 | } | |
858 | } | |
859 | // We're already exiting the thread, there's nothing else to do. | |
860 | } | |
861 | } | |
862 | ||
863 | /// If already in a worker-thread, just execute `op`. Otherwise, | |
864 | /// execute `op` in the default thread-pool. Either way, block until | |
865 | /// `op` completes and return its return value. If `op` panics, that | |
866 | /// panic will be propagated as well. The second argument indicates | |
867 | /// `true` if injection was performed, `false` if executed directly. | |
416331ca XL |
868 | pub(super) fn in_worker<OP, R>(op: OP) -> R |
869 | where | |
870 | OP: FnOnce(&WorkerThread, bool) -> R + Send, | |
871 | R: Send, | |
2c00a5a8 XL |
872 | { |
873 | unsafe { | |
874 | let owner_thread = WorkerThread::current(); | |
875 | if !owner_thread.is_null() { | |
876 | // Perfectly valid to give them a `&T`: this is the | |
877 | // current thread, so we know the data structure won't be | |
878 | // invalidated until we return. | |
879 | op(&*owner_thread, false) | |
880 | } else { | |
881 | global_registry().in_worker_cold(op) | |
882 | } | |
883 | } | |
884 | } | |
dc9dc135 XL |
885 | |
/// A tiny [xorshift*] pseudorandom number generator.
///
/// It is fast and tolerates weak seeding, provided the state is never zero
/// (zero is a fixed point of the xorshift step). For example, `steal` uses it
/// to pick a random victim to start scanning from.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    /// Creates a generator seeded with the hash of a process-wide counter.
    ///
    /// If a hash ever comes out as zero, the counter is bumped and hashed
    /// again, so the state always starts non-zero.
    fn new() -> Self {
        #[allow(deprecated)]
        static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;

        let seed = loop {
            let mut hasher = DefaultHasher::new();
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            let candidate = hasher.finish();
            if candidate != 0 {
                break candidate;
            }
        };

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    /// Advances the state by one xorshift step and returns it scrambled by
    /// the xorshift* multiplier.
    fn next(&self) -> u64 {
        const MULTIPLIER: u64 = 0x2545_f491_4f6c_dd1d;

        let old = self.state.get();
        debug_assert_ne!(old, 0);
        let a = old ^ (old >> 12);
        let b = a ^ (a << 25);
        let new = b ^ (b >> 27);
        self.state.set(new);
        new.wrapping_mul(MULTIPLIER)
    }

    /// Return a value from `0..n`.
    ///
    /// Panics if `n == 0`, because the reduction divides by `n`.
    fn next_usize(&self, n: usize) -> usize {
        let reduced = self.next() % (n as u64);
        reduced as usize
    }
}