]>
Commit | Line | Data |
---|---|---|
6a06907d XL |
1 | use crate::job::{JobFifo, JobRef, StackJob}; |
2 | use crate::latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch}; | |
3 | use crate::log::Event::*; | |
4 | use crate::sleep::Sleep; | |
5 | use crate::unwind; | |
6 | use crate::util::leak; | |
7 | use crate::{ | |
8 | AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, | |
9 | ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, | |
10 | }; | |
e74abb32 XL |
11 | use crossbeam_deque::{Steal, Stealer, Worker}; |
12 | use crossbeam_queue::SegQueue; | |
94b46f34 | 13 | use std::any::Any; |
532ac7d7 XL |
14 | use std::cell::Cell; |
15 | use std::collections::hash_map::DefaultHasher; | |
e74abb32 | 16 | use std::fmt; |
532ac7d7 | 17 | use std::hash::Hasher; |
e74abb32 | 18 | use std::io; |
532ac7d7 | 19 | use std::mem; |
e74abb32 XL |
20 | use std::ptr; |
21 | #[allow(deprecated)] | |
22 | use std::sync::atomic::ATOMIC_USIZE_INIT; | |
23 | use std::sync::atomic::{AtomicUsize, Ordering}; | |
24 | use std::sync::{Arc, Once}; | |
94b46f34 | 25 | use std::thread; |
94b46f34 | 26 | use std::usize; |
e74abb32 XL |
27 | |
28 | /// Thread builder used for customization via | |
29 | /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler). | |
30 | pub struct ThreadBuilder { | |
31 | name: Option<String>, | |
32 | stack_size: Option<usize>, | |
33 | worker: Worker<JobRef>, | |
34 | registry: Arc<Registry>, | |
35 | index: usize, | |
36 | } | |
37 | ||
38 | impl ThreadBuilder { | |
39 | /// Get the index of this thread in the pool, within `0..num_threads`. | |
40 | pub fn index(&self) -> usize { | |
41 | self.index | |
42 | } | |
43 | ||
44 | /// Get the string that was specified by `ThreadPoolBuilder::name()`. | |
45 | pub fn name(&self) -> Option<&str> { | |
46 | self.name.as_ref().map(String::as_str) | |
47 | } | |
48 | ||
49 | /// Get the value that was specified by `ThreadPoolBuilder::stack_size()`. | |
50 | pub fn stack_size(&self) -> Option<usize> { | |
51 | self.stack_size | |
52 | } | |
53 | ||
54 | /// Execute the main loop for this thread. This will not return until the | |
55 | /// thread pool is dropped. | |
56 | pub fn run(self) { | |
57 | unsafe { main_loop(self.worker, self.registry, self.index) } | |
58 | } | |
59 | } | |
60 | ||
61 | impl fmt::Debug for ThreadBuilder { | |
6a06907d | 62 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
e74abb32 XL |
63 | f.debug_struct("ThreadBuilder") |
64 | .field("pool", &self.registry.id()) | |
65 | .field("index", &self.index) | |
66 | .field("name", &self.name) | |
67 | .field("stack_size", &self.stack_size) | |
68 | .finish() | |
69 | } | |
70 | } | |
71 | ||
72 | /// Generalized trait for spawning a thread in the `Registry`. | |
73 | /// | |
74 | /// This trait is pub-in-private -- E0445 forces us to make it public, | |
75 | /// but we don't actually want to expose these details in the API. | |
76 | pub trait ThreadSpawn { | |
77 | private_decl! {} | |
78 | ||
79 | /// Spawn a thread with the `ThreadBuilder` parameters, and then | |
80 | /// call `ThreadBuilder::run()`. | |
6a06907d | 81 | fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>; |
e74abb32 XL |
82 | } |
83 | ||
/// Spawns a thread in the "normal" way with `std::thread::Builder`.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug, Default)]
pub struct DefaultSpawn;
90 | ||
91 | impl ThreadSpawn for DefaultSpawn { | |
92 | private_impl! {} | |
93 | ||
94 | fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { | |
95 | let mut b = thread::Builder::new(); | |
96 | if let Some(name) = thread.name() { | |
97 | b = b.name(name.to_owned()); | |
98 | } | |
99 | if let Some(stack_size) = thread.stack_size() { | |
100 | b = b.stack_size(stack_size); | |
101 | } | |
102 | b.spawn(|| thread.run())?; | |
103 | Ok(()) | |
104 | } | |
105 | } | |
106 | ||
/// Spawns a thread with a user's custom callback.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);
113 | ||
114 | impl<F> CustomSpawn<F> | |
115 | where | |
116 | F: FnMut(ThreadBuilder) -> io::Result<()>, | |
117 | { | |
118 | pub(super) fn new(spawn: F) -> Self { | |
119 | CustomSpawn(spawn) | |
120 | } | |
121 | } | |
122 | ||
123 | impl<F> ThreadSpawn for CustomSpawn<F> | |
124 | where | |
125 | F: FnMut(ThreadBuilder) -> io::Result<()>, | |
126 | { | |
127 | private_impl! {} | |
128 | ||
129 | #[inline] | |
130 | fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> { | |
131 | (self.0)(thread) | |
132 | } | |
133 | } | |
94b46f34 XL |
134 | |
135 | pub struct Registry { | |
136 | thread_infos: Vec<ThreadInfo>, | |
94b46f34 | 137 | sleep: Sleep, |
e74abb32 | 138 | injected_jobs: SegQueue<JobRef>, |
94b46f34 | 139 | panic_handler: Option<Box<PanicHandler>>, |
532ac7d7 | 140 | pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>, |
94b46f34 XL |
141 | start_handler: Option<Box<StartHandler>>, |
142 | exit_handler: Option<Box<ExitHandler>>, | |
532ac7d7 XL |
143 | pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>, |
144 | pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>, | |
94b46f34 XL |
145 | |
146 | // When this latch reaches 0, it means that all work on this | |
147 | // registry must be complete. This is ensured in the following ways: | |
148 | // | |
149 | // - if this is the global registry, there is a ref-count that never | |
150 | // gets released. | |
151 | // - if this is a user-created thread-pool, then so long as the thread-pool | |
152 | // exists, it holds a reference. | |
153 | // - when we inject a "blocking job" into the registry with `ThreadPool::install()`, | |
154 | // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't | |
155 | // return until the blocking job is complete, that ref will continue to be held. | |
156 | // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed. | |
157 | // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`) | |
158 | // and that job will keep the pool alive. | |
159 | terminate_latch: CountLatch, | |
160 | } | |
161 | ||
94b46f34 XL |
162 | /// //////////////////////////////////////////////////////////////////////// |
163 | /// Initialization | |
164 | ||
165 | static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None; | |
e74abb32 | 166 | static THE_REGISTRY_SET: Once = Once::new(); |
94b46f34 XL |
167 | |
168 | /// Starts the worker threads (if that has not already happened). If | |
169 | /// initialization has not already occurred, use the default | |
170 | /// configuration. | |
171 | fn global_registry() -> &'static Arc<Registry> { | |
e74abb32 XL |
172 | set_global_registry(|| Registry::new(ThreadPoolBuilder::new())) |
173 | .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) }) | |
174 | .expect("The global thread pool has not been initialized.") | |
94b46f34 XL |
175 | } |
176 | ||
177 | /// Starts the worker threads (if that has not already happened) with | |
178 | /// the given builder. | |
e74abb32 XL |
179 | pub(super) fn init_global_registry<S>( |
180 | builder: ThreadPoolBuilder<S>, | |
181 | ) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> | |
182 | where | |
183 | S: ThreadSpawn, | |
184 | { | |
185 | set_global_registry(|| Registry::new(builder)) | |
94b46f34 XL |
186 | } |
187 | ||
e74abb32 XL |
188 | /// Starts the worker threads (if that has not already happened) |
189 | /// by creating a registry with the given callback. | |
190 | fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError> | |
191 | where | |
192 | F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>, | |
193 | { | |
194 | let mut result = Err(ThreadPoolBuildError::new( | |
195 | ErrorKind::GlobalPoolAlreadyInitialized, | |
196 | )); | |
197 | THE_REGISTRY_SET.call_once(|| { | |
198 | result = registry().map(|registry| { | |
199 | let registry = leak(registry); | |
200 | unsafe { | |
201 | THE_REGISTRY = Some(registry); | |
202 | } | |
203 | registry | |
204 | }); | |
205 | }); | |
206 | result | |
94b46f34 XL |
207 | } |
208 | ||
209 | struct Terminator<'a>(&'a Arc<Registry>); | |
210 | ||
211 | impl<'a> Drop for Terminator<'a> { | |
212 | fn drop(&mut self) { | |
213 | self.0.terminate() | |
214 | } | |
215 | } | |
216 | ||
217 | impl Registry { | |
e74abb32 XL |
218 | pub(super) fn new<S>( |
219 | mut builder: ThreadPoolBuilder<S>, | |
220 | ) -> Result<Arc<Self>, ThreadPoolBuildError> | |
221 | where | |
222 | S: ThreadSpawn, | |
223 | { | |
94b46f34 XL |
224 | let n_threads = builder.get_num_threads(); |
225 | let breadth_first = builder.get_breadth_first(); | |
226 | ||
e74abb32 XL |
227 | let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) |
228 | .map(|_| { | |
229 | let worker = if breadth_first { | |
230 | Worker::new_fifo() | |
231 | } else { | |
232 | Worker::new_lifo() | |
233 | }; | |
234 | ||
235 | let stealer = worker.stealer(); | |
236 | (worker, stealer) | |
237 | }) | |
238 | .unzip(); | |
94b46f34 XL |
239 | |
240 | let registry = Arc::new(Registry { | |
e74abb32 | 241 | thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), |
94b46f34 | 242 | sleep: Sleep::new(n_threads), |
e74abb32 | 243 | injected_jobs: SegQueue::new(), |
94b46f34 XL |
244 | terminate_latch: CountLatch::new(), |
245 | panic_handler: builder.take_panic_handler(), | |
246 | deadlock_handler: builder.take_deadlock_handler(), | |
247 | start_handler: builder.take_start_handler(), | |
94b46f34 | 248 | exit_handler: builder.take_exit_handler(), |
532ac7d7 XL |
249 | acquire_thread_handler: builder.take_acquire_thread_handler(), |
250 | release_thread_handler: builder.take_release_thread_handler(), | |
94b46f34 XL |
251 | }); |
252 | ||
253 | // If we return early or panic, make sure to terminate existing threads. | |
254 | let t1000 = Terminator(®istry); | |
255 | ||
256 | for (index, worker) in workers.into_iter().enumerate() { | |
e74abb32 XL |
257 | let thread = ThreadBuilder { |
258 | name: builder.get_thread_name(index), | |
259 | stack_size: builder.get_stack_size(), | |
260 | registry: registry.clone(), | |
261 | worker, | |
262 | index, | |
263 | }; | |
264 | if let Err(e) = builder.get_spawn_handler().spawn(thread) { | |
532ac7d7 | 265 | return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); |
94b46f34 XL |
266 | } |
267 | } | |
268 | ||
269 | // Returning normally now, without termination. | |
270 | mem::forget(t1000); | |
271 | ||
272 | Ok(registry.clone()) | |
273 | } | |
274 | ||
94b46f34 XL |
275 | pub fn current() -> Arc<Registry> { |
276 | unsafe { | |
277 | let worker_thread = WorkerThread::current(); | |
278 | if worker_thread.is_null() { | |
279 | global_registry().clone() | |
280 | } else { | |
281 | (*worker_thread).registry.clone() | |
282 | } | |
283 | } | |
284 | } | |
285 | ||
286 | /// Returns the number of threads in the current registry. This | |
287 | /// is better than `Registry::current().num_threads()` because it | |
288 | /// avoids incrementing the `Arc`. | |
e74abb32 | 289 | pub(super) fn current_num_threads() -> usize { |
94b46f34 XL |
290 | unsafe { |
291 | let worker_thread = WorkerThread::current(); | |
292 | if worker_thread.is_null() { | |
293 | global_registry().num_threads() | |
294 | } else { | |
295 | (*worker_thread).registry.num_threads() | |
296 | } | |
297 | } | |
298 | } | |
299 | ||
e74abb32 XL |
300 | /// Returns the current `WorkerThread` if it's part of this `Registry`. |
301 | pub(super) fn current_thread(&self) -> Option<&WorkerThread> { | |
302 | unsafe { | |
303 | let worker = WorkerThread::current().as_ref()?; | |
304 | if worker.registry().id() == self.id() { | |
305 | Some(worker) | |
306 | } else { | |
307 | None | |
308 | } | |
309 | } | |
310 | } | |
311 | ||
94b46f34 | 312 | /// Returns an opaque identifier for this registry. |
e74abb32 | 313 | pub(super) fn id(&self) -> RegistryId { |
94b46f34 XL |
314 | // We can rely on `self` not to change since we only ever create |
315 | // registries that are boxed up in an `Arc` (see `new()` above). | |
532ac7d7 XL |
316 | RegistryId { |
317 | addr: self as *const Self as usize, | |
318 | } | |
94b46f34 XL |
319 | } |
320 | ||
e74abb32 | 321 | pub(super) fn num_threads(&self) -> usize { |
94b46f34 XL |
322 | self.thread_infos.len() |
323 | } | |
324 | ||
e74abb32 | 325 | pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) { |
94b46f34 XL |
326 | match self.panic_handler { |
327 | Some(ref handler) => { | |
328 | // If the customizable panic handler itself panics, | |
329 | // then we abort. | |
330 | let abort_guard = unwind::AbortIfPanic; | |
331 | handler(err); | |
332 | mem::forget(abort_guard); | |
333 | } | |
334 | None => { | |
335 | // Default panic handler aborts. | |
336 | let _ = unwind::AbortIfPanic; // let this drop. | |
337 | } | |
338 | } | |
339 | } | |
340 | ||
341 | /// Waits for the worker threads to get up and running. This is | |
342 | /// meant to be used for benchmarking purposes, primarily, so that | |
343 | /// you can get more consistent numbers by having everything | |
344 | /// "ready to go". | |
e74abb32 | 345 | pub(super) fn wait_until_primed(&self) { |
94b46f34 XL |
346 | for info in &self.thread_infos { |
347 | info.primed.wait(); | |
348 | } | |
349 | } | |
350 | ||
351 | /// Waits for the worker threads to stop. This is used for testing | |
352 | /// -- so we can check that termination actually works. | |
e74abb32 | 353 | pub(super) fn wait_until_stopped(&self) { |
532ac7d7 | 354 | self.release_thread(); |
94b46f34 XL |
355 | for info in &self.thread_infos { |
356 | info.stopped.wait(); | |
357 | } | |
532ac7d7 XL |
358 | self.acquire_thread(); |
359 | } | |
360 | ||
361 | pub(crate) fn acquire_thread(&self) { | |
362 | if let Some(ref acquire_thread_handler) = self.acquire_thread_handler { | |
363 | acquire_thread_handler(); | |
364 | } | |
365 | } | |
366 | ||
367 | pub(crate) fn release_thread(&self) { | |
368 | if let Some(ref release_thread_handler) = self.release_thread_handler { | |
369 | release_thread_handler(); | |
370 | } | |
94b46f34 XL |
371 | } |
372 | ||
373 | /// //////////////////////////////////////////////////////////////////////// | |
374 | /// MAIN LOOP | |
375 | /// | |
376 | /// So long as all of the worker threads are hanging out in their | |
377 | /// top-level loop, there is no work to be done. | |
378 | ||
379 | /// Push a job into the given `registry`. If we are running on a | |
380 | /// worker thread for the registry, this will push onto the | |
381 | /// deque. Else, it will inject from the outside (which is slower). | |
e74abb32 | 382 | pub(super) fn inject_or_push(&self, job_ref: JobRef) { |
94b46f34 XL |
383 | let worker_thread = WorkerThread::current(); |
384 | unsafe { | |
385 | if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { | |
386 | (*worker_thread).push(job_ref); | |
387 | } else { | |
388 | self.inject(&[job_ref]); | |
389 | } | |
390 | } | |
391 | } | |
392 | ||
94b46f34 XL |
393 | /// Push a job into the "external jobs" queue; it will be taken by |
394 | /// whatever worker has nothing to do. Use this is you know that | |
395 | /// you are not on a worker of this registry. | |
e74abb32 | 396 | pub(super) fn inject(&self, injected_jobs: &[JobRef]) { |
532ac7d7 XL |
397 | log!(InjectJobs { |
398 | count: injected_jobs.len() | |
399 | }); | |
94b46f34 | 400 | |
e74abb32 XL |
401 | // It should not be possible for `state.terminate` to be true |
402 | // here. It is only set to true when the user creates (and | |
403 | // drops) a `ThreadPool`; and, in that case, they cannot be | |
404 | // calling `inject()` later, since they dropped their | |
405 | // `ThreadPool`. | |
406 | assert!( | |
407 | !self.terminate_latch.probe(), | |
408 | "inject() sees state.terminate as true" | |
409 | ); | |
410 | ||
411 | for &job_ref in injected_jobs { | |
412 | self.injected_jobs.push(job_ref); | |
94b46f34 XL |
413 | } |
414 | self.sleep.tickle(usize::MAX); | |
415 | } | |
416 | ||
417 | fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> { | |
e74abb32 XL |
418 | let job = self.injected_jobs.pop().ok(); |
419 | if job.is_some() { | |
420 | log!(UninjectedWork { | |
421 | worker: worker_index | |
422 | }); | |
94b46f34 | 423 | } |
e74abb32 | 424 | job |
94b46f34 XL |
425 | } |
426 | ||
427 | /// If already in a worker-thread of this registry, just execute `op`. | |
428 | /// Otherwise, inject `op` in this thread-pool. Either way, block until `op` | |
429 | /// completes and return its return value. If `op` panics, that panic will | |
430 | /// be propagated as well. The second argument indicates `true` if injection | |
431 | /// was performed, `false` if executed directly. | |
e74abb32 | 432 | pub(super) fn in_worker<OP, R>(&self, op: OP) -> R |
532ac7d7 XL |
433 | where |
434 | OP: FnOnce(&WorkerThread, bool) -> R + Send, | |
435 | R: Send, | |
94b46f34 XL |
436 | { |
437 | unsafe { | |
438 | let worker_thread = WorkerThread::current(); | |
439 | if worker_thread.is_null() { | |
440 | self.in_worker_cold(op) | |
441 | } else if (*worker_thread).registry().id() != self.id() { | |
442 | self.in_worker_cross(&*worker_thread, op) | |
443 | } else { | |
444 | // Perfectly valid to give them a `&T`: this is the | |
445 | // current thread, so we know the data structure won't be | |
446 | // invalidated until we return. | |
447 | op(&*worker_thread, false) | |
448 | } | |
449 | } | |
450 | } | |
451 | ||
452 | #[cold] | |
453 | unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R | |
532ac7d7 XL |
454 | where |
455 | OP: FnOnce(&WorkerThread, bool) -> R + Send, | |
456 | R: Send, | |
94b46f34 | 457 | { |
e74abb32 XL |
458 | thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new()); |
459 | ||
460 | LOCK_LATCH.with(|l| { | |
461 | // This thread isn't a member of *any* thread pool, so just block. | |
462 | debug_assert!(WorkerThread::current().is_null()); | |
463 | let job = StackJob::new( | |
464 | 0, | |
465 | |injected| { | |
466 | let worker_thread = WorkerThread::current(); | |
467 | assert!(injected && !worker_thread.is_null()); | |
468 | op(&*worker_thread, true) | |
469 | }, | |
470 | l, | |
471 | ); | |
472 | self.inject(&[job.as_job_ref()]); | |
473 | self.release_thread(); | |
474 | job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. | |
475 | self.acquire_thread(); | |
476 | job.into_result() | |
477 | }) | |
94b46f34 XL |
478 | } |
479 | ||
480 | #[cold] | |
481 | unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R | |
532ac7d7 XL |
482 | where |
483 | OP: FnOnce(&WorkerThread, bool) -> R + Send, | |
484 | R: Send, | |
94b46f34 XL |
485 | { |
486 | // This thread is a member of a different pool, so let it process | |
487 | // other work while waiting for this `op` to complete. | |
488 | debug_assert!(current_thread.registry().id() != self.id()); | |
489 | let latch = TickleLatch::new(SpinLatch::new(), ¤t_thread.registry().sleep); | |
532ac7d7 | 490 | let job = StackJob::new( |
48663c56 | 491 | 0, |
532ac7d7 XL |
492 | |injected| { |
493 | let worker_thread = WorkerThread::current(); | |
494 | assert!(injected && !worker_thread.is_null()); | |
495 | op(&*worker_thread, true) | |
496 | }, | |
497 | latch, | |
498 | ); | |
94b46f34 XL |
499 | self.inject(&[job.as_job_ref()]); |
500 | current_thread.wait_until(&job.latch); | |
501 | job.into_result() | |
502 | } | |
503 | ||
504 | /// Increment the terminate counter. This increment should be | |
505 | /// balanced by a call to `terminate`, which will decrement. This | |
506 | /// is used when spawning asynchronous work, which needs to | |
507 | /// prevent the registry from terminating so long as it is active. | |
508 | /// | |
509 | /// Note that blocking functions such as `join` and `scope` do not | |
510 | /// need to concern themselves with this fn; their context is | |
511 | /// responsible for ensuring the current thread-pool will not | |
512 | /// terminate until they return. | |
513 | /// | |
514 | /// The global thread-pool always has an outstanding reference | |
515 | /// (the initial one). Custom thread-pools have one outstanding | |
516 | /// reference that is dropped when the `ThreadPool` is dropped: | |
517 | /// since installing the thread-pool blocks until any joins/scopes | |
518 | /// complete, this ensures that joins/scopes are covered. | |
519 | /// | |
520 | /// The exception is `::spawn()`, which can create a job outside | |
521 | /// of any blocking scope. In that case, the job itself holds a | |
522 | /// terminate count and is responsible for invoking `terminate()` | |
523 | /// when finished. | |
e74abb32 | 524 | pub(super) fn increment_terminate_count(&self) { |
94b46f34 XL |
525 | self.terminate_latch.increment(); |
526 | } | |
527 | ||
528 | /// Signals that the thread-pool which owns this registry has been | |
529 | /// dropped. The worker threads will gradually terminate, once any | |
530 | /// extant work is completed. | |
e74abb32 | 531 | pub(super) fn terminate(&self) { |
94b46f34 XL |
532 | self.terminate_latch.set(); |
533 | self.sleep.tickle(usize::MAX); | |
534 | } | |
535 | } | |
536 | ||
537 | /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler | |
538 | /// if no other worker thread is active | |
539 | #[inline] | |
540 | pub fn mark_blocked() { | |
541 | let worker_thread = WorkerThread::current(); | |
542 | assert!(!worker_thread.is_null()); | |
543 | unsafe { | |
544 | let registry = &(*worker_thread).registry; | |
545 | registry.sleep.mark_blocked(®istry.deadlock_handler) | |
546 | } | |
547 | } | |
548 | ||
549 | /// Mark a previously blocked Rayon worker thread as unblocked | |
550 | #[inline] | |
551 | pub fn mark_unblocked(registry: &Registry) { | |
552 | registry.sleep.mark_unblocked() | |
553 | } | |
554 | ||
555 | #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] | |
e74abb32 | 556 | pub(super) struct RegistryId { |
532ac7d7 | 557 | addr: usize, |
94b46f34 XL |
558 | } |
559 | ||
94b46f34 XL |
560 | struct ThreadInfo { |
561 | /// Latch set once thread has started and we are entering into the | |
562 | /// main loop. Used to wait for worker threads to become primed, | |
563 | /// primarily of interest for benchmarking. | |
564 | primed: LockLatch, | |
565 | ||
566 | /// Latch is set once worker thread has completed. Used to wait | |
567 | /// until workers have stopped; only used for tests. | |
568 | stopped: LockLatch, | |
569 | ||
570 | /// the "stealer" half of the worker's deque | |
571 | stealer: Stealer<JobRef>, | |
572 | } | |
573 | ||
574 | impl ThreadInfo { | |
575 | fn new(stealer: Stealer<JobRef>) -> ThreadInfo { | |
576 | ThreadInfo { | |
577 | primed: LockLatch::new(), | |
578 | stopped: LockLatch::new(), | |
e74abb32 | 579 | stealer, |
94b46f34 XL |
580 | } |
581 | } | |
582 | } | |
583 | ||
584 | /// //////////////////////////////////////////////////////////////////////// | |
585 | /// WorkerThread identifiers | |
586 | ||
e74abb32 | 587 | pub(super) struct WorkerThread { |
94b46f34 | 588 | /// the "worker" half of our local deque |
e74abb32 | 589 | worker: Worker<JobRef>, |
94b46f34 | 590 | |
e74abb32 XL |
591 | /// local queue used for `spawn_fifo` indirection |
592 | fifo: JobFifo, | |
94b46f34 | 593 | |
e74abb32 | 594 | pub(crate) index: usize, |
94b46f34 XL |
595 | |
596 | /// A weak random number generator. | |
532ac7d7 | 597 | rng: XorShift64Star, |
94b46f34 XL |
598 | |
599 | pub(crate) registry: Arc<Registry>, | |
600 | } | |
601 | ||
602 | // This is a bit sketchy, but basically: the WorkerThread is | |
603 | // allocated on the stack of the worker on entry and stored into this | |
604 | // thread local variable. So it will remain valid at least until the | |
605 | // worker is fully unwound. Using an unsafe pointer avoids the need | |
606 | // for a RefCell<T> etc. | |
607 | thread_local! { | |
e74abb32 XL |
608 | static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null()); |
609 | } | |
610 | ||
611 | impl Drop for WorkerThread { | |
612 | fn drop(&mut self) { | |
613 | // Undo `set_current` | |
614 | WORKER_THREAD_STATE.with(|t| { | |
615 | assert!(t.get().eq(&(self as *const _))); | |
616 | t.set(ptr::null()); | |
617 | }); | |
618 | } | |
94b46f34 XL |
619 | } |
620 | ||
621 | impl WorkerThread { | |
622 | /// Gets the `WorkerThread` index for the current thread; returns | |
623 | /// NULL if this is not a worker thread. This pointer is valid | |
624 | /// anywhere on the current thread. | |
625 | #[inline] | |
e74abb32 XL |
626 | pub(super) fn current() -> *const WorkerThread { |
627 | WORKER_THREAD_STATE.with(Cell::get) | |
94b46f34 XL |
628 | } |
629 | ||
630 | /// Sets `self` as the worker thread index for the current thread. | |
631 | /// This is done during worker thread startup. | |
632 | unsafe fn set_current(thread: *const WorkerThread) { | |
633 | WORKER_THREAD_STATE.with(|t| { | |
634 | assert!(t.get().is_null()); | |
635 | t.set(thread); | |
636 | }); | |
637 | } | |
638 | ||
639 | /// Returns the registry that owns this worker thread. | |
e74abb32 | 640 | pub(super) fn registry(&self) -> &Arc<Registry> { |
94b46f34 XL |
641 | &self.registry |
642 | } | |
643 | ||
644 | /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). | |
645 | #[inline] | |
e74abb32 | 646 | pub(super) fn index(&self) -> usize { |
94b46f34 XL |
647 | self.index |
648 | } | |
649 | ||
650 | #[inline] | |
e74abb32 | 651 | pub(super) unsafe fn push(&self, job: JobRef) { |
94b46f34 XL |
652 | self.worker.push(job); |
653 | self.registry.sleep.tickle(self.index); | |
654 | } | |
655 | ||
656 | #[inline] | |
e74abb32 XL |
657 | pub(super) unsafe fn push_fifo(&self, job: JobRef) { |
658 | self.push(self.fifo.push(job)); | |
659 | } | |
660 | ||
661 | #[inline] | |
662 | pub(super) fn local_deque_is_empty(&self) -> bool { | |
663 | self.worker.is_empty() | |
94b46f34 XL |
664 | } |
665 | ||
666 | /// Attempts to obtain a "local" job -- typically this means | |
667 | /// popping from the top of the stack, though if we are configured | |
668 | /// for breadth-first execution, it would mean dequeuing from the | |
669 | /// bottom. | |
670 | #[inline] | |
e74abb32 XL |
671 | pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> { |
672 | self.worker.pop() | |
94b46f34 XL |
673 | } |
674 | ||
675 | /// Wait until the latch is set. Try to keep busy by popping and | |
676 | /// stealing tasks as necessary. | |
677 | #[inline] | |
e74abb32 | 678 | pub(super) unsafe fn wait_until<L: LatchProbe + ?Sized>(&self, latch: &L) { |
94b46f34 XL |
679 | log!(WaitUntil { worker: self.index }); |
680 | if !latch.probe() { | |
681 | self.wait_until_cold(latch); | |
682 | } | |
683 | } | |
684 | ||
685 | #[cold] | |
686 | unsafe fn wait_until_cold<L: LatchProbe + ?Sized>(&self, latch: &L) { | |
687 | // the code below should swallow all panics and hence never | |
688 | // unwind; but if something does wrong, we want to abort, | |
689 | // because otherwise other code in rayon may assume that the | |
690 | // latch has been signaled, and that can lead to random memory | |
691 | // accesses, which would be *very bad* | |
692 | let abort_guard = unwind::AbortIfPanic; | |
693 | ||
694 | let mut yields = 0; | |
695 | while !latch.probe() { | |
696 | // Try to find some work to do. We give preference first | |
697 | // to things in our local deque, then in other workers | |
698 | // deques, and finally to injected jobs from the | |
699 | // outside. The idea is to finish what we started before | |
700 | // we take on something new. | |
532ac7d7 XL |
701 | if let Some(job) = self |
702 | .take_local_job() | |
703 | .or_else(|| self.steal()) | |
704 | .or_else(|| self.registry.pop_injected_job(self.index)) | |
705 | { | |
94b46f34 XL |
706 | yields = self.registry.sleep.work_found(self.index, yields); |
707 | self.execute(job); | |
708 | } else { | |
e74abb32 XL |
709 | yields = self |
710 | .registry | |
711 | .sleep | |
712 | .no_work_found(self.index, yields, &self.registry); | |
94b46f34 XL |
713 | } |
714 | } | |
715 | ||
716 | // If we were sleepy, we are not anymore. We "found work" -- | |
717 | // whatever the surrounding thread was doing before it had to | |
718 | // wait. | |
719 | self.registry.sleep.work_found(self.index, yields); | |
720 | ||
721 | log!(LatchSet { worker: self.index }); | |
722 | mem::forget(abort_guard); // successful execution, do not abort | |
723 | } | |
724 | ||
e74abb32 | 725 | pub(super) unsafe fn execute(&self, job: JobRef) { |
94b46f34 XL |
726 | job.execute(); |
727 | ||
728 | // Subtle: executing this job will have `set()` some of its | |
729 | // latches. This may mean that a sleepy (or sleeping) worker | |
730 | // can now make progress. So we have to tickle them to let | |
731 | // them know. | |
732 | self.registry.sleep.tickle(self.index); | |
733 | } | |
734 | ||
735 | /// Try to steal a single job and return it. | |
736 | /// | |
737 | /// This should only be done as a last resort, when there is no | |
738 | /// local work to do. | |
739 | unsafe fn steal(&self) -> Option<JobRef> { | |
740 | // we only steal when we don't have any work to do locally | |
e74abb32 | 741 | debug_assert!(self.local_deque_is_empty()); |
94b46f34 XL |
742 | |
743 | // otherwise, try to steal | |
744 | let num_threads = self.registry.thread_infos.len(); | |
745 | if num_threads <= 1 { | |
746 | return None; | |
747 | } | |
532ac7d7 XL |
748 | |
749 | let start = self.rng.next_usize(num_threads); | |
750 | (start..num_threads) | |
751 | .chain(0..start) | |
94b46f34 XL |
752 | .filter(|&i| i != self.index) |
753 | .filter_map(|victim_index| { | |
754 | let victim = &self.registry.thread_infos[victim_index]; | |
755 | loop { | |
756 | match victim.stealer.steal() { | |
757 | Steal::Empty => return None, | |
e74abb32 | 758 | Steal::Success(d) => { |
94b46f34 XL |
759 | log!(StoleWork { |
760 | worker: self.index, | |
761 | victim: victim_index | |
762 | }); | |
763 | return Some(d); | |
532ac7d7 XL |
764 | } |
765 | Steal::Retry => {} | |
94b46f34 XL |
766 | } |
767 | } | |
768 | }) | |
769 | .next() | |
770 | } | |
771 | } | |
772 | ||
773 | /// //////////////////////////////////////////////////////////////////////// | |
774 | ||
e74abb32 XL |
775 | unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) { |
776 | let worker_thread = &WorkerThread { | |
777 | worker, | |
778 | fifo: JobFifo::new(), | |
779 | index, | |
532ac7d7 | 780 | rng: XorShift64Star::new(), |
94b46f34 XL |
781 | registry: registry.clone(), |
782 | }; | |
e74abb32 | 783 | WorkerThread::set_current(worker_thread); |
94b46f34 XL |
784 | |
785 | // let registry know we are ready to do work | |
786 | registry.thread_infos[index].primed.set(); | |
787 | ||
788 | // Worker threads should not panic. If they do, just abort, as the | |
789 | // internal state of the threadpool is corrupted. Note that if | |
790 | // **user code** panics, we should catch that and redirect. | |
791 | let abort_guard = unwind::AbortIfPanic; | |
792 | ||
793 | // Inform a user callback that we started a thread. | |
794 | if let Some(ref handler) = registry.start_handler { | |
795 | let registry = registry.clone(); | |
796 | match unwind::halt_unwinding(|| handler(index)) { | |
532ac7d7 | 797 | Ok(()) => {} |
94b46f34 XL |
798 | Err(err) => { |
799 | registry.handle_panic(err); | |
800 | } | |
801 | } | |
802 | } | |
803 | ||
532ac7d7 | 804 | registry.acquire_thread(); |
e74abb32 | 805 | worker_thread.wait_until(®istry.terminate_latch); |
94b46f34 XL |
806 | |
807 | // Should not be any work left in our queue. | |
808 | debug_assert!(worker_thread.take_local_job().is_none()); | |
809 | ||
810 | // let registry know we are done | |
811 | registry.thread_infos[index].stopped.set(); | |
812 | ||
813 | // Normal termination, do not abort. | |
814 | mem::forget(abort_guard); | |
815 | ||
816 | // Inform a user callback that we exited a thread. | |
817 | if let Some(ref handler) = registry.exit_handler { | |
818 | let registry = registry.clone(); | |
819 | match unwind::halt_unwinding(|| handler(index)) { | |
532ac7d7 | 820 | Ok(()) => {} |
94b46f34 XL |
821 | Err(err) => { |
822 | registry.handle_panic(err); | |
823 | } | |
824 | } | |
825 | // We're already exiting the thread, there's nothing else to do. | |
826 | } | |
532ac7d7 XL |
827 | |
828 | registry.release_thread(); | |
94b46f34 XL |
829 | } |
830 | ||
831 | /// If already in a worker-thread, just execute `op`. Otherwise, | |
832 | /// execute `op` in the default thread-pool. Either way, block until | |
833 | /// `op` completes and return its return value. If `op` panics, that | |
834 | /// panic will be propagated as well. The second argument indicates | |
835 | /// `true` if injection was performed, `false` if executed directly. | |
e74abb32 | 836 | pub(super) fn in_worker<OP, R>(op: OP) -> R |
532ac7d7 XL |
837 | where |
838 | OP: FnOnce(&WorkerThread, bool) -> R + Send, | |
839 | R: Send, | |
94b46f34 XL |
840 | { |
841 | unsafe { | |
842 | let owner_thread = WorkerThread::current(); | |
843 | if !owner_thread.is_null() { | |
844 | // Perfectly valid to give them a `&T`: this is the | |
845 | // current thread, so we know the data structure won't be | |
846 | // invalidated until we return. | |
847 | op(&*owner_thread, false) | |
848 | } else { | |
849 | global_registry().in_worker_cold(op) | |
850 | } | |
851 | } | |
852 | } | |
532ac7d7 XL |
853 | |
/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {
    /// Current generator state; kept non-zero.
    state: Cell<u64>,
}

impl XorShift64Star {
    /// Creates a generator seeded from the hash of a process-wide counter,
    /// rehashing the next counter value until the seed is non-zero.
    fn new() -> Self {
        #[allow(deprecated)]
        static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;

        let seed = loop {
            let mut hasher = DefaultHasher::new();
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            let candidate = hasher.finish();
            if candidate != 0 {
                break candidate;
            }
        };

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    /// Advances the state with the 12/25/27 xorshift steps and returns the
    /// new state scrambled by the xorshift* multiplier.
    fn next(&self) -> u64 {
        let state = self.state.get();
        debug_assert_ne!(state, 0);
        let state = state ^ (state >> 12);
        let state = state ^ (state << 25);
        let state = state ^ (state >> 27);
        self.state.set(state);
        state.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
    ///
    /// Panics (remainder by zero) if `n == 0`.
    fn next_usize(&self, n: usize) -> usize {
        let bound = n as u64;
        (self.next() % bound) as usize
    }
}