1 //! This module defines parallel operations that are implemented in
2 //! one way for the serial compiler, and another way the parallel compiler.
6 use crate::sync
::IntoDynSyncSend
;
7 use crate::FatalErrorMarker
;
8 use parking_lot
::Mutex
;
10 use std
::panic
::{catch_unwind, resume_unwind, AssertUnwindSafe}
;
12 #[cfg(not(parallel_compiler))]
14 #[cfg(parallel_compiler)]
17 /// A guard used to hold panics that occur during a parallel section to later by unwound.
18 /// This is used for the parallel compiler to prevent fatal errors from non-deterministically
19 /// hiding errors by ensuring that everything in the section has completed executing before
20 /// continuing with unwinding. It's also used for the non-parallel code to ensure error message
21 /// output match the parallel compiler for testing purposes.
22 pub struct ParallelGuard
{
23 panic
: Mutex
<Option
<IntoDynSyncSend
<Box
<dyn Any
+ Send
+ '
static>>>>,
27 pub fn run
<R
>(&self, f
: impl FnOnce() -> R
) -> Option
<R
> {
28 catch_unwind(AssertUnwindSafe(f
))
30 let mut panic
= self.panic
.lock();
31 if panic
.is_none() || !(*err
).is
::<FatalErrorMarker
>() {
32 *panic
= Some(IntoDynSyncSend(err
));
39 /// This gives access to a fresh parallel guard in the closure and will unwind any panics
40 /// caught in it after the closure returns.
42 pub fn parallel_guard
<R
>(f
: impl FnOnce(&ParallelGuard
) -> R
) -> R
{
43 let guard
= ParallelGuard { panic: Mutex::new(None) }
;
45 if let Some(IntoDynSyncSend(panic
)) = guard
.panic
.into_inner() {
52 use crate::sync
::parallel_guard
;
55 #[cfg(not(parallel_compiler))]
56 macro_rules
! parallel
{
57 ($
($blocks
:block
),*) => {{
58 $
crate::sync
::parallel_guard(|guard
| {
59 $
(guard
.run(|| $blocks
);)*
64 pub fn join
<A
, B
, RA
, RB
>(oper_a
: A
, oper_b
: B
) -> (RA
, RB
)
69 let (a
, b
) = parallel_guard(|guard
| {
70 let a
= guard
.run(oper_a
);
71 let b
= guard
.run(oper_b
);
74 (a
.unwrap(), b
.unwrap())
77 pub fn par_for_each_in
<T
: IntoIterator
>(t
: T
, mut for_each
: impl FnMut(T
::Item
)) {
78 parallel_guard(|guard
| {
79 t
.into_iter().for_each(|i
| {
80 guard
.run(|| for_each(i
));
85 pub fn try_par_for_each_in
<T
: IntoIterator
, E
>(
87 mut for_each
: impl FnMut(T
::Item
) -> Result
<(), E
>,
89 parallel_guard(|guard
| {
90 t
.into_iter().filter_map(|i
| guard
.run(|| for_each(i
))).fold(Ok(()), Result
::and
)
94 pub fn par_map
<T
: IntoIterator
, R
, C
: FromIterator
<R
>>(
96 mut map
: impl FnMut(<<T
as IntoIterator
>::IntoIter
as Iterator
>::Item
) -> R
,
98 parallel_guard(|guard
| t
.into_iter().filter_map(|i
| guard
.run(|| map(i
))).collect())
102 #[cfg(parallel_compiler)]
104 use crate::sync
::{mode, parallel_guard, DynSend, DynSync, FromDyn}
;
106 /// Runs a list of blocks in parallel. The first block is executed immediately on
107 /// the current thread. Use that for the longest running block.
109 macro_rules
! parallel
{
110 (impl $fblock
:block
[$
($c
:expr
,)*] [$block
:expr $
(, $rest
:expr
)*]) => {
111 parallel
!(impl $fblock
[$block
, $
($c
,)*] [$
($rest
),*])
113 (impl $fblock
:block
[$
($blocks
:expr
,)*] []) => {
114 $
crate::sync
::parallel_guard(|guard
| {
115 $
crate::sync
::scope(|s
| {
117 let block
= $
crate::sync
::FromDyn
::from(|| $blocks
);
119 guard
.run(move || block
.into_inner()());
122 guard
.run(|| $fblock
);
126 ($fblock
:block
, $
($blocks
:block
),*) => {
127 if $
crate::sync
::is_dyn_thread_safe() {
128 // Reverse the order of the later blocks since Rayon executes them in reverse order
129 // when using a single thread. This ensures the execution order matches that
130 // of a single threaded rustc.
131 parallel
!(impl $fblock
[] [$
($blocks
),*]);
133 $
crate::sync
::parallel_guard(|guard
| {
134 guard
.run(|| $fblock
);
135 $
(guard
.run(|| $blocks
);)*
141 // This function only works when `mode::is_dyn_thread_safe()`.
142 pub fn scope
<'scope
, OP
, R
>(op
: OP
) -> R
144 OP
: FnOnce(&rayon
::Scope
<'scope
>) -> R
+ DynSend
,
147 let op
= FromDyn
::from(op
);
148 rayon
::scope(|s
| FromDyn
::from(op
.into_inner()(s
))).into_inner()
152 pub fn join
<A
, B
, RA
: DynSend
, RB
: DynSend
>(oper_a
: A
, oper_b
: B
) -> (RA
, RB
)
154 A
: FnOnce() -> RA
+ DynSend
,
155 B
: FnOnce() -> RB
+ DynSend
,
157 if mode
::is_dyn_thread_safe() {
158 let oper_a
= FromDyn
::from(oper_a
);
159 let oper_b
= FromDyn
::from(oper_b
);
160 let (a
, b
) = parallel_guard(|guard
| {
162 move || guard
.run(move || FromDyn
::from(oper_a
.into_inner()())),
163 move || guard
.run(move || FromDyn
::from(oper_b
.into_inner()())),
166 (a
.unwrap().into_inner(), b
.unwrap().into_inner())
168 super::disabled
::join(oper_a
, oper_b
)
172 use rayon
::iter
::{FromParallelIterator, IntoParallelIterator, ParallelIterator}
;
174 pub fn par_for_each_in
<I
, T
: IntoIterator
<Item
= I
> + IntoParallelIterator
<Item
= I
>>(
176 for_each
: impl Fn(I
) + DynSync
+ DynSend
,
178 parallel_guard(|guard
| {
179 if mode
::is_dyn_thread_safe() {
180 let for_each
= FromDyn
::from(for_each
);
181 t
.into_par_iter().for_each(|i
| {
182 guard
.run(|| for_each(i
));
185 t
.into_iter().for_each(|i
| {
186 guard
.run(|| for_each(i
));
192 pub fn try_par_for_each_in
<
193 T
: IntoIterator
+ IntoParallelIterator
<Item
= <T
as IntoIterator
>::Item
>,
197 for_each
: impl Fn(<T
as IntoIterator
>::Item
) -> Result
<(), E
> + DynSync
+ DynSend
,
199 parallel_guard(|guard
| {
200 if mode
::is_dyn_thread_safe() {
201 let for_each
= FromDyn
::from(for_each
);
203 .filter_map(|i
| guard
.run(|| for_each(i
)))
204 .reduce(|| Ok(()), Result
::and
)
206 t
.into_iter().filter_map(|i
| guard
.run(|| for_each(i
))).fold(Ok(()), Result
::and
)
213 T
: IntoIterator
<Item
= I
> + IntoParallelIterator
<Item
= I
>,
214 R
: std
::marker
::Send
,
215 C
: FromIterator
<R
> + FromParallelIterator
<R
>,
218 map
: impl Fn(I
) -> R
+ DynSync
+ DynSend
,
220 parallel_guard(|guard
| {
221 if mode
::is_dyn_thread_safe() {
222 let map
= FromDyn
::from(map
);
223 t
.into_par_iter().filter_map(|i
| guard
.run(|| map(i
))).collect()
225 t
.into_iter().filter_map(|i
| guard
.run(|| map(i
))).collect()