]> git.proxmox.com Git - rustc.git/blame - vendor/rayon-core/src/thread_pool/mod.rs
New upstream version 1.32.0~beta.2+dfsg1
[rustc.git] / vendor / rayon-core / src / thread_pool / mod.rs
CommitLineData
2c00a5a8
XL
1//! Contains support for user-managed thread pools, represented by the
2//! [`ThreadPool`] type (see that struct for details).
3//!
4//! [`ThreadPool`]: struct.ThreadPool.html
5
6#[allow(deprecated)]
7use Configuration;
8use {ThreadPoolBuilder, ThreadPoolBuildError};
9use join;
10use {scope, Scope};
11use spawn;
12use std::sync::Arc;
13use std::error::Error;
14use std::fmt;
15use registry::{Registry, WorkerThread};
16
17mod internal;
18mod test;
19
20/// Represents a user-created [thread-pool].
21///
22/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
23/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
24/// execute functions explicitly within this [`ThreadPool`] using
25/// [`ThreadPool::install()`]. By contrast, top-level rayon functions
26/// (like `join()`) will execute implicitly within the current thread-pool.
27///
28///
29/// ## Creating a ThreadPool
30///
31/// ```rust
32/// # use rayon_core as rayon;
33/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
34/// ```
35///
36/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
37/// threads. In addition, any other rayon operations called inside of `install()` will also
38/// execute in the context of the `ThreadPool`.
39///
40/// Dropping the `ThreadPool` signals the threads it manages to terminate: they will
41/// finish executing any remaining work that you have spawned and then terminate
42/// automatically.
43///
44///
45/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
46/// [`ThreadPool`]: struct.ThreadPool.html
47/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
48/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
49/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
50/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
51pub struct ThreadPool {
52 registry: Arc<Registry>, // the registry this pool's methods delegate to
53}
54
55pub fn build(builder: ThreadPoolBuilder) -> Result<ThreadPool, ThreadPoolBuildError> {
56 let registry = try!(Registry::new(builder));
57 Ok(ThreadPool { registry: registry })
58}
59
60impl ThreadPool {
61 #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
62 #[allow(deprecated)]
63 /// Deprecated in favor of `ThreadPoolBuilder::build`.
64 pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<Error>> {
65 build(configuration.into_builder()).map_err(|e| e.into())
66 }
67
68 /// Returns a handle to the global thread pool. This is the pool
69 /// that Rayon will use by default when you perform a `join()` or
70 /// `scope()` operation, if no other thread-pool is installed. If
71 /// no global thread-pool has yet been started when this function
72 /// is called, then the global thread-pool will be created (with
73 /// the default configuration). If you wish to configure the
74 /// global thread-pool differently, then you can use [the
75 /// `rayon::initialize()` function][f] to do so.
76 ///
77 /// [f]: fn.initialize.html
78 #[cfg(rayon_unstable)]
79 pub fn global() -> &'static Arc<ThreadPool> {
80 lazy_static! {
81 static ref DEFAULT_THREAD_POOL: Arc<ThreadPool> =
82 Arc::new(ThreadPool { registry: Registry::global() });
83 }
84
85 &DEFAULT_THREAD_POOL
86 }
87
88 /// Executes `op` within the threadpool. Any attempts to use
89 /// `join`, `scope`, or parallel iterators will then operate
90 /// within that threadpool.
91 ///
92 /// # Warning: thread-local data
93 ///
94 /// Because `op` is executing within the Rayon thread-pool,
95 /// thread-local data from the current thread will not be
96 /// accessible.
97 ///
98 /// # Panics
99 ///
100 /// If `op` should panic, that panic will be propagated.
101 ///
102 /// ## Using `install()`
103 ///
104 /// ```rust
105 /// # use rayon_core as rayon;
106 /// fn main() {
107 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
108 /// let n = pool.install(|| fib(20));
109 /// println!("{}", n);
110 /// }
111 ///
112 /// fn fib(n: usize) -> usize {
113 /// if n == 0 || n == 1 {
114 /// return n;
115 /// }
116 /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
117 /// return a + b;
118 /// }
119 /// ```
120 pub fn install<OP, R>(&self, op: OP) -> R
121 where OP: FnOnce() -> R + Send,
122 R: Send
123 {
124 self.registry.in_worker(|_, _| op())
125 }
126
127 /// Returns the (current) number of threads in the thread pool.
128 ///
129 /// # Future compatibility note
130 ///
131 /// Note that unless this thread-pool was created with a
132 /// [`ThreadPoolBuilder`] that specifies the number of threads,
133 /// then this number may vary over time in future versions (see [the
134 /// `num_threads()` method for details][snt]).
135 ///
136 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
137 /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
138 #[inline]
139 pub fn current_num_threads(&self) -> usize {
140 self.registry.num_threads()
141 }
142
143 /// If called from a Rayon worker thread in this thread-pool,
144 /// returns the index of that thread; if not called from a Rayon
145 /// thread, or called from a Rayon thread that belongs to a
146 /// different thread-pool, returns `None`.
147 ///
148 /// The index for a given thread will not change over the thread's
149 /// lifetime. However, multiple threads may share the same index if
150 /// they are in distinct thread-pools.
151 ///
152 /// # Future compatibility note
153 ///
154 /// Currently, every thread-pool (including the global
155 /// thread-pool) has a fixed number of threads, but this may
156 /// change in future Rayon versions (see [the `num_threads()` method
157 /// for details][snt]). In that case, the index for a
158 /// thread would not change during its lifetime, but thread
159 /// indices may wind up being reused if threads are terminated and
160 /// restarted.
161 ///
162 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
163 #[inline]
164 pub fn current_thread_index(&self) -> Option<usize> {
165 unsafe {
166 let curr = WorkerThread::current();
167 if curr.is_null() {
168 None
169 } else if (*curr).registry().id() != self.registry.id() {
170 None
171 } else {
172 Some((*curr).index())
173 }
174 }
175 }
176
177 /// Returns true if the current worker thread currently has "local
178 /// tasks" pending. This can be useful as part of a heuristic for
179 /// deciding whether to spawn a new task or execute code on the
180 /// current thread, particularly in breadth-first
181 /// schedulers. However, keep in mind that this is an inherently
182 /// racy check, as other worker threads may be actively "stealing"
183 /// tasks from our local deque.
184 ///
185 /// **Background:** Rayon's uses a [work-stealing] scheduler. The
186 /// key idea is that each thread has its own [deque] of
187 /// tasks. Whenever a new task is spawned -- whether through
188 /// `join()`, `Scope::spawn()`, or some other means -- that new
189 /// task is pushed onto the thread's *local* deque. Worker threads
190 /// have a preference for executing their own tasks; if however
191 /// they run out of tasks, they will go try to "steal" tasks from
192 /// other threads. This function therefore has an inherent race
193 /// with other active worker threads, which may be removing items
194 /// from the local deque.
195 ///
196 /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
197 /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
198 #[inline]
199 pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
200 unsafe {
201 let curr = WorkerThread::current();
202 if curr.is_null() {
203 None
204 } else if (*curr).registry().id() != self.registry.id() {
205 None
206 } else {
207 Some(!(*curr).local_deque_is_empty())
208 }
209 }
210 }
211
212 /// Execute `oper_a` and `oper_b` in the thread-pool and return
213 /// the results. Equivalent to `self.install(|| join(oper_a,
214 /// oper_b))`.
215 pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
216 where A: FnOnce() -> RA + Send,
217 B: FnOnce() -> RB + Send,
218 RA: Send,
219 RB: Send
220 {
221 self.install(|| join(oper_a, oper_b))
222 }
223
224 /// Creates a scope that executes within this thread-pool.
225 /// Equivalent to `self.install(|| scope(...))`.
226 ///
227 /// See also: [the `scope()` function][scope].
228 ///
229 /// [scope]: fn.scope.html
230 pub fn scope<'scope, OP, R>(&self, op: OP) -> R
231 where OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, R: Send
232 {
233 self.install(|| scope(op))
234 }
235
236 /// Spawns an asynchronous task in this thread-pool. This task will
237 /// run in the implicit, global scope, which means that it may outlast
238 /// the current stack frame -- therefore, it cannot capture any references
239 /// onto the stack (you will likely need a `move` closure).
240 ///
241 /// See also: [the `spawn()` function defined on scopes][spawn].
242 ///
243 /// [spawn]: struct.Scope.html#method.spawn
244 pub fn spawn<OP>(&self, op: OP)
245 where OP: FnOnce() + Send + 'static
246 {
247 // We assert that `self.registry` has not terminated.
248 unsafe { spawn::spawn_in(op, &self.registry) }
249 }
250}
251
252impl Drop for ThreadPool {
253 fn drop(&mut self) {
254 self.registry.terminate();
255 }
256}
257
258impl fmt::Debug for ThreadPool {
259 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
260 fmt.debug_struct("ThreadPool")
261 .field("num_threads", &self.current_num_threads())
262 .field("id", &self.registry.id())
263 .finish()
264 }
265}
266
267/// If called from a Rayon worker thread, returns the index of that
268/// thread within its current pool; if not called from a Rayon thread,
269/// returns `None`.
270///
271/// The index for a given thread will not change over the thread's
272/// lifetime. However, multiple threads may share the same index if
273/// they are in distinct thread-pools.
274///
275/// See also: [the `ThreadPool::current_thread_index()` method].
276///
277/// [m]: struct.ThreadPool.html#method.current_thread_index
278///
279/// # Future compatibility note
280///
281/// Currently, every thread-pool (including the global
282/// thread-pool) has a fixed number of threads, but this may
283/// change in future Rayon versions (see [the `num_threads()` method
284/// for details][snt]). In that case, the index for a
285/// thread would not change during its lifetime, but thread
286/// indices may wind up being reused if threads are terminated and
287/// restarted.
288///
289/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
290#[inline]
291pub fn current_thread_index() -> Option<usize> {
292 unsafe {
293 let curr = WorkerThread::current();
294 if curr.is_null() {
295 None
296 } else {
297 Some((*curr).index())
298 }
299 }
300}
301
302/// If called from a Rayon worker thread, indicates whether that
303/// thread's local deque still has pending tasks. Otherwise, returns
304/// `None`. For more information, see [the
305/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
306///
307/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
308#[inline]
309pub fn current_thread_has_pending_tasks() -> Option<bool> {
310 unsafe {
311 let curr = WorkerThread::current();
312 if curr.is_null() {
313 None
314 } else {
315 Some(!(*curr).local_deque_is_empty())
316 }
317 }
318}