]> git.proxmox.com Git - rustc.git/blob - compiler/rustc_data_structures/src/sync/parallel.rs
New upstream version 1.75.0+dfsg1
[rustc.git] / compiler / rustc_data_structures / src / sync / parallel.rs
1 //! This module defines parallel operations that are implemented in
2 //! one way for the serial compiler, and another way for the parallel compiler.
3
4 #![allow(dead_code)]
5
6 use crate::sync::IntoDynSyncSend;
7 use crate::FatalErrorMarker;
8 use parking_lot::Mutex;
9 use std::any::Any;
10 use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
11
12 #[cfg(not(parallel_compiler))]
13 pub use disabled::*;
14 #[cfg(parallel_compiler)]
15 pub use enabled::*;
16
17 /// A guard used to hold panics that occur during a parallel section to later by unwound.
18 /// This is used for the parallel compiler to prevent fatal errors from non-deterministically
19 /// hiding errors by ensuring that everything in the section has completed executing before
20 /// continuing with unwinding. It's also used for the non-parallel code to ensure error message
21 /// output match the parallel compiler for testing purposes.
22 pub struct ParallelGuard {
23 panic: Mutex<Option<IntoDynSyncSend<Box<dyn Any + Send + 'static>>>>,
24 }
25
26 impl ParallelGuard {
27 pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
28 catch_unwind(AssertUnwindSafe(f))
29 .map_err(|err| {
30 let mut panic = self.panic.lock();
31 if panic.is_none() || !(*err).is::<FatalErrorMarker>() {
32 *panic = Some(IntoDynSyncSend(err));
33 }
34 })
35 .ok()
36 }
37 }
38
39 /// This gives access to a fresh parallel guard in the closure and will unwind any panics
40 /// caught in it after the closure returns.
41 #[inline]
42 pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
43 let guard = ParallelGuard { panic: Mutex::new(None) };
44 let ret = f(&guard);
45 if let Some(IntoDynSyncSend(panic)) = guard.panic.into_inner() {
46 resume_unwind(panic);
47 }
48 ret
49 }
50
51 mod disabled {
52 use crate::sync::parallel_guard;
53
54 #[macro_export]
55 #[cfg(not(parallel_compiler))]
56 macro_rules! parallel {
57 ($($blocks:block),*) => {{
58 $crate::sync::parallel_guard(|guard| {
59 $(guard.run(|| $blocks);)*
60 });
61 }}
62 }
63
64 pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
65 where
66 A: FnOnce() -> RA,
67 B: FnOnce() -> RB,
68 {
69 let (a, b) = parallel_guard(|guard| {
70 let a = guard.run(oper_a);
71 let b = guard.run(oper_b);
72 (a, b)
73 });
74 (a.unwrap(), b.unwrap())
75 }
76
77 pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item)) {
78 parallel_guard(|guard| {
79 t.into_iter().for_each(|i| {
80 guard.run(|| for_each(i));
81 });
82 })
83 }
84
85 pub fn try_par_for_each_in<T: IntoIterator, E>(
86 t: T,
87 mut for_each: impl FnMut(T::Item) -> Result<(), E>,
88 ) -> Result<(), E> {
89 parallel_guard(|guard| {
90 t.into_iter().filter_map(|i| guard.run(|| for_each(i))).fold(Ok(()), Result::and)
91 })
92 }
93
94 pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
95 t: T,
96 mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
97 ) -> C {
98 parallel_guard(|guard| t.into_iter().filter_map(|i| guard.run(|| map(i))).collect())
99 }
100 }
101
/// Implementations of the parallel operations for the parallel compiler. Each operation
/// checks `is_dyn_thread_safe()` and either hands the work to Rayon or runs it serially
/// on the current thread.
#[cfg(parallel_compiler)]
mod enabled {
    use crate::sync::{mode, parallel_guard, DynSend, DynSync, FromDyn};
    use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};

    /// Runs a list of blocks in parallel. The first block is executed immediately on
    /// the current thread. Use that for the longest running block.
    #[macro_export]
    macro_rules! parallel {
        // Internal arm: moves the next pending block to the front of the accumulated
        // list, which reverses the order of the later blocks.
        (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
            parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
        };
        // Internal arm: nothing is pending any more, so spawn every accumulated block
        // in a scope and run the first block on the current thread.
        (impl $fblock:block [$($blocks:expr,)*] []) => {
            $crate::sync::parallel_guard(|guard| {
                $crate::sync::scope(|s| {
                    $(
                        let block = $crate::sync::FromDyn::from(|| $blocks);
                        s.spawn(move |_| {
                            guard.run(move || block.into_inner()());
                        });
                    )*
                    guard.run(|| $fblock);
                });
            });
        };
        // Entry point.
        ($fblock:block, $($blocks:block),*) => {
            if $crate::sync::is_dyn_thread_safe() {
                // Reverse the order of the later blocks since Rayon executes them in reverse order
                // when using a single thread. This ensures the execution order matches that
                // of a single threaded rustc.
                parallel!(impl $fblock [] [$($blocks),*]);
            } else {
                $crate::sync::parallel_guard(|guard| {
                    guard.run(|| $fblock);
                    $(guard.run(|| $blocks);)*
                });
            }
        };
    }

    /// Runs `op` inside a Rayon scope and returns its result.
    ///
    /// This function only works when `mode::is_dyn_thread_safe()`.
    pub fn scope<'scope, OP, R>(op: OP) -> R
    where
        OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
        R: DynSend,
    {
        let wrapped_op = FromDyn::from(op);
        // The operation enters the Rayon closure inside its `FromDyn` wrapper, and the
        // result leaves it wrapped the same way; both are unwrapped on the far side.
        let wrapped_ret = rayon::scope(|s| {
            let op = wrapped_op.into_inner();
            FromDyn::from(op(s))
        });
        wrapped_ret.into_inner()
    }

    /// Runs `oper_a` and `oper_b` and returns both results as a pair: via `rayon::join`
    /// when `is_dyn_thread_safe()`, otherwise through the serial `disabled::join`.
    #[inline]
    pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA + DynSend,
        B: FnOnce() -> RB + DynSend,
    {
        if !mode::is_dyn_thread_safe() {
            return super::disabled::join(oper_a, oper_b);
        }

        let task_a = FromDyn::from(oper_a);
        let task_b = FromDyn::from(oper_b);
        let (ra, rb) = parallel_guard(|guard| {
            rayon::join(
                move || guard.run(move || FromDyn::from(task_a.into_inner()())),
                move || guard.run(move || FromDyn::from(task_b.into_inner()())),
            )
        });
        (ra.unwrap().into_inner(), rb.unwrap().into_inner())
    }

    /// Calls `for_each` on every item of `t`, through a Rayon parallel iterator when
    /// `is_dyn_thread_safe()` and through the ordinary iterator otherwise.
    pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
        t: T,
        for_each: impl Fn(I) + DynSync + DynSend,
    ) {
        parallel_guard(|guard| {
            if mode::is_dyn_thread_safe() {
                let par_for_each = FromDyn::from(for_each);
                t.into_par_iter().for_each(|item| {
                    guard.run(|| par_for_each(item));
                });
            } else {
                for item in t {
                    guard.run(|| for_each(item));
                }
            }
        });
    }

    /// Calls `for_each` on every item of `t` and combines the outcomes with
    /// `Result::and`, so the result is `Ok(())` only if no call returned an `Err`.
    /// Calls that panicked contribute nothing to the combined result.
    pub fn try_par_for_each_in<
        T: IntoIterator + IntoParallelIterator<Item = <T as IntoIterator>::Item>,
        E: Send,
    >(
        t: T,
        for_each: impl Fn(<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
    ) -> Result<(), E> {
        parallel_guard(|guard| {
            if mode::is_dyn_thread_safe() {
                let par_for_each = FromDyn::from(for_each);
                let outcomes =
                    t.into_par_iter().filter_map(|item| guard.run(|| par_for_each(item)));
                outcomes.reduce(|| Ok(()), Result::and)
            } else {
                let mut outcome: Result<(), E> = Ok(());
                for item in t {
                    if let Some(step) = guard.run(|| for_each(item)) {
                        outcome = outcome.and(step);
                    }
                }
                outcome
            }
        })
    }

    /// Applies `map` to every item of `t` and collects the results into `C`, through a
    /// Rayon parallel iterator when `is_dyn_thread_safe()` and through the ordinary
    /// iterator otherwise. Results of calls that panicked are left out.
    pub fn par_map<
        I,
        T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
        R: std::marker::Send,
        C: FromIterator<R> + FromParallelIterator<R>,
    >(
        t: T,
        map: impl Fn(I) -> R + DynSync + DynSend,
    ) -> C {
        parallel_guard(|guard| {
            if mode::is_dyn_thread_safe() {
                let par_map_fn = FromDyn::from(map);
                let mapped = t.into_par_iter().filter_map(|item| guard.run(|| par_map_fn(item)));
                mapped.collect()
            } else {
                let mapped = t.into_iter().filter_map(|item| guard.run(|| map(item)));
                mapped.collect()
            }
        })
    }
}