]>
Commit | Line | Data |
---|---|---|
94b46f34 XL |
1 | //! |
2 | //! [Under construction](https://github.com/rayon-rs/rayon/issues/231) | |
3 | //! | |
4 | //! ## Restricting multiple versions | |
5 | //! | |
6 | //! In order to ensure proper coordination between threadpools, and especially | |
7 | //! to make sure there's only one global threadpool, `rayon-core` is actively | |
8 | //! restricted from building multiple versions of itself into a single target. | |
9 | //! You may see a build error like this in violation: | |
10 | //! | |
11 | //! ```text | |
12 | //! error: native library `rayon-core` is being linked to by more | |
13 | //! than one package, and can only be linked to by one package | |
14 | //! ``` | |
15 | //! | |
16 | //! While we strive to keep `rayon-core` semver-compatible, it's still | |
17 | //! possible to arrive at this situation if different crates have overly | |
18 | //! restrictive tilde or inequality requirements for `rayon-core`. The | |
19 | //! conflicting requirements will need to be resolved before the build will | |
20 | //! succeed. | |
21 | ||
6a06907d | 22 | #![doc(html_root_url = "https://docs.rs/rayon-core/1.7")] |
94b46f34 XL |
23 | |
24 | use std::any::Any; | |
25 | use std::env; | |
94b46f34 | 26 | use std::error::Error; |
532ac7d7 XL |
27 | use std::fmt; |
28 | use std::io; | |
94b46f34 XL |
29 | use std::marker::PhantomData; |
30 | use std::str::FromStr; | |
94b46f34 | 31 | |
94b46f34 XL |
32 | #[macro_use] |
33 | mod log; | |
e74abb32 XL |
34 | #[macro_use] |
35 | mod private; | |
94b46f34 | 36 | |
94b46f34 | 37 | mod job; |
532ac7d7 XL |
38 | mod join; |
39 | mod latch; | |
94b46f34 XL |
40 | mod registry; |
41 | mod scope; | |
42 | mod sleep; | |
43 | mod spawn; | |
94b46f34 XL |
44 | mod thread_pool; |
45 | mod unwind; | |
46 | mod util; | |
e74abb32 | 47 | mod worker_local; |
94b46f34 | 48 | |
532ac7d7 XL |
49 | mod compile_fail; |
50 | mod test; | |
51 | ||
94b46f34 XL |
52 | pub mod tlv; |
53 | ||
6a06907d XL |
54 | pub use self::join::{join, join_context}; |
55 | pub use self::registry::ThreadBuilder; | |
56 | pub use self::registry::{mark_blocked, mark_unblocked, Registry}; | |
57 | pub use self::scope::{scope, Scope}; | |
58 | pub use self::scope::{scope_fifo, ScopeFifo}; | |
59 | pub use self::spawn::{spawn, spawn_fifo}; | |
60 | pub use self::thread_pool::current_thread_has_pending_tasks; | |
61 | pub use self::thread_pool::current_thread_index; | |
62 | pub use self::thread_pool::ThreadPool; | |
e74abb32 XL |
63 | pub use worker_local::WorkerLocal; |
64 | ||
6a06907d | 65 | use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; |
532ac7d7 | 66 | |
94b46f34 XL |
67 | /// Returns the number of threads in the current registry. If this |
68 | /// code is executing within a Rayon thread-pool, then this will be | |
69 | /// the number of threads for the thread-pool of the current | |
70 | /// thread. Otherwise, it will be the number of threads for the global | |
71 | /// thread-pool. | |
72 | /// | |
73 | /// This can be useful when trying to judge how many times to split | |
74 | /// parallel work (the parallel iterator traits use this value | |
75 | /// internally for this purpose). | |
76 | /// | |
77 | /// # Future compatibility note | |
78 | /// | |
79 | /// Note that unless this thread-pool was created with a | |
80 | /// builder that specifies the number of threads, then this | |
81 | /// number may vary over time in future versions (see [the | |
82 | /// `num_threads()` method for details][snt]). | |
83 | /// | |
84 | /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads | |
85 | pub fn current_num_threads() -> usize { | |
6a06907d | 86 | crate::registry::Registry::current_num_threads() |
94b46f34 XL |
87 | } |
88 | ||
89 | /// Error when initializing a thread pool. | |
90 | #[derive(Debug)] | |
91 | pub struct ThreadPoolBuildError { | |
92 | kind: ErrorKind, | |
93 | } | |
94 | ||
95 | #[derive(Debug)] | |
96 | enum ErrorKind { | |
97 | GlobalPoolAlreadyInitialized, | |
98 | IOError(io::Error), | |
99 | } | |
100 | ||
101 | /// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool. | |
102 | /// ## Creating a ThreadPool | |
103 | /// The following creates a thread pool with 22 threads. | |
104 | /// | |
105 | /// ```rust | |
106 | /// # use rayon_core as rayon; | |
107 | /// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap(); | |
108 | /// ``` | |
109 | /// | |
110 | /// To instead configure the global thread pool, use [`build_global()`]: | |
111 | /// | |
112 | /// ```rust | |
113 | /// # use rayon_core as rayon; | |
114 | /// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap(); | |
115 | /// ``` | |
116 | /// | |
117 | /// [`ThreadPool`]: struct.ThreadPool.html | |
118 | /// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global | |
e74abb32 | 119 | pub struct ThreadPoolBuilder<S = DefaultSpawn> { |
94b46f34 XL |
120 | /// The number of threads in the rayon thread pool. |
121 | /// If zero will use the RAYON_NUM_THREADS environment variable. | |
122 | /// If RAYON_NUM_THREADS is invalid or zero will use the default. | |
123 | num_threads: usize, | |
124 | ||
125 | /// Custom closure, if any, to handle a panic that we cannot propagate | |
126 | /// anywhere else. | |
127 | panic_handler: Option<Box<PanicHandler>>, | |
128 | ||
129 | /// Closure to compute the name of a thread. | |
e74abb32 | 130 | get_thread_name: Option<Box<dyn FnMut(usize) -> String>>, |
94b46f34 XL |
131 | |
132 | /// The stack size for the created worker threads | |
133 | stack_size: Option<usize>, | |
134 | ||
135 | /// Closure invoked on deadlock. | |
136 | deadlock_handler: Option<Box<DeadlockHandler>>, | |
137 | ||
138 | /// Closure invoked on worker thread start. | |
139 | start_handler: Option<Box<StartHandler>>, | |
140 | ||
141 | /// Closure invoked on worker thread exit. | |
142 | exit_handler: Option<Box<ExitHandler>>, | |
143 | ||
e74abb32 XL |
144 | /// Closure invoked to spawn threads. |
145 | spawn_handler: S, | |
94b46f34 | 146 | |
532ac7d7 XL |
147 | /// Closure invoked when starting computations in a thread. |
148 | acquire_thread_handler: Option<Box<AcquireThreadHandler>>, | |
149 | ||
150 | /// Closure invoked when blocking in a thread. | |
151 | release_thread_handler: Option<Box<ReleaseThreadHandler>>, | |
152 | ||
94b46f34 XL |
153 | /// If false, worker threads will execute spawned jobs in a |
154 | /// "depth-first" fashion. If true, they will do a "breadth-first" | |
155 | /// fashion. Depth-first is the default. | |
156 | breadth_first: bool, | |
157 | } | |
158 | ||
159 | /// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead. | |
160 | /// | |
161 | /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html | |
162 | #[deprecated(note = "Use `ThreadPoolBuilder`")] | |
94b46f34 XL |
163 | pub struct Configuration { |
164 | builder: ThreadPoolBuilder, | |
165 | } | |
166 | ||
167 | /// The type for a panic handling closure. Note that this same closure | |
168 | /// may be invoked multiple times in parallel. | |
e74abb32 | 169 | type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync; |
94b46f34 XL |
170 | |
171 | /// The type for a closure that gets invoked when the Rayon thread pool deadlocks | |
e74abb32 | 172 | type DeadlockHandler = dyn Fn() + Send + Sync; |
94b46f34 XL |
173 | |
174 | /// The type for a closure that gets invoked when a thread starts. The | |
175 | /// closure is passed the index of the thread on which it is invoked. | |
176 | /// Note that this same closure may be invoked multiple times in parallel. | |
e74abb32 | 177 | type StartHandler = dyn Fn(usize) + Send + Sync; |
94b46f34 XL |
178 | |
179 | /// The type for a closure that gets invoked when a thread exits. The | |
180 | /// closure is passed the index of the thread on which is is invoked. | |
181 | /// Note that this same closure may be invoked multiple times in parallel. | |
e74abb32 XL |
182 | type ExitHandler = dyn Fn(usize) + Send + Sync; |
183 | ||
184 | // NB: We can't `#[derive(Default)]` because `S` is left ambiguous. | |
185 | impl Default for ThreadPoolBuilder { | |
186 | fn default() -> Self { | |
187 | ThreadPoolBuilder { | |
188 | num_threads: 0, | |
189 | panic_handler: None, | |
190 | get_thread_name: None, | |
191 | stack_size: None, | |
192 | start_handler: None, | |
193 | exit_handler: None, | |
194 | deadlock_handler: None, | |
195 | acquire_thread_handler: None, | |
196 | release_thread_handler: None, | |
197 | spawn_handler: DefaultSpawn, | |
198 | breadth_first: false, | |
199 | } | |
200 | } | |
201 | } | |
94b46f34 | 202 | |
532ac7d7 XL |
203 | /// The type for a closure that gets invoked before starting computations in a thread. |
204 | /// Note that this same closure may be invoked multiple times in parallel. | |
e74abb32 | 205 | type AcquireThreadHandler = dyn Fn() + Send + Sync; |
532ac7d7 XL |
206 | |
207 | /// The type for a closure that gets invoked before blocking in a thread. | |
208 | /// Note that this same closure may be invoked multiple times in parallel. | |
e74abb32 | 209 | type ReleaseThreadHandler = dyn Fn() + Send + Sync; |
532ac7d7 | 210 | |
94b46f34 XL |
211 | impl ThreadPoolBuilder { |
212 | /// Creates and returns a valid rayon thread pool builder, but does not initialize it. | |
e74abb32 XL |
213 | pub fn new() -> Self { |
214 | Self::default() | |
94b46f34 | 215 | } |
e74abb32 | 216 | } |
94b46f34 | 217 | |
e74abb32 XL |
218 | /// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the |
219 | /// default spawn and those set by [`spawn_handler`](#method.spawn_handler). | |
220 | impl<S> ThreadPoolBuilder<S> | |
221 | where | |
222 | S: ThreadSpawn, | |
223 | { | |
94b46f34 XL |
224 | /// Create a new `ThreadPool` initialized using this configuration. |
225 | pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> { | |
e74abb32 | 226 | ThreadPool::build(self) |
94b46f34 XL |
227 | } |
228 | ||
229 | /// Initializes the global thread pool. This initialization is | |
230 | /// **optional**. If you do not call this function, the thread pool | |
231 | /// will be automatically initialized with the default | |
232 | /// configuration. Calling `build_global` is not recommended, except | |
233 | /// in two scenarios: | |
234 | /// | |
235 | /// - You wish to change the default configuration. | |
236 | /// - You are running a benchmark, in which case initializing may | |
237 | /// yield slightly more consistent results, since the worker threads | |
238 | /// will already be ready to go even in the first iteration. But | |
239 | /// this cost is minimal. | |
240 | /// | |
241 | /// Initialization of the global thread pool happens exactly | |
242 | /// once. Once started, the configuration cannot be | |
243 | /// changed. Therefore, if you call `build_global` a second time, it | |
244 | /// will return an error. An `Ok` result indicates that this | |
245 | /// is the first initialization of the thread pool. | |
246 | pub fn build_global(self) -> Result<(), ThreadPoolBuildError> { | |
e74abb32 | 247 | let registry = registry::init_global_registry(self)?; |
94b46f34 XL |
248 | registry.wait_until_primed(); |
249 | Ok(()) | |
250 | } | |
e74abb32 XL |
251 | } |
252 | ||
253 | impl ThreadPoolBuilder { | |
254 | /// Create a scoped `ThreadPool` initialized using this configuration. | |
255 | /// | |
256 | /// This is a convenience function for building a pool using [`crossbeam::scope`] | |
257 | /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). | |
258 | /// The threads in this pool will start by calling `wrapper`, which should | |
259 | /// do initialization and continue by calling `ThreadBuilder::run()`. | |
260 | /// | |
261 | /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html | |
262 | /// | |
263 | /// # Examples | |
264 | /// | |
265 | /// A scoped pool may be useful in combination with scoped thread-local variables. | |
266 | /// | |
267 | /// ``` | |
e74abb32 XL |
268 | /// # use rayon_core as rayon; |
269 | /// | |
6a06907d | 270 | /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>); |
e74abb32 XL |
271 | /// |
272 | /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { | |
273 | /// let pool_data = vec![1, 2, 3]; | |
274 | /// | |
275 | /// // We haven't assigned any TLS data yet. | |
276 | /// assert!(!POOL_DATA.is_set()); | |
277 | /// | |
278 | /// rayon::ThreadPoolBuilder::new() | |
279 | /// .build_scoped( | |
280 | /// // Borrow `pool_data` in TLS for each thread. | |
281 | /// |thread| POOL_DATA.set(&pool_data, || thread.run()), | |
282 | /// // Do some work that needs the TLS data. | |
283 | /// |pool| pool.install(|| assert!(POOL_DATA.is_set())), | |
284 | /// )?; | |
285 | /// | |
286 | /// // Once we've returned, `pool_data` is no longer borrowed. | |
287 | /// drop(pool_data); | |
288 | /// Ok(()) | |
289 | /// } | |
290 | /// ``` | |
291 | pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError> | |
292 | where | |
293 | W: Fn(ThreadBuilder) + Sync, // expected to call `run()` | |
294 | F: FnOnce(&ThreadPool) -> R, | |
295 | { | |
296 | let result = crossbeam_utils::thread::scope(|scope| { | |
297 | let wrapper = &wrapper; | |
298 | let pool = self | |
299 | .spawn_handler(|thread| { | |
300 | let mut builder = scope.builder(); | |
301 | if let Some(name) = thread.name() { | |
302 | builder = builder.name(name.to_string()); | |
303 | } | |
304 | if let Some(size) = thread.stack_size() { | |
305 | builder = builder.stack_size(size); | |
306 | } | |
307 | builder.spawn(move |_| wrapper(thread))?; | |
308 | Ok(()) | |
309 | }) | |
310 | .build()?; | |
311 | let result = unwind::halt_unwinding(|| with_pool(&pool)); | |
312 | pool.wait_until_stopped(); | |
313 | match result { | |
314 | Ok(result) => Ok(result), | |
315 | Err(err) => unwind::resume_unwinding(err), | |
316 | } | |
317 | }); | |
318 | ||
319 | match result { | |
320 | Ok(result) => result, | |
321 | Err(err) => unwind::resume_unwinding(err), | |
322 | } | |
323 | } | |
324 | } | |
325 | ||
326 | impl<S> ThreadPoolBuilder<S> { | |
327 | /// Set a custom function for spawning threads. | |
328 | /// | |
329 | /// Note that the threads will not exit until after the pool is dropped. It | |
330 | /// is up to the caller to wait for thread termination if that is important | |
331 | /// for any invariants. For instance, threads created in [`crossbeam::scope`] | |
332 | /// will be joined before that scope returns, and this will block indefinitely | |
333 | /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate | |
334 | /// until the entire process exits! | |
335 | /// | |
336 | /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html | |
337 | /// | |
338 | /// # Examples | |
339 | /// | |
340 | /// A minimal spawn handler just needs to call `run()` from an independent thread. | |
341 | /// | |
342 | /// ``` | |
343 | /// # use rayon_core as rayon; | |
344 | /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { | |
345 | /// let pool = rayon::ThreadPoolBuilder::new() | |
346 | /// .spawn_handler(|thread| { | |
347 | /// std::thread::spawn(|| thread.run()); | |
348 | /// Ok(()) | |
349 | /// }) | |
350 | /// .build()?; | |
351 | /// | |
352 | /// pool.install(|| println!("Hello from my custom thread!")); | |
353 | /// Ok(()) | |
354 | /// } | |
355 | /// ``` | |
356 | /// | |
357 | /// The default spawn handler sets the name and stack size if given, and propagates | |
358 | /// any errors from the thread builder. | |
359 | /// | |
360 | /// ``` | |
361 | /// # use rayon_core as rayon; | |
362 | /// fn main() -> Result<(), rayon::ThreadPoolBuildError> { | |
363 | /// let pool = rayon::ThreadPoolBuilder::new() | |
364 | /// .spawn_handler(|thread| { | |
365 | /// let mut b = std::thread::Builder::new(); | |
366 | /// if let Some(name) = thread.name() { | |
367 | /// b = b.name(name.to_owned()); | |
368 | /// } | |
369 | /// if let Some(stack_size) = thread.stack_size() { | |
370 | /// b = b.stack_size(stack_size); | |
371 | /// } | |
372 | /// b.spawn(|| thread.run())?; | |
373 | /// Ok(()) | |
374 | /// }) | |
375 | /// .build()?; | |
376 | /// | |
377 | /// pool.install(|| println!("Hello from my fully custom thread!")); | |
378 | /// Ok(()) | |
379 | /// } | |
380 | /// ``` | |
381 | pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>> | |
382 | where | |
383 | F: FnMut(ThreadBuilder) -> io::Result<()>, | |
384 | { | |
385 | ThreadPoolBuilder { | |
386 | spawn_handler: CustomSpawn::new(spawn), | |
387 | // ..self | |
388 | num_threads: self.num_threads, | |
389 | panic_handler: self.panic_handler, | |
390 | get_thread_name: self.get_thread_name, | |
391 | stack_size: self.stack_size, | |
392 | start_handler: self.start_handler, | |
393 | exit_handler: self.exit_handler, | |
394 | deadlock_handler: self.deadlock_handler, | |
395 | acquire_thread_handler: self.acquire_thread_handler, | |
396 | release_thread_handler: self.release_thread_handler, | |
397 | breadth_first: self.breadth_first, | |
398 | } | |
399 | } | |
400 | ||
401 | /// Returns a reference to the current spawn handler. | |
402 | fn get_spawn_handler(&mut self) -> &mut S { | |
403 | &mut self.spawn_handler | |
404 | } | |
94b46f34 XL |
405 | |
406 | /// Get the number of threads that will be used for the thread | |
407 | /// pool. See `num_threads()` for more information. | |
408 | fn get_num_threads(&self) -> usize { | |
409 | if self.num_threads > 0 { | |
410 | self.num_threads | |
411 | } else { | |
532ac7d7 XL |
412 | match env::var("RAYON_NUM_THREADS") |
413 | .ok() | |
414 | .and_then(|s| usize::from_str(&s).ok()) | |
415 | { | |
94b46f34 XL |
416 | Some(x) if x > 0 => return x, |
417 | Some(x) if x == 0 => return num_cpus::get(), | |
532ac7d7 | 418 | _ => {} |
94b46f34 XL |
419 | } |
420 | ||
421 | // Support for deprecated `RAYON_RS_NUM_CPUS`. | |
532ac7d7 XL |
422 | match env::var("RAYON_RS_NUM_CPUS") |
423 | .ok() | |
424 | .and_then(|s| usize::from_str(&s).ok()) | |
425 | { | |
94b46f34 XL |
426 | Some(x) if x > 0 => x, |
427 | _ => num_cpus::get(), | |
428 | } | |
429 | } | |
430 | } | |
431 | ||
432 | /// Get the thread name for the thread with the given index. | |
433 | fn get_thread_name(&mut self, index: usize) -> Option<String> { | |
e74abb32 XL |
434 | let f = self.get_thread_name.as_mut()?; |
435 | Some(f(index)) | |
94b46f34 XL |
436 | } |
437 | ||
438 | /// Set a closure which takes a thread index and returns | |
439 | /// the thread's name. | |
440 | pub fn thread_name<F>(mut self, closure: F) -> Self | |
532ac7d7 XL |
441 | where |
442 | F: FnMut(usize) -> String + 'static, | |
443 | { | |
94b46f34 XL |
444 | self.get_thread_name = Some(Box::new(closure)); |
445 | self | |
446 | } | |
447 | ||
448 | /// Set the number of threads to be used in the rayon threadpool. | |
449 | /// | |
450 | /// If you specify a non-zero number of threads using this | |
451 | /// function, then the resulting thread-pools are guaranteed to | |
452 | /// start at most this number of threads. | |
453 | /// | |
454 | /// If `num_threads` is 0, or you do not call this function, then | |
455 | /// the Rayon runtime will select the number of threads | |
456 | /// automatically. At present, this is based on the | |
457 | /// `RAYON_NUM_THREADS` environment variable (if set), | |
458 | /// or the number of logical CPUs (otherwise). | |
459 | /// In the future, however, the default behavior may | |
460 | /// change to dynamically add or remove threads as needed. | |
461 | /// | |
462 | /// **Future compatibility warning:** Given the default behavior | |
463 | /// may change in the future, if you wish to rely on a fixed | |
464 | /// number of threads, you should use this function to specify | |
465 | /// that number. To reproduce the current default behavior, you | |
466 | /// may wish to use the [`num_cpus` | |
467 | /// crate](https://crates.io/crates/num_cpus) to query the number | |
468 | /// of CPUs dynamically. | |
469 | /// | |
470 | /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one | |
471 | /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment | |
472 | /// variable. If both variables are specified, `RAYON_NUM_THREADS` will | |
473 | /// be prefered. | |
e74abb32 | 474 | pub fn num_threads(mut self, num_threads: usize) -> Self { |
94b46f34 XL |
475 | self.num_threads = num_threads; |
476 | self | |
477 | } | |
478 | ||
479 | /// Returns a copy of the current panic handler. | |
480 | fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> { | |
481 | self.panic_handler.take() | |
482 | } | |
483 | ||
484 | /// Normally, whenever Rayon catches a panic, it tries to | |
485 | /// propagate it to someplace sensible, to try and reflect the | |
486 | /// semantics of sequential execution. But in some cases, | |
487 | /// particularly with the `spawn()` APIs, there is no | |
488 | /// obvious place where we should propagate the panic to. | |
489 | /// In that case, this panic handler is invoked. | |
490 | /// | |
491 | /// If no panic handler is set, the default is to abort the | |
492 | /// process, under the principle that panics should not go | |
493 | /// unobserved. | |
494 | /// | |
495 | /// If the panic handler itself panics, this will abort the | |
496 | /// process. To prevent this, wrap the body of your panic handler | |
497 | /// in a call to `std::panic::catch_unwind()`. | |
e74abb32 | 498 | pub fn panic_handler<H>(mut self, panic_handler: H) -> Self |
532ac7d7 | 499 | where |
e74abb32 | 500 | H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, |
94b46f34 XL |
501 | { |
502 | self.panic_handler = Some(Box::new(panic_handler)); | |
503 | self | |
504 | } | |
505 | ||
506 | /// Get the stack size of the worker threads | |
532ac7d7 | 507 | fn get_stack_size(&self) -> Option<usize> { |
94b46f34 XL |
508 | self.stack_size |
509 | } | |
510 | ||
511 | /// Set the stack size of the worker threads | |
512 | pub fn stack_size(mut self, stack_size: usize) -> Self { | |
513 | self.stack_size = Some(stack_size); | |
514 | self | |
515 | } | |
516 | ||
e74abb32 XL |
517 | /// **(DEPRECATED)** Suggest to worker threads that they execute |
518 | /// spawned jobs in a "breadth-first" fashion. | |
519 | /// | |
520 | /// Typically, when a worker thread is idle or blocked, it will | |
521 | /// attempt to execute the job from the *top* of its local deque of | |
522 | /// work (i.e., the job most recently spawned). If this flag is set | |
523 | /// to true, however, workers will prefer to execute in a | |
524 | /// *breadth-first* fashion -- that is, they will search for jobs at | |
525 | /// the *bottom* of their local deque. (At present, workers *always* | |
526 | /// steal from the bottom of other worker's deques, regardless of | |
527 | /// the setting of this flag.) | |
94b46f34 XL |
528 | /// |
529 | /// If you think of the tasks as a tree, where a parent task | |
530 | /// spawns its children in the tree, then this flag loosely | |
531 | /// corresponds to doing a breadth-first traversal of the tree, | |
532 | /// whereas the default would be to do a depth-first traversal. | |
533 | /// | |
534 | /// **Note that this is an "execution hint".** Rayon's task | |
535 | /// execution is highly dynamic and the precise order in which | |
536 | /// independent tasks are executed is not intended to be | |
537 | /// guaranteed. | |
e74abb32 XL |
538 | /// |
539 | /// This `breadth_first()` method is now deprecated per [RFC #1], | |
540 | /// and in the future its effect may be removed. Consider using | |
541 | /// [`scope_fifo()`] for a similar effect. | |
542 | /// | |
543 | /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md | |
544 | /// [`scope_fifo()`]: fn.scope_fifo.html | |
545 | #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")] | |
94b46f34 XL |
546 | pub fn breadth_first(mut self) -> Self { |
547 | self.breadth_first = true; | |
548 | self | |
549 | } | |
550 | ||
551 | fn get_breadth_first(&self) -> bool { | |
552 | self.breadth_first | |
553 | } | |
554 | ||
532ac7d7 XL |
555 | /// Takes the current acquire thread callback, leaving `None`. |
556 | fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> { | |
557 | self.acquire_thread_handler.take() | |
558 | } | |
559 | ||
560 | /// Set a callback to be invoked when starting computations in a thread. | |
e74abb32 XL |
561 | pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self |
562 | where | |
563 | H: Fn() + Send + Sync + 'static, | |
532ac7d7 XL |
564 | { |
565 | self.acquire_thread_handler = Some(Box::new(acquire_thread_handler)); | |
566 | self | |
567 | } | |
568 | ||
569 | /// Takes the current release thread callback, leaving `None`. | |
570 | fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> { | |
571 | self.release_thread_handler.take() | |
572 | } | |
573 | ||
574 | /// Set a callback to be invoked when blocking in thread. | |
e74abb32 XL |
575 | pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self |
576 | where | |
577 | H: Fn() + Send + Sync + 'static, | |
532ac7d7 XL |
578 | { |
579 | self.release_thread_handler = Some(Box::new(release_thread_handler)); | |
580 | self | |
581 | } | |
582 | ||
94b46f34 XL |
583 | /// Takes the current deadlock callback, leaving `None`. |
584 | fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> { | |
585 | self.deadlock_handler.take() | |
586 | } | |
587 | ||
588 | /// Set a callback to be invoked on current deadlock. | |
e74abb32 XL |
589 | pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self |
590 | where | |
591 | H: Fn() + Send + Sync + 'static, | |
94b46f34 XL |
592 | { |
593 | self.deadlock_handler = Some(Box::new(deadlock_handler)); | |
594 | self | |
595 | } | |
596 | ||
597 | /// Takes the current thread start callback, leaving `None`. | |
598 | fn take_start_handler(&mut self) -> Option<Box<StartHandler>> { | |
599 | self.start_handler.take() | |
600 | } | |
601 | ||
602 | /// Set a callback to be invoked on thread start. | |
603 | /// | |
604 | /// The closure is passed the index of the thread on which it is invoked. | |
605 | /// Note that this same closure may be invoked multiple times in parallel. | |
606 | /// If this closure panics, the panic will be passed to the panic handler. | |
607 | /// If that handler returns, then startup will continue normally. | |
e74abb32 | 608 | pub fn start_handler<H>(mut self, start_handler: H) -> Self |
532ac7d7 XL |
609 | where |
610 | H: Fn(usize) + Send + Sync + 'static, | |
94b46f34 XL |
611 | { |
612 | self.start_handler = Some(Box::new(start_handler)); | |
613 | self | |
614 | } | |
615 | ||
616 | /// Returns a current thread exit callback, leaving `None`. | |
617 | fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> { | |
618 | self.exit_handler.take() | |
619 | } | |
620 | ||
621 | /// Set a callback to be invoked on thread exit. | |
622 | /// | |
623 | /// The closure is passed the index of the thread on which it is invoked. | |
624 | /// Note that this same closure may be invoked multiple times in parallel. | |
625 | /// If this closure panics, the panic will be passed to the panic handler. | |
626 | /// If that handler returns, then the thread will exit normally. | |
e74abb32 | 627 | pub fn exit_handler<H>(mut self, exit_handler: H) -> Self |
532ac7d7 XL |
628 | where |
629 | H: Fn(usize) + Send + Sync + 'static, | |
94b46f34 XL |
630 | { |
631 | self.exit_handler = Some(Box::new(exit_handler)); | |
632 | self | |
633 | } | |
94b46f34 XL |
634 | } |
635 | ||
636 | #[allow(deprecated)] | |
637 | impl Configuration { | |
638 | /// Creates and return a valid rayon thread pool configuration, but does not initialize it. | |
639 | pub fn new() -> Configuration { | |
532ac7d7 XL |
640 | Configuration { |
641 | builder: ThreadPoolBuilder::new(), | |
642 | } | |
94b46f34 XL |
643 | } |
644 | ||
645 | /// Deprecated in favor of `ThreadPoolBuilder::build`. | |
e74abb32 XL |
646 | pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> { |
647 | self.builder.build().map_err(Box::from) | |
94b46f34 XL |
648 | } |
649 | ||
650 | /// Deprecated in favor of `ThreadPoolBuilder::thread_name`. | |
651 | pub fn thread_name<F>(mut self, closure: F) -> Self | |
532ac7d7 XL |
652 | where |
653 | F: FnMut(usize) -> String + 'static, | |
654 | { | |
94b46f34 | 655 | self.builder = self.builder.thread_name(closure); |
532ac7d7 | 656 | self |
94b46f34 XL |
657 | } |
658 | ||
659 | /// Deprecated in favor of `ThreadPoolBuilder::num_threads`. | |
660 | pub fn num_threads(mut self, num_threads: usize) -> Configuration { | |
661 | self.builder = self.builder.num_threads(num_threads); | |
662 | self | |
663 | } | |
664 | ||
665 | /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`. | |
666 | pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration | |
532ac7d7 | 667 | where |
e74abb32 | 668 | H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static, |
94b46f34 XL |
669 | { |
670 | self.builder = self.builder.panic_handler(panic_handler); | |
671 | self | |
672 | } | |
673 | ||
674 | /// Deprecated in favor of `ThreadPoolBuilder::stack_size`. | |
675 | pub fn stack_size(mut self, stack_size: usize) -> Self { | |
676 | self.builder = self.builder.stack_size(stack_size); | |
677 | self | |
678 | } | |
679 | ||
680 | /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`. | |
681 | pub fn breadth_first(mut self) -> Self { | |
682 | self.builder = self.builder.breadth_first(); | |
683 | self | |
684 | } | |
685 | ||
686 | /// Deprecated in favor of `ThreadPoolBuilder::start_handler`. | |
687 | pub fn start_handler<H>(mut self, start_handler: H) -> Configuration | |
532ac7d7 XL |
688 | where |
689 | H: Fn(usize) + Send + Sync + 'static, | |
94b46f34 XL |
690 | { |
691 | self.builder = self.builder.start_handler(start_handler); | |
692 | self | |
693 | } | |
694 | ||
695 | /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`. | |
696 | pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration | |
532ac7d7 XL |
697 | where |
698 | H: Fn(usize) + Send + Sync + 'static, | |
94b46f34 XL |
699 | { |
700 | self.builder = self.builder.exit_handler(exit_handler); | |
701 | self | |
702 | } | |
703 | ||
704 | /// Returns a ThreadPoolBuilder with identical parameters. | |
705 | fn into_builder(self) -> ThreadPoolBuilder { | |
706 | self.builder | |
707 | } | |
708 | } | |
709 | ||
710 | impl ThreadPoolBuildError { | |
711 | fn new(kind: ErrorKind) -> ThreadPoolBuildError { | |
e74abb32 | 712 | ThreadPoolBuildError { kind } |
94b46f34 XL |
713 | } |
714 | } | |
715 | ||
716 | impl Error for ThreadPoolBuildError { | |
717 | fn description(&self) -> &str { | |
718 | match self.kind { | |
532ac7d7 XL |
719 | ErrorKind::GlobalPoolAlreadyInitialized => { |
720 | "The global thread pool has already been initialized." | |
721 | } | |
94b46f34 XL |
722 | ErrorKind::IOError(ref e) => e.description(), |
723 | } | |
724 | } | |
725 | } | |
726 | ||
727 | impl fmt::Display for ThreadPoolBuildError { | |
6a06907d | 728 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
94b46f34 XL |
729 | match self.kind { |
730 | ErrorKind::IOError(ref e) => e.fmt(f), | |
731 | _ => self.description().fmt(f), | |
732 | } | |
733 | } | |
734 | } | |
735 | ||
736 | /// Deprecated in favor of `ThreadPoolBuilder::build_global`. | |
737 | #[deprecated(note = "use `ThreadPoolBuilder::build_global`")] | |
738 | #[allow(deprecated)] | |
e74abb32 XL |
739 | pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> { |
740 | config.into_builder().build_global().map_err(Box::from) | |
94b46f34 XL |
741 | } |
742 | ||
e74abb32 | 743 | impl<S> fmt::Debug for ThreadPoolBuilder<S> { |
6a06907d | 744 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
532ac7d7 XL |
745 | let ThreadPoolBuilder { |
746 | ref num_threads, | |
747 | ref get_thread_name, | |
748 | ref panic_handler, | |
749 | ref stack_size, | |
750 | ref deadlock_handler, | |
751 | ref start_handler, | |
532ac7d7 XL |
752 | ref exit_handler, |
753 | ref acquire_thread_handler, | |
754 | ref release_thread_handler, | |
e74abb32 | 755 | spawn_handler: _, |
532ac7d7 XL |
756 | ref breadth_first, |
757 | } = *self; | |
94b46f34 XL |
758 | |
759 | // Just print `Some(<closure>)` or `None` to the debug | |
760 | // output. | |
761 | struct ClosurePlaceholder; | |
762 | impl fmt::Debug for ClosurePlaceholder { | |
6a06907d | 763 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
94b46f34 XL |
764 | f.write_str("<closure>") |
765 | } | |
766 | } | |
767 | let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder); | |
768 | let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder); | |
769 | let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder); | |
770 | let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder); | |
771 | let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder); | |
532ac7d7 XL |
772 | let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder); |
773 | let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder); | |
94b46f34 XL |
774 | |
775 | f.debug_struct("ThreadPoolBuilder") | |
532ac7d7 XL |
776 | .field("num_threads", num_threads) |
777 | .field("get_thread_name", &get_thread_name) | |
778 | .field("panic_handler", &panic_handler) | |
779 | .field("stack_size", &stack_size) | |
780 | .field("deadlock_handler", &deadlock_handler) | |
781 | .field("start_handler", &start_handler) | |
782 | .field("exit_handler", &exit_handler) | |
532ac7d7 XL |
783 | .field("acquire_thread_handler", &acquire_thread_handler) |
784 | .field("release_thread_handler", &release_thread_handler) | |
785 | .field("breadth_first", &breadth_first) | |
786 | .finish() | |
94b46f34 XL |
787 | } |
788 | } | |
789 | ||
e74abb32 XL |
790 | #[allow(deprecated)] |
791 | impl Default for Configuration { | |
792 | fn default() -> Self { | |
793 | Configuration { | |
794 | builder: Default::default(), | |
795 | } | |
796 | } | |
797 | } | |
798 | ||
94b46f34 XL |
799 | #[allow(deprecated)] |
800 | impl fmt::Debug for Configuration { | |
6a06907d | 801 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
94b46f34 XL |
802 | self.builder.fmt(f) |
803 | } | |
804 | } | |
805 | ||
806 | /// Provides the calling context to a closure called by `join_context`. | |
807 | #[derive(Debug)] | |
808 | pub struct FnContext { | |
809 | migrated: bool, | |
810 | ||
811 | /// disable `Send` and `Sync`, just for a little future-proofing. | |
812 | _marker: PhantomData<*mut ()>, | |
813 | } | |
814 | ||
815 | impl FnContext { | |
816 | #[inline] | |
817 | fn new(migrated: bool) -> Self { | |
818 | FnContext { | |
e74abb32 | 819 | migrated, |
94b46f34 XL |
820 | _marker: PhantomData, |
821 | } | |
822 | } | |
823 | } | |
824 | ||
825 | impl FnContext { | |
826 | /// Returns `true` if the closure was called from a different thread | |
827 | /// than it was provided from. | |
828 | #[inline] | |
829 | pub fn migrated(&self) -> bool { | |
830 | self.migrated | |
831 | } | |
832 | } |