]>
Commit | Line | Data |
---|---|---|
2c00a5a8 XL |
1 | //! Contains support for user-managed thread pools, represented by the |
2 | //! the [`ThreadPool`] type (see that struct for details). | |
3 | //! | |
4 | //! [`ThreadPool`]: struct.ThreadPool.html | |
5 | ||
6 | #[allow(deprecated)] | |
7 | use Configuration; | |
8 | use {ThreadPoolBuilder, ThreadPoolBuildError}; | |
9 | use join; | |
10 | use {scope, Scope}; | |
11 | use spawn; | |
12 | use std::sync::Arc; | |
13 | use std::error::Error; | |
14 | use std::fmt; | |
15 | use registry::{Registry, WorkerThread}; | |
16 | ||
17 | mod internal; | |
18 | mod test; | |
19 | ||
20 | /// Represents a user created [thread-pool]. | |
21 | /// | |
22 | /// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads | |
23 | /// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then | |
24 | /// execute functions explicitly within this [`ThreadPool`] using | |
25 | /// [`ThreadPool::install()`]. By contrast, top level rayon functions | |
26 | /// (like `join()`) will execute implicitly within the current thread-pool. | |
27 | /// | |
28 | /// | |
29 | /// ## Creating a ThreadPool | |
30 | /// | |
31 | /// ```rust | |
32 | /// # use rayon_core as rayon; | |
33 | /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); | |
34 | /// ``` | |
35 | /// | |
36 | /// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s | |
37 | /// threads. In addition, any other rayon operations called inside of `install()` will also | |
38 | /// execute in the context of the `ThreadPool`. | |
39 | /// | |
40 | /// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate, | |
41 | /// they will complete executing any remaining work that you have spawned, and automatically | |
42 | /// terminate. | |
43 | /// | |
44 | /// | |
45 | /// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool | |
46 | /// [`ThreadPool`]: struct.ThreadPool.html | |
47 | /// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new | |
48 | /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html | |
49 | /// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build | |
50 | /// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install | |
51 | pub struct ThreadPool { | |
52 | registry: Arc<Registry>, | |
53 | } | |
54 | ||
55 | pub fn build(builder: ThreadPoolBuilder) -> Result<ThreadPool, ThreadPoolBuildError> { | |
56 | let registry = try!(Registry::new(builder)); | |
57 | Ok(ThreadPool { registry: registry }) | |
58 | } | |
59 | ||
60 | impl ThreadPool { | |
61 | #[deprecated(note = "Use `ThreadPoolBuilder::build`")] | |
62 | #[allow(deprecated)] | |
63 | /// Deprecated in favor of `ThreadPoolBuilder::build`. | |
64 | pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<Error>> { | |
65 | build(configuration.into_builder()).map_err(|e| e.into()) | |
66 | } | |
67 | ||
68 | /// Returns a handle to the global thread pool. This is the pool | |
69 | /// that Rayon will use by default when you perform a `join()` or | |
70 | /// `scope()` operation, if no other thread-pool is installed. If | |
71 | /// no global thread-pool has yet been started when this function | |
72 | /// is called, then the global thread-pool will be created (with | |
73 | /// the default configuration). If you wish to configure the | |
74 | /// global thread-pool differently, then you can use [the | |
75 | /// `rayon::initialize()` function][f] to do so. | |
76 | /// | |
77 | /// [f]: fn.initialize.html | |
78 | #[cfg(rayon_unstable)] | |
79 | pub fn global() -> &'static Arc<ThreadPool> { | |
80 | lazy_static! { | |
81 | static ref DEFAULT_THREAD_POOL: Arc<ThreadPool> = | |
82 | Arc::new(ThreadPool { registry: Registry::global() }); | |
83 | } | |
84 | ||
85 | &DEFAULT_THREAD_POOL | |
86 | } | |
87 | ||
88 | /// Executes `op` within the threadpool. Any attempts to use | |
89 | /// `join`, `scope`, or parallel iterators will then operate | |
90 | /// within that threadpool. | |
91 | /// | |
92 | /// # Warning: thread-local data | |
93 | /// | |
94 | /// Because `op` is executing within the Rayon thread-pool, | |
95 | /// thread-local data from the current thread will not be | |
96 | /// accessible. | |
97 | /// | |
98 | /// # Panics | |
99 | /// | |
100 | /// If `op` should panic, that panic will be propagated. | |
101 | /// | |
102 | /// ## Using `install()` | |
103 | /// | |
104 | /// ```rust | |
105 | /// # use rayon_core as rayon; | |
106 | /// fn main() { | |
107 | /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap(); | |
108 | /// let n = pool.install(|| fib(20)); | |
109 | /// println!("{}", n); | |
110 | /// } | |
111 | /// | |
112 | /// fn fib(n: usize) -> usize { | |
113 | /// if n == 0 || n == 1 { | |
114 | /// return n; | |
115 | /// } | |
116 | /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool` | |
117 | /// return a + b; | |
118 | /// } | |
119 | /// ``` | |
120 | pub fn install<OP, R>(&self, op: OP) -> R | |
121 | where OP: FnOnce() -> R + Send, | |
122 | R: Send | |
123 | { | |
124 | self.registry.in_worker(|_, _| op()) | |
125 | } | |
126 | ||
127 | /// Returns the (current) number of threads in the thread pool. | |
128 | /// | |
129 | /// # Future compatibility note | |
130 | /// | |
131 | /// Note that unless this thread-pool was created with a | |
132 | /// [`ThreadPoolBuilder`] that specifies the number of threads, | |
133 | /// then this number may vary over time in future versions (see [the | |
134 | /// `num_threads()` method for details][snt]). | |
135 | /// | |
136 | /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads | |
137 | /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html | |
138 | #[inline] | |
139 | pub fn current_num_threads(&self) -> usize { | |
140 | self.registry.num_threads() | |
141 | } | |
142 | ||
143 | /// If called from a Rayon worker thread in this thread-pool, | |
144 | /// returns the index of that thread; if not called from a Rayon | |
145 | /// thread, or called from a Rayon thread that belongs to a | |
146 | /// different thread-pool, returns `None`. | |
147 | /// | |
148 | /// The index for a given thread will not change over the thread's | |
149 | /// lifetime. However, multiple threads may share the same index if | |
150 | /// they are in distinct thread-pools. | |
151 | /// | |
152 | /// # Future compatibility note | |
153 | /// | |
154 | /// Currently, every thread-pool (including the global | |
155 | /// thread-pool) has a fixed number of threads, but this may | |
156 | /// change in future Rayon versions (see [the `num_threads()` method | |
157 | /// for details][snt]). In that case, the index for a | |
158 | /// thread would not change during its lifetime, but thread | |
159 | /// indices may wind up being reused if threads are terminated and | |
160 | /// restarted. | |
161 | /// | |
162 | /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads | |
163 | #[inline] | |
164 | pub fn current_thread_index(&self) -> Option<usize> { | |
165 | unsafe { | |
166 | let curr = WorkerThread::current(); | |
167 | if curr.is_null() { | |
168 | None | |
169 | } else if (*curr).registry().id() != self.registry.id() { | |
170 | None | |
171 | } else { | |
172 | Some((*curr).index()) | |
173 | } | |
174 | } | |
175 | } | |
176 | ||
177 | /// Returns true if the current worker thread currently has "local | |
178 | /// tasks" pending. This can be useful as part of a heuristic for | |
179 | /// deciding whether to spawn a new task or execute code on the | |
180 | /// current thread, particularly in breadth-first | |
181 | /// schedulers. However, keep in mind that this is an inherently | |
182 | /// racy check, as other worker threads may be actively "stealing" | |
183 | /// tasks from our local deque. | |
184 | /// | |
185 | /// **Background:** Rayon's uses a [work-stealing] scheduler. The | |
186 | /// key idea is that each thread has its own [deque] of | |
187 | /// tasks. Whenever a new task is spawned -- whether through | |
188 | /// `join()`, `Scope::spawn()`, or some other means -- that new | |
189 | /// task is pushed onto the thread's *local* deque. Worker threads | |
190 | /// have a preference for executing their own tasks; if however | |
191 | /// they run out of tasks, they will go try to "steal" tasks from | |
192 | /// other threads. This function therefore has an inherent race | |
193 | /// with other active worker threads, which may be removing items | |
194 | /// from the local deque. | |
195 | /// | |
196 | /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing | |
197 | /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue | |
198 | #[inline] | |
199 | pub fn current_thread_has_pending_tasks(&self) -> Option<bool> { | |
200 | unsafe { | |
201 | let curr = WorkerThread::current(); | |
202 | if curr.is_null() { | |
203 | None | |
204 | } else if (*curr).registry().id() != self.registry.id() { | |
205 | None | |
206 | } else { | |
207 | Some(!(*curr).local_deque_is_empty()) | |
208 | } | |
209 | } | |
210 | } | |
211 | ||
212 | /// Execute `oper_a` and `oper_b` in the thread-pool and return | |
213 | /// the results. Equivalent to `self.install(|| join(oper_a, | |
214 | /// oper_b))`. | |
215 | pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB) | |
216 | where A: FnOnce() -> RA + Send, | |
217 | B: FnOnce() -> RB + Send, | |
218 | RA: Send, | |
219 | RB: Send | |
220 | { | |
221 | self.install(|| join(oper_a, oper_b)) | |
222 | } | |
223 | ||
224 | /// Creates a scope that executes within this thread-pool. | |
225 | /// Equivalent to `self.install(|| scope(...))`. | |
226 | /// | |
227 | /// See also: [the `scope()` function][scope]. | |
228 | /// | |
229 | /// [scope]: fn.scope.html | |
230 | pub fn scope<'scope, OP, R>(&self, op: OP) -> R | |
231 | where OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, R: Send | |
232 | { | |
233 | self.install(|| scope(op)) | |
234 | } | |
235 | ||
236 | /// Spawns an asynchronous task in this thread-pool. This task will | |
237 | /// run in the implicit, global scope, which means that it may outlast | |
238 | /// the current stack frame -- therefore, it cannot capture any references | |
239 | /// onto the stack (you will likely need a `move` closure). | |
240 | /// | |
241 | /// See also: [the `spawn()` function defined on scopes][spawn]. | |
242 | /// | |
243 | /// [spawn]: struct.Scope.html#method.spawn | |
244 | pub fn spawn<OP>(&self, op: OP) | |
245 | where OP: FnOnce() + Send + 'static | |
246 | { | |
247 | // We assert that `self.registry` has not terminated. | |
248 | unsafe { spawn::spawn_in(op, &self.registry) } | |
249 | } | |
250 | } | |
251 | ||
252 | impl Drop for ThreadPool { | |
253 | fn drop(&mut self) { | |
254 | self.registry.terminate(); | |
255 | } | |
256 | } | |
257 | ||
258 | impl fmt::Debug for ThreadPool { | |
259 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | |
260 | fmt.debug_struct("ThreadPool") | |
261 | .field("num_threads", &self.current_num_threads()) | |
262 | .field("id", &self.registry.id()) | |
263 | .finish() | |
264 | } | |
265 | } | |
266 | ||
267 | /// If called from a Rayon worker thread, returns the index of that | |
268 | /// thread within its current pool; if not called from a Rayon thread, | |
269 | /// returns `None`. | |
270 | /// | |
271 | /// The index for a given thread will not change over the thread's | |
272 | /// lifetime. However, multiple threads may share the same index if | |
273 | /// they are in distinct thread-pools. | |
274 | /// | |
275 | /// See also: [the `ThreadPool::current_thread_index()` method]. | |
276 | /// | |
277 | /// [m]: struct.ThreadPool.html#method.current_thread_index | |
278 | /// | |
279 | /// # Future compatibility note | |
280 | /// | |
281 | /// Currently, every thread-pool (including the global | |
282 | /// thread-pool) has a fixed number of threads, but this may | |
283 | /// change in future Rayon versions (see [the `num_threads()` method | |
284 | /// for details][snt]). In that case, the index for a | |
285 | /// thread would not change during its lifetime, but thread | |
286 | /// indices may wind up being reused if threads are terminated and | |
287 | /// restarted. | |
288 | /// | |
289 | /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads | |
290 | #[inline] | |
291 | pub fn current_thread_index() -> Option<usize> { | |
292 | unsafe { | |
293 | let curr = WorkerThread::current(); | |
294 | if curr.is_null() { | |
295 | None | |
296 | } else { | |
297 | Some((*curr).index()) | |
298 | } | |
299 | } | |
300 | } | |
301 | ||
302 | /// If called from a Rayon worker thread, indicates whether that | |
303 | /// thread's local deque still has pending tasks. Otherwise, returns | |
304 | /// `None`. For more information, see [the | |
305 | /// `ThreadPool::current_thread_has_pending_tasks()` method][m]. | |
306 | /// | |
307 | /// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks | |
308 | #[inline] | |
309 | pub fn current_thread_has_pending_tasks() -> Option<bool> { | |
310 | unsafe { | |
311 | let curr = WorkerThread::current(); | |
312 | if curr.is_null() { | |
313 | None | |
314 | } else { | |
315 | Some(!(*curr).local_deque_is_empty()) | |
316 | } | |
317 | } | |
318 | } |