]> git.proxmox.com Git - rustc.git/blame - vendor/rayon-core/src/join/mod.rs
New upstream version 1.48.0~beta.8+dfsg1
[rustc.git] / vendor / rayon-core / src / join / mod.rs
CommitLineData
f035d41b 1use crate::job::StackJob;
1b1a35ee 2use crate::latch::SpinLatch;
f035d41b
XL
3use crate::registry::{self, WorkerThread};
4use crate::unwind;
2c00a5a8 5use std::any::Any;
2c00a5a8 6
f035d41b 7use crate::FnContext;
2c00a5a8
XL
8
9#[cfg(test)]
10mod test;
11
12/// Takes two closures and *potentially* runs them in parallel. It
13/// returns a pair of the results from those closures.
14///
15/// Conceptually, calling `join()` is similar to spawning two threads,
16/// one executing each of the two closures. However, the
17/// implementation is quite different and incurs very low
18/// overhead. The underlying technique is called "work stealing": the
19/// Rayon runtime uses a fixed pool of worker threads and attempts to
20/// only execute code in parallel when there are idle CPUs to handle
21/// it.
22///
23/// When `join` is called from outside the thread pool, the calling
24/// thread will block while the closures execute in the pool. When
25/// `join` is called within the pool, the calling thread still actively
26/// participates in the thread pool. It will begin by executing closure
27/// A (on the current thread). While it is doing that, it will advertise
28/// closure B as being available for other threads to execute. Once closure A
29/// has completed, the current thread will try to execute closure B;
30/// if however closure B has been stolen, then it will look for other work
31/// while waiting for the thief to fully execute closure B. (This is the
32/// typical work-stealing strategy).
33///
34/// # Examples
35///
36/// This example uses join to perform a quick-sort (note this is not a
37/// particularly optimized implementation: if you **actually** want to
38/// sort for real, you should prefer [the `par_sort` method] offered
39/// by Rayon).
40///
dc9dc135 41/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort
2c00a5a8
XL
42///
43/// ```rust
44/// # use rayon_core as rayon;
45/// let mut v = vec![5, 1, 8, 22, 0, 44];
46/// quick_sort(&mut v);
47/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
48///
49/// fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
50/// if v.len() > 1 {
51/// let mid = partition(v);
52/// let (lo, hi) = v.split_at_mut(mid);
53/// rayon::join(|| quick_sort(lo),
54/// || quick_sort(hi));
55/// }
56/// }
57///
58/// // Partition rearranges all items `<=` to the pivot
59/// // item (arbitrary selected to be the last item in the slice)
60/// // to the first half of the slice. It then returns the
61/// // "dividing point" where the pivot is placed.
62/// fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
63/// let pivot = v.len() - 1;
64/// let mut i = 0;
65/// for j in 0..pivot {
66/// if v[j] <= v[pivot] {
67/// v.swap(i, j);
68/// i += 1;
69/// }
70/// }
71/// v.swap(i, pivot);
72/// i
73/// }
74/// ```
75///
76/// # Warning about blocking I/O
77///
78/// The assumption is that the closures given to `join()` are
79/// CPU-bound tasks that do not perform I/O or other blocking
80/// operations. If you do perform I/O, and that I/O should block
81/// (e.g., waiting for a network request), the overall performance may
82/// be poor. Moreover, if you cause one closure to be blocked waiting
83/// on another (for example, using a channel), that could lead to a
84/// deadlock.
85///
86/// # Panics
87///
88/// No matter what happens, both closures will always be executed. If
89/// a single closure panics, whether it be the first or second
90/// closure, that panic will be propagated and hence `join()` will
91/// panic with the same panic value. If both closures panic, `join()`
92/// will panic with the panic value from the first closure.
93pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
416331ca
XL
94where
95 A: FnOnce() -> RA + Send,
96 B: FnOnce() -> RB + Send,
97 RA: Send,
98 RB: Send,
2c00a5a8 99{
e74abb32
XL
100 #[inline]
101 fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R {
102 move |_| f()
103 }
104
105 join_context(call(oper_a), call(oper_b))
2c00a5a8
XL
106}
107
108/// Identical to `join`, except that the closures have a parameter
109/// that provides context for the way the closure has been called,
110/// especially indicating whether they're executing on a different
111/// thread than where `join_context` was called. This will occur if
112/// the second job is stolen by a different thread, or if
113/// `join_context` was called from outside the thread pool to begin
114/// with.
115pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
416331ca
XL
116where
117 A: FnOnce(FnContext) -> RA + Send,
118 B: FnOnce(FnContext) -> RB + Send,
119 RA: Send,
120 RB: Send,
2c00a5a8 121{
e74abb32
XL
122 #[inline]
123 fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
124 move || f(FnContext::new(injected))
125 }
126
127 #[inline]
128 fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
129 move |migrated| f(FnContext::new(migrated))
130 }
131
2c00a5a8 132 registry::in_worker(|worker_thread, injected| unsafe {
2c00a5a8
XL
133 // Create virtual wrapper for task b; this all has to be
134 // done here so that the stack frame can keep it all live
135 // long enough.
1b1a35ee 136 let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
2c00a5a8
XL
137 let job_b_ref = job_b.as_job_ref();
138 worker_thread.push(job_b_ref);
139
140 // Execute task a; hopefully b gets stolen in the meantime.
e74abb32 141 let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
2c00a5a8
XL
142 let result_a = match status_a {
143 Ok(v) => v,
144 Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
145 };
146
147 // Now that task A has finished, try to pop job B from the
148 // local stack. It may already have been popped by job A; it
149 // may also have been stolen. There may also be some tasks
150 // pushed on top of it in the stack, and we will have to pop
151 // those off to get to it.
152 while !job_b.latch.probe() {
153 if let Some(job) = worker_thread.take_local_job() {
154 if job == job_b_ref {
155 // Found it! Let's run it.
156 //
157 // Note that this could panic, but it's ok if we unwind here.
2c00a5a8
XL
158 let result_b = job_b.run_inline(injected);
159 return (result_a, result_b);
160 } else {
2c00a5a8
XL
161 worker_thread.execute(job);
162 }
163 } else {
164 // Local deque is empty. Time to steal from other
165 // threads.
2c00a5a8
XL
166 worker_thread.wait_until(&job_b.latch);
167 debug_assert!(job_b.latch.probe());
168 break;
169 }
170 }
171
416331ca 172 (result_a, job_b.into_result())
2c00a5a8
XL
173 })
174}
175
176/// If job A panics, we still cannot return until we are sure that job
177/// B is complete. This is because it may contain references into the
178/// enclosing stack frame(s).
179#[cold] // cold path
416331ca
XL
180unsafe fn join_recover_from_panic(
181 worker_thread: &WorkerThread,
1b1a35ee 182 job_b_latch: &SpinLatch<'_>,
e74abb32 183 err: Box<dyn Any + Send>,
416331ca 184) -> ! {
2c00a5a8
XL
185 worker_thread.wait_until(job_b_latch);
186 unwind::resume_unwinding(err)
187}