]>
Commit | Line | Data |
---|---|---|
94b46f34 XL |
1 | //! Methods for custom fork-join scopes, created by the [`scope()`] |
2 | //! function. These are a more flexible alternative to [`join()`]. | |
3 | //! | |
4 | //! [`scope()`]: fn.scope.html | |
5 | //! [`join()`]: ../join/join.fn.html | |
6 | ||
7 | use latch::{Latch, CountLatch}; | |
8 | use log::Event::*; | |
9 | use job::HeapJob; | |
10 | use std::any::Any; | |
11 | use std::fmt; | |
12 | use std::marker::PhantomData; | |
13 | use std::mem; | |
14 | use std::ptr; | |
15 | use std::sync::Arc; | |
16 | use std::sync::atomic::{AtomicPtr, Ordering}; | |
17 | use registry::{in_worker, WorkerThread, Registry}; | |
18 | use unwind; | |
19 | ||
20 | #[cfg(test)] | |
21 | mod test; | |
22 | mod internal; | |
23 | ||
24 | ///Represents a fork-join scope which can be used to spawn any number of tasks. See [`scope()`] for more information. | |
25 | /// | |
26 | ///[`scope()`]: fn.scope.html | |
27 | pub struct Scope<'scope> { | |
28 | /// thread where `scope()` was executed (note that individual jobs | |
29 | /// may be executing on different worker threads, though they | |
30 | /// should always be within the same pool of threads) | |
31 | owner_thread_index: usize, | |
32 | ||
33 | /// thread registry where `scope()` was executed. | |
34 | registry: Arc<Registry>, | |
35 | ||
36 | /// if some job panicked, the error is stored here; it will be | |
37 | /// propagated to the one who created the scope | |
38 | panic: AtomicPtr<Box<Any + Send + 'static>>, | |
39 | ||
40 | /// latch to set when the counter drops to zero (and hence this scope is complete) | |
41 | job_completed_latch: CountLatch, | |
42 | ||
43 | /// You can think of a scope as containing a list of closures to execute, | |
44 | /// all of which outlive `'scope`. They're not actually required to be | |
45 | /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because | |
46 | /// the closures are only *moved* across threads to be executed. | |
47 | marker: PhantomData<Box<FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>, | |
48 | } | |
49 | ||
50 | /// Creates a "fork-join" scope `s` and invokes the closure with a | |
51 | /// reference to `s`. This closure can then spawn asynchronous tasks | |
52 | /// into `s`. Those tasks may run asynchronously with respect to the | |
53 | /// closure; they may themselves spawn additional tasks into `s`. When | |
54 | /// the closure returns, it will block until all tasks that have been | |
55 | /// spawned into `s` complete. | |
56 | /// | |
57 | /// `scope()` is a more flexible building block compared to `join()`, | |
58 | /// since a loop can be used to spawn any number of tasks without | |
59 | /// recursing. However, that flexibility comes at a performance price: | |
60 | /// tasks spawned using `scope()` must be allocated onto the heap, | |
61 | /// whereas `join()` can make exclusive use of the stack. **Prefer | |
62 | /// `join()` (or, even better, parallel iterators) where possible.** | |
63 | /// | |
64 | /// # Example | |
65 | /// | |
66 | /// The Rayon `join()` function launches two closures and waits for them | |
67 | /// to stop. One could implement `join()` using a scope like so, although | |
68 | /// it would be less efficient than the real implementation: | |
69 | /// | |
70 | /// ```rust | |
71 | /// # use rayon_core as rayon; | |
72 | /// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB) | |
73 | /// where A: FnOnce() -> RA + Send, | |
74 | /// B: FnOnce() -> RB + Send, | |
75 | /// RA: Send, | |
76 | /// RB: Send, | |
77 | /// { | |
78 | /// let mut result_a: Option<RA> = None; | |
79 | /// let mut result_b: Option<RB> = None; | |
80 | /// rayon::scope(|s| { | |
81 | /// s.spawn(|_| result_a = Some(oper_a())); | |
82 | /// s.spawn(|_| result_b = Some(oper_b())); | |
83 | /// }); | |
84 | /// (result_a.unwrap(), result_b.unwrap()) | |
85 | /// } | |
86 | /// ``` | |
87 | /// | |
88 | /// # A note on threading | |
89 | /// | |
90 | /// The closure given to `scope()` executes in the Rayon thread-pool, | |
91 | /// as do those given to `spawn()`. This means that you can't access | |
92 | /// thread-local variables (well, you can, but they may have | |
93 | /// unexpected values). | |
94 | /// | |
95 | /// # Task execution | |
96 | /// | |
97 | /// Task execution potentially starts as soon as `spawn()` is called. | |
98 | /// The task will end sometime before `scope()` returns. Note that the | |
99 | /// *closure* given to scope may return much earlier. In general | |
100 | /// the lifetime of a scope created like `scope(body)` goes something like this: | |
101 | /// | |
102 | /// - Scope begins when `scope(body)` is called | |
103 | /// - Scope body `body()` is invoked | |
104 | /// - Scope tasks may be spawned | |
105 | /// - Scope body returns | |
106 | /// - Scope tasks execute, possibly spawning more tasks | |
107 | /// - Once all tasks are done, scope ends and `scope()` returns | |
108 | /// | |
109 | /// To see how and when tasks are joined, consider this example: | |
110 | /// | |
111 | /// ```rust | |
112 | /// # use rayon_core as rayon; | |
113 | /// // point start | |
114 | /// rayon::scope(|s| { | |
115 | /// s.spawn(|s| { // task s.1 | |
116 | /// s.spawn(|s| { // task s.1.1 | |
117 | /// rayon::scope(|t| { | |
118 | /// t.spawn(|_| ()); // task t.1 | |
119 | /// t.spawn(|_| ()); // task t.2 | |
120 | /// }); | |
121 | /// }); | |
122 | /// }); | |
123 | /// s.spawn(|s| { // task s.2 | |
124 | /// }); | |
125 | /// // point mid | |
126 | /// }); | |
127 | /// // point end | |
128 | /// ``` | |
129 | /// | |
130 | /// The various tasks that are run will execute roughly like so: | |
131 | /// | |
132 | /// ```notrust | |
133 | /// | (start) | |
134 | /// | | |
135 | /// | (scope `s` created) | |
136 | /// +--------------------+ (task s.1) | |
137 | /// +-------+ (task s.2) | | |
138 | /// | | +---+ (task s.1.1) | |
139 | /// | | | | | |
140 | /// | | | | (scope `t` created) | |
141 | /// | | | +----------------+ (task t.1) | |
142 | /// | | | +---+ (task t.2) | | |
143 | /// | (mid) | | | | | | |
144 | /// : | | + <-+------------+ (scope `t` ends) | |
145 | /// : | | | | |
146 | /// |<------+------------+---+ (scope `s` ends) | |
147 | /// | | |
148 | /// | (end) | |
149 | /// ``` | |
150 | /// | |
151 | /// The point here is that everything spawned into scope `s` will | |
152 | /// terminate (at latest) at the same point -- right before the | |
153 | /// original call to `rayon::scope` returns. This includes new | |
154 | /// subtasks created by other subtasks (e.g., task `s.1.1`). If a new | |
155 | /// scope is created (such as `t`), the things spawned into that scope | |
156 | /// will be joined before that scope returns, which in turn occurs | |
157 | /// before the creating task (task `s.1.1` in this case) finishes. | |
158 | /// | |
159 | /// # Accessing stack data | |
160 | /// | |
161 | /// In general, spawned tasks may access stack data in place that | |
162 | /// outlives the scope itself. Other data must be fully owned by the | |
163 | /// spawned task. | |
164 | /// | |
165 | /// ```rust | |
166 | /// # use rayon_core as rayon; | |
167 | /// let ok: Vec<i32> = vec![1, 2, 3]; | |
168 | /// rayon::scope(|s| { | |
169 | /// let bad: Vec<i32> = vec![4, 5, 6]; | |
170 | /// s.spawn(|_| { | |
171 | /// // We can access `ok` because it outlives the scope `s`. | |
172 | /// println!("ok: {:?}", ok); | |
173 | /// | |
174 | /// // If we just try to use `bad` here, the closure will borrow `bad` | |
175 | /// // (because we are just printing it out, and that only requires a | |
176 | /// // borrow), which will result in a compilation error. Read on | |
177 | /// // for options. | |
178 | /// // println!("bad: {:?}", bad); | |
179 | /// }); | |
180 | /// }); | |
181 | /// ``` | |
182 | /// | |
183 | /// As the comments in the example above suggest, to reference `bad` we must | |
184 | /// take ownership of it. One way to do this is to detach the closure | |
185 | /// from the surrounding stack frame, using the `move` keyword. This | |
186 | /// will cause it to take ownership of *all* the variables it touches, | |
187 | /// in this case including both `ok` *and* `bad`: | |
188 | /// | |
189 | /// ```rust | |
190 | /// # use rayon_core as rayon; | |
191 | /// let ok: Vec<i32> = vec![1, 2, 3]; | |
192 | /// rayon::scope(|s| { | |
193 | /// let bad: Vec<i32> = vec![4, 5, 6]; | |
194 | /// s.spawn(move |_| { | |
195 | /// println!("ok: {:?}", ok); | |
196 | /// println!("bad: {:?}", bad); | |
197 | /// }); | |
198 | /// | |
199 | /// // That closure is fine, but now we can't use `ok` anywhere else, | |
200 | /// // since it is owned by the previous task: | |
201 | /// // s.spawn(|_| println!("ok: {:?}", ok)); | |
202 | /// }); | |
203 | /// ``` | |
204 | /// | |
205 | /// While this works, it could be a problem if we want to use `ok` elsewhere. | |
206 | /// There are two choices. We can keep the closure as a `move` closure, but | |
207 | /// instead of referencing the variable `ok`, we create a shadowed variable that | |
208 | /// is a borrow of `ok` and capture *that*: | |
209 | /// | |
210 | /// ```rust | |
211 | /// # use rayon_core as rayon; | |
212 | /// let ok: Vec<i32> = vec![1, 2, 3]; | |
213 | /// rayon::scope(|s| { | |
214 | /// let bad: Vec<i32> = vec![4, 5, 6]; | |
215 | /// let ok: &Vec<i32> = &ok; // shadow the original `ok` | |
216 | /// s.spawn(move |_| { | |
217 | /// println!("ok: {:?}", ok); // captures the shadowed version | |
218 | /// println!("bad: {:?}", bad); | |
219 | /// }); | |
220 | /// | |
221 | /// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references | |
222 | /// // can be shared freely. Note that we need a `move` closure here though, | |
223 | /// // because otherwise we'd be trying to borrow the shadowed `ok`, | |
224 | /// // and that doesn't outlive `scope`. | |
225 | /// s.spawn(move |_| println!("ok: {:?}", ok)); | |
226 | /// }); | |
227 | /// ``` | |
228 | /// | |
229 | /// Another option is not to use the `move` keyword but instead to take ownership | |
230 | /// of individual variables: | |
231 | /// | |
232 | /// ```rust | |
233 | /// # use rayon_core as rayon; | |
234 | /// let ok: Vec<i32> = vec![1, 2, 3]; | |
235 | /// rayon::scope(|s| { | |
236 | /// let bad: Vec<i32> = vec![4, 5, 6]; | |
237 | /// s.spawn(|_| { | |
238 | /// // Transfer ownership of `bad` into a local variable (also named `bad`). | |
239 | /// // This will force the closure to take ownership of `bad` from the environment. | |
240 | /// let bad = bad; | |
241 | /// println!("ok: {:?}", ok); // `ok` is only borrowed. | |
242 | /// println!("bad: {:?}", bad); // refers to our local variable, above. | |
243 | /// }); | |
244 | /// | |
245 | /// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok` | |
246 | /// }); | |
247 | /// ``` | |
248 | /// | |
249 | /// # Panics | |
250 | /// | |
251 | /// If a panic occurs, either in the closure given to `scope()` or in | |
252 | /// any of the spawned jobs, that panic will be propagated and the | |
253 | /// call to `scope()` will panic. If multiple panics occur, it is | |
254 | /// non-deterministic which of their panic values will propagate. | |
255 | /// Regardless, once a task is spawned using `scope.spawn()`, it will | |
256 | /// execute, even if the spawning task should later panic. `scope()` | |
257 | /// returns once all spawned jobs have completed, and any panics are | |
258 | /// propagated at that point. | |
259 | pub fn scope<'scope, OP, R>(op: OP) -> R | |
260 | where OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, R: Send, | |
261 | { | |
262 | in_worker(|owner_thread, _| { | |
263 | unsafe { | |
264 | let scope: Scope<'scope> = Scope { | |
265 | owner_thread_index: owner_thread.index(), | |
266 | registry: owner_thread.registry().clone(), | |
267 | panic: AtomicPtr::new(ptr::null_mut()), | |
268 | job_completed_latch: CountLatch::new(), | |
269 | marker: PhantomData, | |
270 | }; | |
271 | let result = scope.execute_job_closure(op); | |
272 | scope.steal_till_jobs_complete(owner_thread); | |
273 | result.unwrap() // only None if `op` panicked, and that would have been propagated | |
274 | } | |
275 | }) | |
276 | } | |
277 | ||
278 | impl<'scope> Scope<'scope> { | |
279 | /// Spawns a job into the fork-join scope `self`. This job will | |
280 | /// execute sometime before the fork-join scope completes. The | |
281 | /// job is specified as a closure, and this closure receives its | |
282 | /// own reference to the scope `self` as argument. This can be | |
283 | /// used to inject new jobs into `self`. | |
284 | /// | |
285 | /// # Returns | |
286 | /// | |
287 | /// Nothing. The spawned closures cannot pass back values to the | |
288 | /// caller directly, though they can write to local variables on | |
289 | /// the stack (if those variables outlive the scope) or | |
290 | /// communicate through shared channels. | |
291 | /// | |
292 | /// (The intention is to eventualy integrate with Rust futures to | |
293 | /// support spawns of functions that compute a value.) | |
294 | /// | |
295 | /// # Examples | |
296 | /// | |
297 | /// ```rust | |
298 | /// # use rayon_core as rayon; | |
299 | /// let mut value_a = None; | |
300 | /// let mut value_b = None; | |
301 | /// let mut value_c = None; | |
302 | /// rayon::scope(|s| { | |
303 | /// s.spawn(|s1| { | |
304 | /// // ^ this is the same scope as `s`; this handle `s1` | |
305 | /// // is intended for use by the spawned task, | |
306 | /// // since scope handles cannot cross thread boundaries. | |
307 | /// | |
308 | /// value_a = Some(22); | |
309 | /// | |
310 | /// // the scope `s` will not end until all these tasks are done | |
311 | /// s1.spawn(|_| { | |
312 | /// value_b = Some(44); | |
313 | /// }); | |
314 | /// }); | |
315 | /// | |
316 | /// s.spawn(|_| { | |
317 | /// value_c = Some(66); | |
318 | /// }); | |
319 | /// }); | |
320 | /// assert_eq!(value_a, Some(22)); | |
321 | /// assert_eq!(value_b, Some(44)); | |
322 | /// assert_eq!(value_c, Some(66)); | |
323 | /// ``` | |
324 | /// | |
325 | /// # See also | |
326 | /// | |
327 | /// The [`scope` function] has more extensive documentation about | |
328 | /// task spawning. | |
329 | /// | |
330 | /// [`scope` function]: fn.scope.html | |
331 | pub fn spawn<BODY>(&self, body: BODY) | |
332 | where BODY: FnOnce(&Scope<'scope>) + Send + 'scope | |
333 | { | |
334 | unsafe { | |
335 | self.job_completed_latch.increment(); | |
336 | let job_ref = Box::new(HeapJob::new(move || self.execute_job(body))) | |
337 | .as_job_ref(); | |
338 | ||
339 | // Since `Scope` implements `Sync`, we can't be sure | |
340 | // that we're still in a thread of this pool, so we | |
341 | // can't just push to the local worker thread. | |
342 | self.registry.inject_or_push(job_ref); | |
343 | } | |
344 | } | |
345 | ||
346 | /// Executes `func` as a job, either aborting or executing as | |
347 | /// appropriate. | |
348 | /// | |
349 | /// Unsafe because it must be executed on a worker thread. | |
350 | unsafe fn execute_job<FUNC>(&self, func: FUNC) | |
351 | where FUNC: FnOnce(&Scope<'scope>) + 'scope | |
352 | { | |
353 | let _: Option<()> = self.execute_job_closure(func); | |
354 | } | |
355 | ||
356 | /// Executes `func` as a job in scope. Adjusts the "job completed" | |
357 | /// counters and also catches any panic and stores it into | |
358 | /// `scope`. | |
359 | /// | |
360 | /// Unsafe because this must be executed on a worker thread. | |
361 | unsafe fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R> | |
362 | where FUNC: FnOnce(&Scope<'scope>) -> R + 'scope | |
363 | { | |
364 | match unwind::halt_unwinding(move || func(self)) { | |
365 | Ok(r) => { self.job_completed_ok(); Some(r) } | |
366 | Err(err) => { self.job_panicked(err); None } | |
367 | } | |
368 | } | |
369 | ||
370 | unsafe fn job_panicked(&self, err: Box<Any + Send + 'static>) { | |
371 | // capture the first error we see, free the rest | |
372 | let nil = ptr::null_mut(); | |
373 | let mut err = Box::new(err); // box up the fat ptr | |
374 | if self.panic.compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed).is_ok() { | |
375 | log!(JobPanickedErrorStored { owner_thread: self.owner_thread_index }); | |
376 | mem::forget(err); // ownership now transferred into self.panic | |
377 | } else { | |
378 | log!(JobPanickedErrorNotStored { owner_thread: self.owner_thread_index }); | |
379 | } | |
380 | ||
381 | ||
382 | self.job_completed_latch.set(); | |
383 | } | |
384 | ||
385 | unsafe fn job_completed_ok(&self) { | |
386 | log!(JobCompletedOk { owner_thread: self.owner_thread_index }); | |
387 | self.job_completed_latch.set(); | |
388 | } | |
389 | ||
390 | unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) { | |
391 | // wait for job counter to reach 0: | |
392 | owner_thread.wait_until(&self.job_completed_latch); | |
393 | ||
394 | // propagate panic, if any occurred; at this point, all | |
395 | // outstanding jobs have completed, so we can use a relaxed | |
396 | // ordering: | |
397 | let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed); | |
398 | if !panic.is_null() { | |
399 | log!(ScopeCompletePanicked { owner_thread: owner_thread.index() }); | |
400 | let value: Box<Box<Any + Send + 'static>> = mem::transmute(panic); | |
401 | unwind::resume_unwinding(*value); | |
402 | } else { | |
403 | log!(ScopeCompleteNoPanic { owner_thread: owner_thread.index() }); | |
404 | } | |
405 | } | |
406 | } | |
407 | ||
408 | impl<'scope> fmt::Debug for Scope<'scope> { | |
409 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | |
410 | fmt.debug_struct("Scope") | |
411 | .field("pool_id", &self.registry.id()) | |
412 | .field("owner_thread_index", &self.owner_thread_index) | |
413 | .field("panic", &self.panic) | |
414 | .field("job_completed_latch", &self.job_completed_latch) | |
415 | .finish() | |
416 | } | |
417 | } |