2 //! [Under construction](https://github.com/rayon-rs/rayon/issues/231)
4 //! ## Restricting multiple versions
6 //! In order to ensure proper coordination between threadpools, and especially
7 //! to make sure there's only one global threadpool, `rayon-core` is actively
8 //! restricted from building multiple versions of itself into a single target.
9 //! You may see a build error like this in violation:
12 //! error: native library `rayon-core` is being linked to by more
13 //! than one package, and can only be linked to by one package
16 //! While we strive to keep `rayon-core` semver-compatible, it's still
17 //! possible to arrive at this situation if different crates have overly
18 //! restrictive tilde or inequality requirements for `rayon-core`. The
19 //! conflicting requirements will need to be resolved before the build will
22 #![doc(html_root_url = "https://docs.rs/rayon-core/1.9")]
23 #![deny(missing_debug_implementations)]
24 #![deny(missing_docs)]
25 #![deny(unreachable_pub)]
26 #![warn(rust_2018_idioms)]
30 use std
::error
::Error
;
33 use std
::marker
::PhantomData
;
34 use std
::str::FromStr
;
55 pub use self::join
::{join, join_context}
;
56 pub use self::registry
::ThreadBuilder
;
57 pub use self::scope
::{scope, Scope}
;
58 pub use self::scope
::{scope_fifo, ScopeFifo}
;
59 pub use self::spawn
::{spawn, spawn_fifo}
;
60 pub use self::thread_pool
::current_thread_has_pending_tasks
;
61 pub use self::thread_pool
::current_thread_index
;
62 pub use self::thread_pool
::ThreadPool
;
64 use self::registry
::{CustomSpawn, DefaultSpawn, ThreadSpawn}
;
66 /// Returns the number of threads in the current registry. If this
67 /// code is executing within a Rayon thread-pool, then this will be
68 /// the number of threads for the thread-pool of the current
69 /// thread. Otherwise, it will be the number of threads for the global
72 /// This can be useful when trying to judge how many times to split
73 /// parallel work (the parallel iterator traits use this value
74 /// internally for this purpose).
76 /// # Future compatibility note
78 /// Note that unless this thread-pool was created with a
79 /// builder that specifies the number of threads, then this
80 /// number may vary over time in future versions (see [the
81 /// `num_threads()` method for details][snt]).
83 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
84 pub fn current_num_threads() -> usize {
85 crate::registry
::Registry
::current_num_threads()
88 /// Error when initializing a thread pool.
90 pub struct ThreadPoolBuildError
{
96 GlobalPoolAlreadyInitialized
,
100 /// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
101 /// ## Creating a ThreadPool
102 /// The following creates a thread pool with 22 threads.
105 /// # use rayon_core as rayon;
106 /// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
109 /// To instead configure the global thread pool, use [`build_global()`]:
112 /// # use rayon_core as rayon;
113 /// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
116 /// [`ThreadPool`]: struct.ThreadPool.html
117 /// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
118 pub struct ThreadPoolBuilder
<S
= DefaultSpawn
> {
119 /// The number of threads in the rayon thread pool.
120 /// If zero will use the RAYON_NUM_THREADS environment variable.
121 /// If RAYON_NUM_THREADS is invalid or zero will use the default.
124 /// Custom closure, if any, to handle a panic that we cannot propagate
126 panic_handler
: Option
<Box
<PanicHandler
>>,
128 /// Closure to compute the name of a thread.
129 get_thread_name
: Option
<Box
<dyn FnMut(usize) -> String
>>,
131 /// The stack size for the created worker threads
132 stack_size
: Option
<usize>,
134 /// Closure invoked on worker thread start.
135 start_handler
: Option
<Box
<StartHandler
>>,
137 /// Closure invoked on worker thread exit.
138 exit_handler
: Option
<Box
<ExitHandler
>>,
140 /// Closure invoked to spawn threads.
143 /// If false, worker threads will execute spawned jobs in a
144 /// "depth-first" fashion. If true, they will do a "breadth-first"
145 /// fashion. Depth-first is the default.
149 /// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
151 /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
152 #[deprecated(note = "Use `ThreadPoolBuilder`")]
153 pub struct Configuration
{
154 builder
: ThreadPoolBuilder
,
157 /// The type for a panic handling closure. Note that this same closure
158 /// may be invoked multiple times in parallel.
159 type PanicHandler
= dyn Fn(Box
<dyn Any
+ Send
>) + Send
+ Sync
;
161 /// The type for a closure that gets invoked when a thread starts. The
162 /// closure is passed the index of the thread on which it is invoked.
163 /// Note that this same closure may be invoked multiple times in parallel.
164 type StartHandler
= dyn Fn(usize) + Send
+ Sync
;
166 /// The type for a closure that gets invoked when a thread exits. The
167 /// closure is passed the index of the thread on which is is invoked.
168 /// Note that this same closure may be invoked multiple times in parallel.
169 type ExitHandler
= dyn Fn(usize) + Send
+ Sync
;
171 // NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
172 impl Default
for ThreadPoolBuilder
{
173 fn default() -> Self {
177 get_thread_name
: None
,
181 spawn_handler
: DefaultSpawn
,
182 breadth_first
: false,
187 impl ThreadPoolBuilder
{
188 /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
189 pub fn new() -> Self {
194 /// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
195 /// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
196 impl<S
> ThreadPoolBuilder
<S
>
200 /// Creates a new `ThreadPool` initialized using this configuration.
201 pub fn build(self) -> Result
<ThreadPool
, ThreadPoolBuildError
> {
202 ThreadPool
::build(self)
205 /// Initializes the global thread pool. This initialization is
206 /// **optional**. If you do not call this function, the thread pool
207 /// will be automatically initialized with the default
208 /// configuration. Calling `build_global` is not recommended, except
209 /// in two scenarios:
211 /// - You wish to change the default configuration.
212 /// - You are running a benchmark, in which case initializing may
213 /// yield slightly more consistent results, since the worker threads
214 /// will already be ready to go even in the first iteration. But
215 /// this cost is minimal.
217 /// Initialization of the global thread pool happens exactly
218 /// once. Once started, the configuration cannot be
219 /// changed. Therefore, if you call `build_global` a second time, it
220 /// will return an error. An `Ok` result indicates that this
221 /// is the first initialization of the thread pool.
222 pub fn build_global(self) -> Result
<(), ThreadPoolBuildError
> {
223 let registry
= registry
::init_global_registry(self)?
;
224 registry
.wait_until_primed();
229 impl ThreadPoolBuilder
{
230 /// Creates a scoped `ThreadPool` initialized using this configuration.
232 /// This is a convenience function for building a pool using [`crossbeam::scope`]
233 /// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
234 /// The threads in this pool will start by calling `wrapper`, which should
235 /// do initialization and continue by calling `ThreadBuilder::run()`.
237 /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
241 /// A scoped pool may be useful in combination with scoped thread-local variables.
244 /// # use rayon_core as rayon;
246 /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
248 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
249 /// let pool_data = vec![1, 2, 3];
251 /// // We haven't assigned any TLS data yet.
252 /// assert!(!POOL_DATA.is_set());
254 /// rayon::ThreadPoolBuilder::new()
256 /// // Borrow `pool_data` in TLS for each thread.
257 /// |thread| POOL_DATA.set(&pool_data, || thread.run()),
258 /// // Do some work that needs the TLS data.
259 /// |pool| pool.install(|| assert!(POOL_DATA.is_set())),
262 /// // Once we've returned, `pool_data` is no longer borrowed.
267 pub fn build_scoped
<W
, F
, R
>(self, wrapper
: W
, with_pool
: F
) -> Result
<R
, ThreadPoolBuildError
>
269 W
: Fn(ThreadBuilder
) + Sync
, // expected to call `run()`
270 F
: FnOnce(&ThreadPool
) -> R
,
272 let result
= crossbeam_utils
::thread
::scope(|scope
| {
273 let wrapper
= &wrapper
;
275 .spawn_handler(|thread
| {
276 let mut builder
= scope
.builder();
277 if let Some(name
) = thread
.name() {
278 builder
= builder
.name(name
.to_string());
280 if let Some(size
) = thread
.stack_size() {
281 builder
= builder
.stack_size(size
);
283 builder
.spawn(move |_
| wrapper(thread
))?
;
291 Ok(result
) => result
,
292 Err(err
) => unwind
::resume_unwinding(err
),
297 impl<S
> ThreadPoolBuilder
<S
> {
298 /// Sets a custom function for spawning threads.
300 /// Note that the threads will not exit until after the pool is dropped. It
301 /// is up to the caller to wait for thread termination if that is important
302 /// for any invariants. For instance, threads created in [`crossbeam::scope`]
303 /// will be joined before that scope returns, and this will block indefinitely
304 /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
305 /// until the entire process exits!
307 /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
311 /// A minimal spawn handler just needs to call `run()` from an independent thread.
314 /// # use rayon_core as rayon;
315 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
316 /// let pool = rayon::ThreadPoolBuilder::new()
317 /// .spawn_handler(|thread| {
318 /// std::thread::spawn(|| thread.run());
323 /// pool.install(|| println!("Hello from my custom thread!"));
328 /// The default spawn handler sets the name and stack size if given, and propagates
329 /// any errors from the thread builder.
332 /// # use rayon_core as rayon;
333 /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
334 /// let pool = rayon::ThreadPoolBuilder::new()
335 /// .spawn_handler(|thread| {
336 /// let mut b = std::thread::Builder::new();
337 /// if let Some(name) = thread.name() {
338 /// b = b.name(name.to_owned());
340 /// if let Some(stack_size) = thread.stack_size() {
341 /// b = b.stack_size(stack_size);
343 /// b.spawn(|| thread.run())?;
348 /// pool.install(|| println!("Hello from my fully custom thread!"));
352 pub fn spawn_handler
<F
>(self, spawn
: F
) -> ThreadPoolBuilder
<CustomSpawn
<F
>>
354 F
: FnMut(ThreadBuilder
) -> io
::Result
<()>,
357 spawn_handler
: CustomSpawn
::new(spawn
),
359 num_threads
: self.num_threads
,
360 panic_handler
: self.panic_handler
,
361 get_thread_name
: self.get_thread_name
,
362 stack_size
: self.stack_size
,
363 start_handler
: self.start_handler
,
364 exit_handler
: self.exit_handler
,
365 breadth_first
: self.breadth_first
,
369 /// Returns a reference to the current spawn handler.
370 fn get_spawn_handler(&mut self) -> &mut S
{
371 &mut self.spawn_handler
374 /// Get the number of threads that will be used for the thread
375 /// pool. See `num_threads()` for more information.
376 fn get_num_threads(&self) -> usize {
377 if self.num_threads
> 0 {
380 match env
::var("RAYON_NUM_THREADS")
382 .and_then(|s
| usize::from_str(&s
).ok())
384 Some(x
) if x
> 0 => return x
,
385 Some(x
) if x
== 0 => return num_cpus
::get(),
389 // Support for deprecated `RAYON_RS_NUM_CPUS`.
390 match env
::var("RAYON_RS_NUM_CPUS")
392 .and_then(|s
| usize::from_str(&s
).ok())
394 Some(x
) if x
> 0 => x
,
395 _
=> num_cpus
::get(),
400 /// Get the thread name for the thread with the given index.
401 fn get_thread_name(&mut self, index
: usize) -> Option
<String
> {
402 let f
= self.get_thread_name
.as_mut()?
;
406 /// Sets a closure which takes a thread index and returns
407 /// the thread's name.
408 pub fn thread_name
<F
>(mut self, closure
: F
) -> Self
410 F
: FnMut(usize) -> String
+ '
static,
412 self.get_thread_name
= Some(Box
::new(closure
));
416 /// Sets the number of threads to be used in the rayon threadpool.
418 /// If you specify a non-zero number of threads using this
419 /// function, then the resulting thread-pools are guaranteed to
420 /// start at most this number of threads.
422 /// If `num_threads` is 0, or you do not call this function, then
423 /// the Rayon runtime will select the number of threads
424 /// automatically. At present, this is based on the
425 /// `RAYON_NUM_THREADS` environment variable (if set),
426 /// or the number of logical CPUs (otherwise).
427 /// In the future, however, the default behavior may
428 /// change to dynamically add or remove threads as needed.
430 /// **Future compatibility warning:** Given the default behavior
431 /// may change in the future, if you wish to rely on a fixed
432 /// number of threads, you should use this function to specify
433 /// that number. To reproduce the current default behavior, you
434 /// may wish to use the [`num_cpus`
435 /// crate](https://crates.io/crates/num_cpus) to query the number
436 /// of CPUs dynamically.
438 /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
439 /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
440 /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
442 pub fn num_threads(mut self, num_threads
: usize) -> Self {
443 self.num_threads
= num_threads
;
447 /// Returns a copy of the current panic handler.
448 fn take_panic_handler(&mut self) -> Option
<Box
<PanicHandler
>> {
449 self.panic_handler
.take()
452 /// Normally, whenever Rayon catches a panic, it tries to
453 /// propagate it to someplace sensible, to try and reflect the
454 /// semantics of sequential execution. But in some cases,
455 /// particularly with the `spawn()` APIs, there is no
456 /// obvious place where we should propagate the panic to.
457 /// In that case, this panic handler is invoked.
459 /// If no panic handler is set, the default is to abort the
460 /// process, under the principle that panics should not go
463 /// If the panic handler itself panics, this will abort the
464 /// process. To prevent this, wrap the body of your panic handler
465 /// in a call to `std::panic::catch_unwind()`.
466 pub fn panic_handler
<H
>(mut self, panic_handler
: H
) -> Self
468 H
: Fn(Box
<dyn Any
+ Send
>) + Send
+ Sync
+ '
static,
470 self.panic_handler
= Some(Box
::new(panic_handler
));
474 /// Get the stack size of the worker threads
475 fn get_stack_size(&self) -> Option
<usize> {
479 /// Sets the stack size of the worker threads
480 pub fn stack_size(mut self, stack_size
: usize) -> Self {
481 self.stack_size
= Some(stack_size
);
485 /// **(DEPRECATED)** Suggest to worker threads that they execute
486 /// spawned jobs in a "breadth-first" fashion.
488 /// Typically, when a worker thread is idle or blocked, it will
489 /// attempt to execute the job from the *top* of its local deque of
490 /// work (i.e., the job most recently spawned). If this flag is set
491 /// to true, however, workers will prefer to execute in a
492 /// *breadth-first* fashion -- that is, they will search for jobs at
493 /// the *bottom* of their local deque. (At present, workers *always*
494 /// steal from the bottom of other worker's deques, regardless of
495 /// the setting of this flag.)
497 /// If you think of the tasks as a tree, where a parent task
498 /// spawns its children in the tree, then this flag loosely
499 /// corresponds to doing a breadth-first traversal of the tree,
500 /// whereas the default would be to do a depth-first traversal.
502 /// **Note that this is an "execution hint".** Rayon's task
503 /// execution is highly dynamic and the precise order in which
504 /// independent tasks are executed is not intended to be
507 /// This `breadth_first()` method is now deprecated per [RFC #1],
508 /// and in the future its effect may be removed. Consider using
509 /// [`scope_fifo()`] for a similar effect.
511 /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
512 /// [`scope_fifo()`]: fn.scope_fifo.html
513 #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
514 pub fn breadth_first(mut self) -> Self {
515 self.breadth_first
= true;
519 fn get_breadth_first(&self) -> bool
{
523 /// Takes the current thread start callback, leaving `None`.
524 fn take_start_handler(&mut self) -> Option
<Box
<StartHandler
>> {
525 self.start_handler
.take()
528 /// Sets a callback to be invoked on thread start.
530 /// The closure is passed the index of the thread on which it is invoked.
531 /// Note that this same closure may be invoked multiple times in parallel.
532 /// If this closure panics, the panic will be passed to the panic handler.
533 /// If that handler returns, then startup will continue normally.
534 pub fn start_handler
<H
>(mut self, start_handler
: H
) -> Self
536 H
: Fn(usize) + Send
+ Sync
+ '
static,
538 self.start_handler
= Some(Box
::new(start_handler
));
542 /// Returns a current thread exit callback, leaving `None`.
543 fn take_exit_handler(&mut self) -> Option
<Box
<ExitHandler
>> {
544 self.exit_handler
.take()
547 /// Sets a callback to be invoked on thread exit.
549 /// The closure is passed the index of the thread on which it is invoked.
550 /// Note that this same closure may be invoked multiple times in parallel.
551 /// If this closure panics, the panic will be passed to the panic handler.
552 /// If that handler returns, then the thread will exit normally.
553 pub fn exit_handler
<H
>(mut self, exit_handler
: H
) -> Self
555 H
: Fn(usize) + Send
+ Sync
+ '
static,
557 self.exit_handler
= Some(Box
::new(exit_handler
));
564 /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
565 pub fn new() -> Configuration
{
567 builder
: ThreadPoolBuilder
::new(),
571 /// Deprecated in favor of `ThreadPoolBuilder::build`.
572 pub fn build(self) -> Result
<ThreadPool
, Box
<dyn Error
+ '
static>> {
573 self.builder
.build().map_err(Box
::from
)
576 /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
577 pub fn thread_name
<F
>(mut self, closure
: F
) -> Self
579 F
: FnMut(usize) -> String
+ '
static,
581 self.builder
= self.builder
.thread_name(closure
);
585 /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
586 pub fn num_threads(mut self, num_threads
: usize) -> Configuration
{
587 self.builder
= self.builder
.num_threads(num_threads
);
591 /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
592 pub fn panic_handler
<H
>(mut self, panic_handler
: H
) -> Configuration
594 H
: Fn(Box
<dyn Any
+ Send
>) + Send
+ Sync
+ '
static,
596 self.builder
= self.builder
.panic_handler(panic_handler
);
600 /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
601 pub fn stack_size(mut self, stack_size
: usize) -> Self {
602 self.builder
= self.builder
.stack_size(stack_size
);
606 /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
607 pub fn breadth_first(mut self) -> Self {
608 self.builder
= self.builder
.breadth_first();
612 /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
613 pub fn start_handler
<H
>(mut self, start_handler
: H
) -> Configuration
615 H
: Fn(usize) + Send
+ Sync
+ '
static,
617 self.builder
= self.builder
.start_handler(start_handler
);
621 /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
622 pub fn exit_handler
<H
>(mut self, exit_handler
: H
) -> Configuration
624 H
: Fn(usize) + Send
+ Sync
+ '
static,
626 self.builder
= self.builder
.exit_handler(exit_handler
);
630 /// Returns a ThreadPoolBuilder with identical parameters.
631 fn into_builder(self) -> ThreadPoolBuilder
{
636 impl ThreadPoolBuildError
{
637 fn new(kind
: ErrorKind
) -> ThreadPoolBuildError
{
638 ThreadPoolBuildError { kind }
642 const GLOBAL_POOL_ALREADY_INITIALIZED
: &str =
643 "The global thread pool has already been initialized.";
645 impl Error
for ThreadPoolBuildError
{
647 fn description(&self) -> &str {
649 ErrorKind
::GlobalPoolAlreadyInitialized
=> GLOBAL_POOL_ALREADY_INITIALIZED
,
650 ErrorKind
::IOError(ref e
) => e
.description(),
654 fn source(&self) -> Option
<&(dyn Error
+ '
static)> {
656 ErrorKind
::GlobalPoolAlreadyInitialized
=> None
,
657 ErrorKind
::IOError(e
) => Some(e
),
662 impl fmt
::Display
for ThreadPoolBuildError
{
663 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
665 ErrorKind
::GlobalPoolAlreadyInitialized
=> GLOBAL_POOL_ALREADY_INITIALIZED
.fmt(f
),
666 ErrorKind
::IOError(e
) => e
.fmt(f
),
671 /// Deprecated in favor of `ThreadPoolBuilder::build_global`.
672 #[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
674 pub fn initialize(config
: Configuration
) -> Result
<(), Box
<dyn Error
>> {
675 config
.into_builder().build_global().map_err(Box
::from
)
678 impl<S
> fmt
::Debug
for ThreadPoolBuilder
<S
> {
679 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
680 let ThreadPoolBuilder
{
691 // Just print `Some(<closure>)` or `None` to the debug
693 struct ClosurePlaceholder
;
694 impl fmt
::Debug
for ClosurePlaceholder
{
695 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
696 f
.write_str("<closure>")
699 let get_thread_name
= get_thread_name
.as_ref().map(|_
| ClosurePlaceholder
);
700 let panic_handler
= panic_handler
.as_ref().map(|_
| ClosurePlaceholder
);
701 let start_handler
= start_handler
.as_ref().map(|_
| ClosurePlaceholder
);
702 let exit_handler
= exit_handler
.as_ref().map(|_
| ClosurePlaceholder
);
704 f
.debug_struct("ThreadPoolBuilder")
705 .field("num_threads", num_threads
)
706 .field("get_thread_name", &get_thread_name
)
707 .field("panic_handler", &panic_handler
)
708 .field("stack_size", &stack_size
)
709 .field("start_handler", &start_handler
)
710 .field("exit_handler", &exit_handler
)
711 .field("breadth_first", &breadth_first
)
717 impl Default
for Configuration
{
718 fn default() -> Self {
720 builder
: Default
::default(),
726 impl fmt
::Debug
for Configuration
{
727 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
732 /// Provides the calling context to a closure called by `join_context`.
734 pub struct FnContext
{
737 /// disable `Send` and `Sync`, just for a little future-proofing.
738 _marker
: PhantomData
<*mut ()>,
743 fn new(migrated
: bool
) -> Self {
746 _marker
: PhantomData
,
752 /// Returns `true` if the closure was called from a different thread
753 /// than it was provided from.
755 pub fn migrated(&self) -> bool
{