]> git.proxmox.com Git - rustc.git/blame - src/vendor/rustc-rayon-core/src/scope/mod.rs
New upstream version 1.31.0+dfsg1
[rustc.git] / src / vendor / rustc-rayon-core / src / scope / mod.rs
CommitLineData
94b46f34
XL
1//! Methods for custom fork-join scopes, created by the [`scope()`]
2//! function. These are a more flexible alternative to [`join()`].
3//!
4//! [`scope()`]: fn.scope.html
5//! [`join()`]: ../join/join.fn.html
6
7use latch::{Latch, CountLatch};
8use log::Event::*;
9use job::HeapJob;
10use std::any::Any;
11use std::fmt;
12use std::marker::PhantomData;
13use std::mem;
14use std::ptr;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicPtr, Ordering};
17use registry::{in_worker, WorkerThread, Registry};
18use unwind;
19
20#[cfg(test)]
21mod test;
22mod internal;
23
24///Represents a fork-join scope which can be used to spawn any number of tasks. See [`scope()`] for more information.
25///
26///[`scope()`]: fn.scope.html
27pub struct Scope<'scope> {
28 /// thread where `scope()` was executed (note that individual jobs
29 /// may be executing on different worker threads, though they
30 /// should always be within the same pool of threads)
31 owner_thread_index: usize,
32
33 /// thread registry where `scope()` was executed.
34 registry: Arc<Registry>,
35
36 /// if some job panicked, the error is stored here; it will be
37 /// propagated to the one who created the scope
38 panic: AtomicPtr<Box<Any + Send + 'static>>,
39
40 /// latch to set when the counter drops to zero (and hence this scope is complete)
41 job_completed_latch: CountLatch,
42
43 /// You can think of a scope as containing a list of closures to execute,
44 /// all of which outlive `'scope`. They're not actually required to be
45 /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
46 /// the closures are only *moved* across threads to be executed.
47 marker: PhantomData<Box<FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
48}
49
50/// Create a "fork-join" scope `s` and invokes the closure with a
51/// reference to `s`. This closure can then spawn asynchronous tasks
52/// into `s`. Those tasks may run asynchronously with respect to the
53/// closure; they may themselves spawn additional tasks into `s`. When
54/// the closure returns, it will block until all tasks that have been
55/// spawned into `s` complete.
56///
57/// `scope()` is a more flexible building block compared to `join()`,
58/// since a loop can be used to spawn any number of tasks without
59/// recursing. However, that flexibility comes at a performance price:
60/// tasks spawned using `scope()` must be allocated onto the heap,
61/// whereas `join()` can make exclusive use of the stack. **Prefer
62/// `join()` (or, even better, parallel iterators) where possible.**
63///
64/// # Example
65///
66/// The Rayon `join()` function launches two closures and waits for them
67/// to stop. One could implement `join()` using a scope like so, although
68/// it would be less efficient than the real implementation:
69///
70/// ```rust
71/// # use rayon_core as rayon;
72/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
73/// where A: FnOnce() -> RA + Send,
74/// B: FnOnce() -> RB + Send,
75/// RA: Send,
76/// RB: Send,
77/// {
78/// let mut result_a: Option<RA> = None;
79/// let mut result_b: Option<RB> = None;
80/// rayon::scope(|s| {
81/// s.spawn(|_| result_a = Some(oper_a()));
82/// s.spawn(|_| result_b = Some(oper_b()));
83/// });
84/// (result_a.unwrap(), result_b.unwrap())
85/// }
86/// ```
87///
88/// # A note on threading
89///
90/// The closure given to `scope()` executes in the Rayon thread-pool,
91/// as do those given to `spawn()`. This means that you can't access
92/// thread-local variables (well, you can, but they may have
93/// unexpected values).
94///
95/// # Task execution
96///
97/// Task execution potentially starts as soon as `spawn()` is called.
98/// The task will end sometime before `scope()` returns. Note that the
99/// *closure* given to scope may return much earlier. In general
100/// the lifetime of a scope created like `scope(body)` goes something like this:
101///
102/// - Scope begins when `scope(body)` is called
103/// - Scope body `body()` is invoked
104/// - Scope tasks may be spawned
105/// - Scope body returns
106/// - Scope tasks execute, possibly spawning more tasks
107/// - Once all tasks are done, scope ends and `scope()` returns
108///
109/// To see how and when tasks are joined, consider this example:
110///
111/// ```rust
112/// # use rayon_core as rayon;
113/// // point start
114/// rayon::scope(|s| {
115/// s.spawn(|s| { // task s.1
116/// s.spawn(|s| { // task s.1.1
117/// rayon::scope(|t| {
118/// t.spawn(|_| ()); // task t.1
119/// t.spawn(|_| ()); // task t.2
120/// });
121/// });
122/// });
123/// s.spawn(|s| { // task s.2
124/// });
125/// // point mid
126/// });
127/// // point end
128/// ```
129///
130/// The various tasks that are run will execute roughly like so:
131///
132/// ```notrust
133/// | (start)
134/// |
135/// | (scope `s` created)
136/// +--------------------+ (task s.1)
137/// +-------+ (task s.2) |
138/// |       |            +---+ (task s.1.1)
139/// |       |            |   |
140/// |       |            |   | (scope `t` created)
141/// |       |            |   +----------------+ (task t.1)
142/// |       |            |   +---+ (task t.2) |
143/// | (mid) |            |   |   |            |
144/// :       |            |   + <-+------------+ (scope `t` ends)
145/// :       |            |   |
146/// |<------+------------+---+ (scope `s` ends)
147/// |
148/// | (end)
149/// ```
150///
151/// The point here is that everything spawned into scope `s` will
152/// terminate (at latest) at the same point -- right before the
153/// original call to `rayon::scope` returns. This includes new
154/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
155/// scope is created (such as `t`), the things spawned into that scope
156/// will be joined before that scope returns, which in turn occurs
157/// before the creating task (task `s.1.1` in this case) finishes.
158///
159/// # Accessing stack data
160///
161/// In general, spawned tasks may access stack data in place that
162/// outlives the scope itself. Other data must be fully owned by the
163/// spawned task.
164///
165/// ```rust
166/// # use rayon_core as rayon;
167/// let ok: Vec<i32> = vec![1, 2, 3];
168/// rayon::scope(|s| {
169/// let bad: Vec<i32> = vec![4, 5, 6];
170/// s.spawn(|_| {
171/// // We can access `ok` because it outlives the scope `s`.
172/// println!("ok: {:?}", ok);
173///
174/// // If we just try to use `bad` here, the closure will borrow `bad`
175/// // (because we are just printing it out, and that only requires a
176/// // borrow), which will result in a compilation error. Read on
177/// // for options.
178/// // println!("bad: {:?}", bad);
179/// });
180/// });
181/// ```
182///
183/// As the comments in the example above suggest, to reference `bad` we must
184/// take ownership of it. One way to do this is to detach the closure
185/// from the surrounding stack frame, using the `move` keyword. This
186/// will cause it to take ownership of *all* the variables it touches,
187/// in this case including both `ok` *and* `bad`:
188///
189/// ```rust
190/// # use rayon_core as rayon;
191/// let ok: Vec<i32> = vec![1, 2, 3];
192/// rayon::scope(|s| {
193/// let bad: Vec<i32> = vec![4, 5, 6];
194/// s.spawn(move |_| {
195/// println!("ok: {:?}", ok);
196/// println!("bad: {:?}", bad);
197/// });
198///
199/// // That closure is fine, but now we can't use `ok` anywhere else,
200/// // since it is owned by the previous task:
201/// // s.spawn(|_| println!("ok: {:?}", ok));
202/// });
203/// ```
204///
205/// While this works, it could be a problem if we want to use `ok` elsewhere.
206/// There are two choices. We can keep the closure as a `move` closure, but
207/// instead of referencing the variable `ok`, we create a shadowed variable that
208/// is a borrow of `ok` and capture *that*:
209///
210/// ```rust
211/// # use rayon_core as rayon;
212/// let ok: Vec<i32> = vec![1, 2, 3];
213/// rayon::scope(|s| {
214/// let bad: Vec<i32> = vec![4, 5, 6];
215/// let ok: &Vec<i32> = &ok; // shadow the original `ok`
216/// s.spawn(move |_| {
217/// println!("ok: {:?}", ok); // captures the shadowed version
218/// println!("bad: {:?}", bad);
219/// });
220///
221/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
222/// // can be shared freely. Note that we need a `move` closure here though,
223/// // because otherwise we'd be trying to borrow the shadowed `ok`,
224/// // and that doesn't outlive `scope`.
225/// s.spawn(move |_| println!("ok: {:?}", ok));
226/// });
227/// ```
228///
229/// Another option is not to use the `move` keyword but instead to take ownership
230/// of individual variables:
231///
232/// ```rust
233/// # use rayon_core as rayon;
234/// let ok: Vec<i32> = vec![1, 2, 3];
235/// rayon::scope(|s| {
236/// let bad: Vec<i32> = vec![4, 5, 6];
237/// s.spawn(|_| {
238/// // Transfer ownership of `bad` into a local variable (also named `bad`).
239/// // This will force the closure to take ownership of `bad` from the environment.
240/// let bad = bad;
241/// println!("ok: {:?}", ok); // `ok` is only borrowed.
242/// println!("bad: {:?}", bad); // refers to our local variable, above.
243/// });
244///
245/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
246/// });
247/// ```
248///
249/// # Panics
250///
251/// If a panic occurs, either in the closure given to `scope()` or in
252/// any of the spawned jobs, that panic will be propagated and the
253/// call to `scope()` will panic. If multiple panics occur, it is
254/// non-deterministic which of their panic values will propagate.
255/// Regardless, once a task is spawned using `scope.spawn()`, it will
256/// execute, even if the spawning task should later panic. `scope()`
257/// returns once all spawned jobs have completed, and any panics are
258/// propagated at that point.
259pub fn scope<'scope, OP, R>(op: OP) -> R
260 where OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, R: Send,
261{
262 in_worker(|owner_thread, _| {
263 unsafe {
264 let scope: Scope<'scope> = Scope {
265 owner_thread_index: owner_thread.index(),
266 registry: owner_thread.registry().clone(),
267 panic: AtomicPtr::new(ptr::null_mut()),
268 job_completed_latch: CountLatch::new(),
269 marker: PhantomData,
270 };
271 let result = scope.execute_job_closure(op);
272 scope.steal_till_jobs_complete(owner_thread);
273 result.unwrap() // only None if `op` panicked, and that would have been propagated
274 }
275 })
276}
277
278impl<'scope> Scope<'scope> {
279 /// Spawns a job into the fork-join scope `self`. This job will
280 /// execute sometime before the fork-join scope completes. The
281 /// job is specified as a closure, and this closure receives its
282 /// own reference to the scope `self` as argument. This can be
283 /// used to inject new jobs into `self`.
284 ///
285 /// # Returns
286 ///
287 /// Nothing. The spawned closures cannot pass back values to the
288 /// caller directly, though they can write to local variables on
289 /// the stack (if those variables outlive the scope) or
290 /// communicate through shared channels.
291 ///
292 /// (The intention is to eventualy integrate with Rust futures to
293 /// support spawns of functions that compute a value.)
294 ///
295 /// # Examples
296 ///
297 /// ```rust
298 /// # use rayon_core as rayon;
299 /// let mut value_a = None;
300 /// let mut value_b = None;
301 /// let mut value_c = None;
302 /// rayon::scope(|s| {
303 /// s.spawn(|s1| {
304 /// // ^ this is the same scope as `s`; this handle `s1`
305 /// // is intended for use by the spawned task,
306 /// // since scope handles cannot cross thread boundaries.
307 ///
308 /// value_a = Some(22);
309 ///
310 /// // the scope `s` will not end until all these tasks are done
311 /// s1.spawn(|_| {
312 /// value_b = Some(44);
313 /// });
314 /// });
315 ///
316 /// s.spawn(|_| {
317 /// value_c = Some(66);
318 /// });
319 /// });
320 /// assert_eq!(value_a, Some(22));
321 /// assert_eq!(value_b, Some(44));
322 /// assert_eq!(value_c, Some(66));
323 /// ```
324 ///
325 /// # See also
326 ///
327 /// The [`scope` function] has more extensive documentation about
328 /// task spawning.
329 ///
330 /// [`scope` function]: fn.scope.html
331 pub fn spawn<BODY>(&self, body: BODY)
332 where BODY: FnOnce(&Scope<'scope>) + Send + 'scope
333 {
334 unsafe {
335 self.job_completed_latch.increment();
336 let job_ref = Box::new(HeapJob::new(move || self.execute_job(body)))
337 .as_job_ref();
338
339 // Since `Scope` implements `Sync`, we can't be sure
340 // that we're still in a thread of this pool, so we
341 // can't just push to the local worker thread.
342 self.registry.inject_or_push(job_ref);
343 }
344 }
345
346 /// Executes `func` as a job, either aborting or executing as
347 /// appropriate.
348 ///
349 /// Unsafe because it must be executed on a worker thread.
350 unsafe fn execute_job<FUNC>(&self, func: FUNC)
351 where FUNC: FnOnce(&Scope<'scope>) + 'scope
352 {
353 let _: Option<()> = self.execute_job_closure(func);
354 }
355
356 /// Executes `func` as a job in scope. Adjusts the "job completed"
357 /// counters and also catches any panic and stores it into
358 /// `scope`.
359 ///
360 /// Unsafe because this must be executed on a worker thread.
361 unsafe fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
362 where FUNC: FnOnce(&Scope<'scope>) -> R + 'scope
363 {
364 match unwind::halt_unwinding(move || func(self)) {
365 Ok(r) => { self.job_completed_ok(); Some(r) }
366 Err(err) => { self.job_panicked(err); None }
367 }
368 }
369
370 unsafe fn job_panicked(&self, err: Box<Any + Send + 'static>) {
371 // capture the first error we see, free the rest
372 let nil = ptr::null_mut();
373 let mut err = Box::new(err); // box up the fat ptr
374 if self.panic.compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed).is_ok() {
375 log!(JobPanickedErrorStored { owner_thread: self.owner_thread_index });
376 mem::forget(err); // ownership now transferred into self.panic
377 } else {
378 log!(JobPanickedErrorNotStored { owner_thread: self.owner_thread_index });
379 }
380
381
382 self.job_completed_latch.set();
383 }
384
385 unsafe fn job_completed_ok(&self) {
386 log!(JobCompletedOk { owner_thread: self.owner_thread_index });
387 self.job_completed_latch.set();
388 }
389
390 unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) {
391 // wait for job counter to reach 0:
392 owner_thread.wait_until(&self.job_completed_latch);
393
394 // propagate panic, if any occurred; at this point, all
395 // outstanding jobs have completed, so we can use a relaxed
396 // ordering:
397 let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
398 if !panic.is_null() {
399 log!(ScopeCompletePanicked { owner_thread: owner_thread.index() });
400 let value: Box<Box<Any + Send + 'static>> = mem::transmute(panic);
401 unwind::resume_unwinding(*value);
402 } else {
403 log!(ScopeCompleteNoPanic { owner_thread: owner_thread.index() });
404 }
405 }
406}
407
408impl<'scope> fmt::Debug for Scope<'scope> {
409 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
410 fmt.debug_struct("Scope")
411 .field("pool_id", &self.registry.id())
412 .field("owner_thread_index", &self.owner_thread_index)
413 .field("panic", &self.panic)
414 .field("job_completed_latch", &self.job_completed_latch)
415 .finish()
416 }
417}