// vendor/rustc-rayon-core/src/thread_pool/mod.rs
//! Contains support for user-managed thread pools, represented by the
//! [`ThreadPool`] type (see that struct for details).
//!
//! [`ThreadPool`]: struct.ThreadPool.html

use crate::broadcast::{self, BroadcastContext};
use crate::join;
use crate::registry::{Registry, ThreadSpawn, WorkerThread};
use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
use crate::spawn;
use crate::{scope, Scope};
use crate::{scope_fifo, ScopeFifo};
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
use std::error::Error;
use std::fmt;
use std::sync::Arc;

mod test;

/// Represents a user-created [thread-pool].
///
/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
/// execute functions explicitly within this [`ThreadPool`] using
/// [`ThreadPool::install()`]. By contrast, top-level rayon functions
/// (like `join()`) will execute implicitly within the current thread-pool.
///
/// ## Creating a ThreadPool
///
/// ```rust
/// # use rayon_core as rayon;
/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
/// ```
///
/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
/// threads. In addition, any other rayon operations called inside of `install()` will also
/// execute in the context of the `ThreadPool`.
///
/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to
/// terminate: they will finish executing any remaining work that you have spawned
/// and then shut down.
///
/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
pub struct ThreadPool {
    registry: Arc<Registry>,
}

impl ThreadPool {
    #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
    #[allow(deprecated)]
    /// Deprecated in favor of `ThreadPoolBuilder::build`.
    pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
        Self::build(configuration.into_builder()).map_err(Box::from)
    }

    pub(super) fn build<S>(
        builder: ThreadPoolBuilder<S>,
    ) -> Result<ThreadPool, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        let registry = Registry::new(builder)?;
        Ok(ThreadPool { registry })
    }

    /// Executes `op` within the threadpool. Any attempts to use
    /// `join`, `scope`, or parallel iterators will then operate
    /// within that threadpool.
    ///
    /// # Warning: thread-local data
    ///
    /// Because `op` is executing within the Rayon thread-pool,
    /// thread-local data from the current thread will not be
    /// accessible.
    ///
    /// # Panics
    ///
    /// If `op` should panic, that panic will be propagated.
    ///
    /// ## Using `install()`
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// fn main() {
    ///     let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
    ///     let n = pool.install(|| fib(20));
    ///     println!("{}", n);
    /// }
    ///
    /// fn fib(n: usize) -> usize {
    ///     if n == 0 || n == 1 {
    ///         return n;
    ///     }
    ///     let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
    ///     return a + b;
    /// }
    /// ```
    pub fn install<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce() -> R + Send,
        R: Send,
    {
        self.registry.in_worker(|_, _| op())
    }

    /// Executes `op` within every thread in the threadpool. Any attempts to use
    /// `join`, `scope`, or parallel iterators will then operate within that
    /// threadpool.
    ///
    /// Broadcasts are executed on each thread after they have exhausted their
    /// local work queue, before they attempt work-stealing from other threads.
    /// The goal of that strategy is to run everywhere in a timely manner
    /// *without* being too disruptive to current work. There may be alternative
    /// broadcast styles added in the future for more or less aggressive
    /// injection, if the need arises.
    ///
    /// # Warning: thread-local data
    ///
    /// Because `op` is executing within the Rayon thread-pool,
    /// thread-local data from the current thread will not be
    /// accessible.
    ///
    /// # Panics
    ///
    /// If `op` panics on one or more threads, exactly one panic
    /// will be propagated, and only after all threads have finished
    /// (or panicked in) their own call to `op`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use rayon_core as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// fn main() {
    ///     let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
    ///
    ///     // The argument gives context, including the index of each thread.
    ///     let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
    ///     assert_eq!(v, &[0, 1, 4, 9, 16]);
    ///
    ///     // The closure can reference the local stack
    ///     let count = AtomicUsize::new(0);
    ///     pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
    ///     assert_eq!(count.into_inner(), 5);
    /// }
    /// ```
    pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
    where
        OP: Fn(BroadcastContext<'_>) -> R + Sync,
        R: Send,
    {
        // We assert that `self.registry` has not terminated.
        unsafe { broadcast::broadcast_in(op, &self.registry) }
    }

    /// Returns the (current) number of threads in the thread pool.
    ///
    /// # Future compatibility note
    ///
    /// Note that unless this thread-pool was created with a
    /// [`ThreadPoolBuilder`] that specifies the number of threads,
    /// this number may vary over time in future versions (see [the
    /// `num_threads()` method for details][snt]).
    ///
    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
    /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
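    ///
    /// # Examples
    ///
    /// A minimal sketch; the pool size of `8` here is just an illustrative choice:
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
    /// assert_eq!(pool.current_num_threads(), 8);
    /// ```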
    #[inline]
    pub fn current_num_threads(&self) -> usize {
        self.registry.num_threads()
    }

    /// If called from a Rayon worker thread in this thread-pool,
    /// returns the index of that thread; if not called from a Rayon
    /// thread, or called from a Rayon thread that belongs to a
    /// different thread-pool, returns `None`.
    ///
    /// The index for a given thread will not change over the thread's
    /// lifetime. However, multiple threads may share the same index if
    /// they are in distinct thread-pools.
    ///
    /// # Future compatibility note
    ///
    /// Currently, every thread-pool (including the global
    /// thread-pool) has a fixed number of threads, but this may
    /// change in future Rayon versions (see [the `num_threads()` method
    /// for details][snt]). In that case, the index for a
    /// thread would not change during its lifetime, but thread
    /// indices may wind up being reused if threads are terminated and
    /// restarted.
    ///
    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
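    ///
    /// # Examples
    ///
    /// A minimal sketch: the index is `Some(_)` only when called from inside the pool.
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// assert_eq!(pool.current_thread_index(), None); // called from the main thread
    /// let idx = pool.install(|| pool.current_thread_index());
    /// assert!(idx.is_some()); // called from one of the pool's workers
    /// ```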
    #[inline]
    pub fn current_thread_index(&self) -> Option<usize> {
        let curr = self.registry.current_thread()?;
        Some(curr.index())
    }

    /// Returns `Some(true)` if the current worker thread of this pool has
    /// "local tasks" pending, `Some(false)` if it does not, and `None` if
    /// the current thread is not a worker of this pool. This can be useful
    /// as part of a heuristic for deciding whether to spawn a new task or
    /// execute code on the current thread, particularly in breadth-first
    /// schedulers. However, keep in mind that this is an inherently racy
    /// check, as other worker threads may be actively "stealing" tasks
    /// from our local deque.
    ///
    /// **Background:** Rayon uses a [work-stealing] scheduler. The
    /// key idea is that each thread has its own [deque] of
    /// tasks. Whenever a new task is spawned -- whether through
    /// `join()`, `Scope::spawn()`, or some other means -- that new
    /// task is pushed onto the thread's *local* deque. Worker threads
    /// have a preference for executing their own tasks; however, if
    /// they run out of tasks, they will try to "steal" tasks from
    /// other threads. This function therefore has an inherent race
    /// with other active worker threads, which may be removing items
    /// from the local deque.
    ///
    /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
    /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
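    ///
    /// # Examples
    ///
    /// A minimal sketch; only the `None` case outside the pool is deterministic,
    /// since answers from inside the pool are inherently racy:
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// // Not a worker of this pool, so there is no local deque to inspect.
    /// assert_eq!(pool.current_thread_has_pending_tasks(), None);
    /// ```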
    #[inline]
    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
        let curr = self.registry.current_thread()?;
        Some(!curr.local_deque_is_empty())
    }

    /// Executes `oper_a` and `oper_b` in the thread-pool and returns
    /// the results. Equivalent to `self.install(|| join(oper_a,
    /// oper_b))`.
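    ///
    /// # Examples
    ///
    /// A minimal sketch; both closures run inside `pool`:
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// let (sum, product) = pool.join(|| 2 + 3, || 2 * 3);
    /// assert_eq!((sum, product), (5, 6));
    /// ```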
    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
        RA: Send,
        RB: Send,
    {
        self.install(|| join(oper_a, oper_b))
    }

    /// Creates a scope that executes within this thread-pool.
    /// Equivalent to `self.install(|| scope(...))`.
    ///
    /// See also: [the `scope()` function][scope].
    ///
    /// [scope]: fn.scope.html
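    ///
    /// # Examples
    ///
    /// A minimal sketch: spawned work may borrow from the enclosing stack frame,
    /// and the scope waits for all of it before returning.
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// let counter = AtomicUsize::new(0);
    /// pool.scope(|s| {
    ///     for _ in 0..4 {
    ///         // Each spawned task borrows `counter` from the caller's stack.
    ///         s.spawn(|_| { counter.fetch_add(1, Ordering::Relaxed); });
    ///     }
    /// });
    /// assert_eq!(counter.into_inner(), 4);
    /// ```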
    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&Scope<'scope>) -> R + Send,
        R: Send,
    {
        self.install(|| scope(op))
    }

    /// Creates a scope that executes within this thread-pool.
    /// Spawns from the same thread are prioritized in relative FIFO order.
    /// Equivalent to `self.install(|| scope_fifo(...))`.
    ///
    /// See also: [the `scope_fifo()` function][scope_fifo].
    ///
    /// [scope_fifo]: fn.scope_fifo.html
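    ///
    /// # Examples
    ///
    /// A minimal sketch, mirroring the `scope()` example but with the FIFO variant,
    /// so tasks spawned from the same thread start in spawn order:
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// let counter = AtomicUsize::new(0);
    /// pool.scope_fifo(|s| {
    ///     for _ in 0..4 {
    ///         s.spawn_fifo(|_| { counter.fetch_add(1, Ordering::Relaxed); });
    ///     }
    /// });
    /// assert_eq!(counter.into_inner(), 4);
    /// ```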
    pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
        R: Send,
    {
        self.install(|| scope_fifo(op))
    }

    /// Creates a scope that spawns work into this thread-pool.
    ///
    /// See also: [the `in_place_scope()` function][in_place_scope].
    ///
    /// [in_place_scope]: fn.in_place_scope.html
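    ///
    /// # Examples
    ///
    /// A minimal sketch: unlike `scope()`, the closure itself runs on the calling
    /// thread, while `spawn()`ed work runs in the pool.
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// let mut results = Vec::new();
    /// pool.in_place_scope(|s| {
    ///     // Runs on the calling thread; `results` can be borrowed mutably here.
    ///     results.push(pool.current_thread_index()); // `None`: still outside the pool
    ///     s.spawn(|_| {
    ///         // Runs on one of the pool's worker threads.
    ///     });
    /// });
    /// assert_eq!(results, vec![None]);
    /// ```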
    pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&Scope<'scope>) -> R,
    {
        do_in_place_scope(Some(&self.registry), op)
    }

    /// Creates a scope that spawns work into this thread-pool in FIFO order.
    ///
    /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
    ///
    /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
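    ///
    /// # Examples
    ///
    /// A minimal sketch, analogous to `in_place_scope()` but with FIFO-ordered spawns:
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// pool.in_place_scope_fifo(|s| {
    ///     s.spawn_fifo(|_| println!("runs in the pool"));
    ///     s.spawn_fifo(|_| println!("queued after the first spawn"));
    /// });
    /// // Both tasks have completed by the time the scope returns.
    /// ```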
    pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&ScopeFifo<'scope>) -> R,
    {
        do_in_place_scope_fifo(Some(&self.registry), op)
    }

    /// Spawns an asynchronous task in this thread-pool. This task will
    /// run in the implicit, global scope, which means that it may outlast
    /// the current stack frame -- therefore, it cannot capture any references
    /// to the stack (you will likely need a `move` closure).
    ///
    /// See also: [the `spawn()` function defined on scopes][spawn].
    ///
    /// [spawn]: struct.Scope.html#method.spawn
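    ///
    /// # Examples
    ///
    /// A minimal sketch: the task is fire-and-forget, so a channel is used here
    /// just to wait for it to finish.
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// use std::sync::mpsc::channel;
    ///
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    /// let (tx, rx) = channel();
    /// pool.spawn(move || {
    ///     // `move` is needed: the task may outlive the current stack frame.
    ///     tx.send(40 + 2).unwrap();
    /// });
    /// assert_eq!(rx.recv().unwrap(), 42);
    /// ```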
    pub fn spawn<OP>(&self, op: OP)
    where
        OP: FnOnce() + Send + 'static,
    {
        // We assert that `self.registry` has not terminated.
        unsafe { spawn::spawn_in(op, &self.registry) }
    }

    /// Spawns an asynchronous task in this thread-pool. This task will
    /// run in the implicit, global scope, which means that it may outlast
    /// the current stack frame -- therefore, it cannot capture any references
    /// to the stack (you will likely need a `move` closure).
    ///
    /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
    ///
    /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
    pub fn spawn_fifo<OP>(&self, op: OP)
    where
        OP: FnOnce() + Send + 'static,
    {
        // We assert that `self.registry` has not terminated.
        unsafe { spawn::spawn_fifo_in(op, &self.registry) }
    }

    /// Spawns an asynchronous task on every thread in this thread-pool. This task
    /// will run in the implicit, global scope, which means that it may outlast the
    /// current stack frame -- therefore, it cannot capture any references to the
    /// stack (you will likely need a `move` closure).
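    ///
    /// # Examples
    ///
    /// A minimal sketch: one copy of the task runs on each of the pool's threads,
    /// and a barrier is used here just to wait until all copies have started.
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// use std::sync::{Arc, Barrier};
    ///
    /// let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    /// // One slot for each of the four pool threads, plus one for this thread.
    /// let barrier = Arc::new(Barrier::new(5));
    /// let b = Arc::clone(&barrier);
    /// pool.spawn_broadcast(move |_ctx| {
    ///     // Runs once on every thread in the pool.
    ///     b.wait();
    /// });
    /// // Returns only after all four broadcast copies have reached the barrier.
    /// barrier.wait();
    /// ```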
    pub fn spawn_broadcast<OP>(&self, op: OP)
    where
        OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
    {
        // We assert that `self.registry` has not terminated.
        unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
    }

    /// Cooperatively yields execution to Rayon.
    ///
    /// This is similar to the general [`yield_now()`], but only if the current
    /// thread is part of *this* thread pool.
    ///
    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
    /// nothing was available, or `None` if the current thread is not part of this pool.
    pub fn yield_now(&self) -> Option<Yield> {
        let curr = self.registry.current_thread()?;
        Some(curr.yield_now())
    }

    /// Cooperatively yields execution to local Rayon work.
    ///
    /// This is similar to the general [`yield_local()`], but only if the current
    /// thread is part of *this* thread pool.
    ///
    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
    /// nothing was available, or `None` if the current thread is not part of this pool.
    pub fn yield_local(&self) -> Option<Yield> {
        let curr = self.registry.current_thread()?;
        Some(curr.yield_local())
    }

    pub(crate) fn wait_until_stopped(self) {
        let registry = self.registry.clone();
        drop(self);
        registry.wait_until_stopped();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        self.registry.terminate();
    }
}

impl fmt::Debug for ThreadPool {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("ThreadPool")
            .field("num_threads", &self.current_num_threads())
            .field("id", &self.registry.id())
            .finish()
    }
}

/// If called from a Rayon worker thread, returns the index of that
/// thread within its current pool; if not called from a Rayon thread,
/// returns `None`.
///
/// The index for a given thread will not change over the thread's
/// lifetime. However, multiple threads may share the same index if
/// they are in distinct thread-pools.
///
/// See also: [the `ThreadPool::current_thread_index()` method][m].
///
/// [m]: struct.ThreadPool.html#method.current_thread_index
///
/// # Future compatibility note
///
/// Currently, every thread-pool (including the global
/// thread-pool) has a fixed number of threads, but this may
/// change in future Rayon versions (see [the `num_threads()` method
/// for details][snt]). In that case, the index for a
/// thread would not change during its lifetime, but thread
/// indices may wind up being reused if threads are terminated and
/// restarted.
///
/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
#[inline]
pub fn current_thread_index() -> Option<usize> {
    unsafe {
        let curr = WorkerThread::current().as_ref()?;
        Some(curr.index())
    }
}


/// If called from a Rayon worker thread, indicates whether that
/// thread's local deque still has pending tasks. Otherwise, returns
/// `None`. For more information, see [the
/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
///
/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
#[inline]
pub fn current_thread_has_pending_tasks() -> Option<bool> {
    unsafe {
        let curr = WorkerThread::current().as_ref()?;
        Some(!curr.local_deque_is_empty())
    }
}

/// Cooperatively yields execution to Rayon.
///
/// If the current thread is part of a rayon thread pool, this looks for a
/// single unit of pending work in the pool, then executes it. Completion of
/// that work might include nested work or further work stealing.
///
/// This is similar to [`std::thread::yield_now()`], but does not literally make
/// that call. If you are implementing a polling loop, you may want to also
/// yield to the OS scheduler yourself if no Rayon work was found.
///
/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
/// nothing was available, or `None` if this thread is not part of any pool at all.
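///
/// # Examples
///
/// A minimal sketch of the polling-loop pattern described above (the `wait_for`
/// helper is illustrative, not part of this crate):
///
/// ```rust
/// # use rayon_core as rayon;
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// // Spin until `flag` is set, helping Rayon make progress while we wait.
/// fn wait_for(flag: &AtomicBool) {
///     while !flag.load(Ordering::Acquire) {
///         match rayon::yield_now() {
///             // Executed some Rayon work; poll again right away.
///             Some(rayon::Yield::Executed) => {}
///             // Nothing available (or not on a pool thread): yield to the OS instead.
///             Some(rayon::Yield::Idle) | None => std::thread::yield_now(),
///         }
///     }
/// }
/// # wait_for(&AtomicBool::new(true));
/// ```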
pub fn yield_now() -> Option<Yield> {
    unsafe {
        let thread = WorkerThread::current().as_ref()?;
        Some(thread.yield_now())
    }
}

/// Cooperatively yields execution to local Rayon work.
///
/// If the current thread is part of a rayon thread pool, this looks for a
/// single unit of pending work in this thread's queue, then executes it.
/// Completion of that work might include nested work or further work stealing.
///
/// This is similar to [`yield_now()`], but does not steal from other threads.
///
/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
/// nothing was available, or `None` if this thread is not part of any pool at all.
pub fn yield_local() -> Option<Yield> {
    unsafe {
        let thread = WorkerThread::current().as_ref()?;
        Some(thread.yield_local())
    }
}

/// Result of [`yield_now()`] or [`yield_local()`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Yield {
    /// Work was found and executed.
    Executed,
    /// No available work was found.
    Idle,
}