]>
Commit | Line | Data |
---|---|---|
2c00a5a8 XL |
1 | //! Methods for custom fork-join scopes, created by the [`scope()`] |
2 | //! function. These are a more flexible alternative to [`join()`]. | |
3 | //! | |
4 | //! [`scope()`]: fn.scope.html | |
5 | //! [`join()`]: ../join/join.fn.html | |
6 | ||
e74abb32 | 7 | use job::{HeapJob, JobFifo}; |
532ac7d7 XL |
8 | use latch::{CountLatch, Latch}; |
9 | use log::Event::*; | |
10 | use registry::{in_worker, Registry, WorkerThread}; | |
2c00a5a8 XL |
11 | use std::any::Any; |
12 | use std::fmt; | |
13 | use std::marker::PhantomData; | |
14 | use std::mem; | |
15 | use std::ptr; | |
2c00a5a8 | 16 | use std::sync::atomic::{AtomicPtr, Ordering}; |
532ac7d7 | 17 | use std::sync::Arc; |
48663c56 | 18 | use tlv; |
e74abb32 | 19 | use unwind; |
2c00a5a8 | 20 | |
532ac7d7 | 21 | mod internal; |
2c00a5a8 XL |
22 | #[cfg(test)] |
23 | mod test; | |
2c00a5a8 | 24 | |
e74abb32 XL |
25 | /// Represents a fork-join scope which can be used to spawn any number of tasks. |
26 | /// See [`scope()`] for more information. | |
2c00a5a8 XL |
27 | /// |
28 | ///[`scope()`]: fn.scope.html | |
29 | pub struct Scope<'scope> { | |
e74abb32 XL |
30 | base: ScopeBase<'scope>, |
31 | } | |
32 | ||
33 | /// Represents a fork-join scope which can be used to spawn any number of tasks. | |
34 | /// Those spawned from the same thread are prioritized in relative FIFO order. | |
35 | /// See [`scope_fifo()`] for more information. | |
36 | /// | |
37 | ///[`scope_fifo()`]: fn.scope_fifo.html | |
38 | pub struct ScopeFifo<'scope> { | |
39 | base: ScopeBase<'scope>, | |
40 | fifos: Vec<JobFifo>, | |
41 | } | |
42 | ||
43 | struct ScopeBase<'scope> { | |
2c00a5a8 XL |
44 | /// thread where `scope()` was executed (note that individual jobs |
45 | /// may be executing on different worker threads, though they | |
46 | /// should always be within the same pool of threads) | |
47 | owner_thread_index: usize, | |
48 | ||
49 | /// thread registry where `scope()` was executed. | |
50 | registry: Arc<Registry>, | |
51 | ||
52 | /// if some job panicked, the error is stored here; it will be | |
53 | /// propagated to the one who created the scope | |
e74abb32 | 54 | panic: AtomicPtr<Box<dyn Any + Send + 'static>>, |
2c00a5a8 XL |
55 | |
56 | /// latch to set when the counter drops to zero (and hence this scope is complete) | |
57 | job_completed_latch: CountLatch, | |
58 | ||
59 | /// You can think of a scope as containing a list of closures to execute, | |
60 | /// all of which outlive `'scope`. They're not actually required to be | |
61 | /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because | |
62 | /// the closures are only *moved* across threads to be executed. | |
e74abb32 | 63 | marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>, |
48663c56 XL |
64 | |
65 | /// The TLV at the scope's creation. Used to set the TLV for spawned jobs. | |
66 | tlv: usize, | |
2c00a5a8 XL |
67 | } |
68 | ||
69 | /// Creates a "fork-join" scope `s` and invokes the closure with a | |
70 | /// reference to `s`. This closure can then spawn asynchronous tasks | |
71 | /// into `s`. Those tasks may run asynchronously with respect to the | |
72 | /// closure; they may themselves spawn additional tasks into `s`. When | |
73 | /// the closure returns, it will block until all tasks that have been | |
74 | /// spawned into `s` complete. | |
75 | /// | |
76 | /// `scope()` is a more flexible building block compared to `join()`, | |
77 | /// since a loop can be used to spawn any number of tasks without | |
78 | /// recursing. However, that flexibility comes at a performance price: | |
79 | /// tasks spawned using `scope()` must be allocated onto the heap, | |
80 | /// whereas `join()` can make exclusive use of the stack. **Prefer | |
81 | /// `join()` (or, even better, parallel iterators) where possible.** | |
82 | /// | |
83 | /// # Example | |
84 | /// | |
85 | /// The Rayon `join()` function launches two closures and waits for them | |
86 | /// to stop. One could implement `join()` using a scope like so, although | |
87 | /// it would be less efficient than the real implementation: | |
88 | /// | |
89 | /// ```rust | |
90 | /// # use rayon_core as rayon; | |
91 | /// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB) | |
92 | /// where A: FnOnce() -> RA + Send, | |
93 | /// B: FnOnce() -> RB + Send, | |
94 | /// RA: Send, | |
95 | /// RB: Send, | |
96 | /// { | |
97 | /// let mut result_a: Option<RA> = None; | |
98 | /// let mut result_b: Option<RB> = None; | |
99 | /// rayon::scope(|s| { | |
100 | /// s.spawn(|_| result_a = Some(oper_a())); | |
101 | /// s.spawn(|_| result_b = Some(oper_b())); | |
102 | /// }); | |
103 | /// (result_a.unwrap(), result_b.unwrap()) | |
104 | /// } | |
105 | /// ``` | |
106 | /// | |
107 | /// # A note on threading | |
108 | /// | |
109 | /// The closure given to `scope()` executes in the Rayon thread-pool, | |
110 | /// as do those given to `spawn()`. This means that you can't access | |
111 | /// thread-local variables (well, you can, but they may have | |
112 | /// unexpected values). | |
113 | /// | |
114 | /// # Task execution | |
115 | /// | |
116 | /// Task execution potentially starts as soon as `spawn()` is called. | |
117 | /// The task will end sometime before `scope()` returns. Note that the | |
118 | /// *closure* given to scope may return much earlier. In general | |
119 | /// the lifetime of a scope created like `scope(body)` goes something like this: | |
120 | /// | |
121 | /// - Scope begins when `scope(body)` is called | |
122 | /// - Scope body `body()` is invoked | |
123 | /// - Scope tasks may be spawned | |
124 | /// - Scope body returns | |
125 | /// - Scope tasks execute, possibly spawning more tasks | |
126 | /// - Once all tasks are done, scope ends and `scope()` returns | |
127 | /// | |
128 | /// To see how and when tasks are joined, consider this example: | |
129 | /// | |
130 | /// ```rust | |
131 | /// # use rayon_core as rayon; | |
132 | /// // point start | |
133 | /// rayon::scope(|s| { | |
134 | /// s.spawn(|s| { // task s.1 | |
135 | /// s.spawn(|s| { // task s.1.1 | |
136 | /// rayon::scope(|t| { | |
137 | /// t.spawn(|_| ()); // task t.1 | |
138 | /// t.spawn(|_| ()); // task t.2 | |
139 | /// }); | |
140 | /// }); | |
141 | /// }); | |
e74abb32 | 142 | /// s.spawn(|s| { // task s.2 |
2c00a5a8 XL |
143 | /// }); |
144 | /// // point mid | |
145 | /// }); | |
146 | /// // point end | |
147 | /// ``` | |
148 | /// | |
149 | /// The various tasks that are run will execute roughly like so: | |
150 | /// | |
151 | /// ```notrust | |
152 | /// | (start) | |
153 | /// | | |
154 | /// | (scope `s` created) | |
e74abb32 XL |
155 | /// +-----------------------------------------------+ (task s.2) |
156 | /// +-------+ (task s.1) | | |
157 | /// | | | | |
158 | /// | +---+ (task s.1.1) | | |
159 | /// | | | | | |
160 | /// | | | (scope `t` created) | | |
161 | /// | | +----------------+ (task t.2) | | |
162 | /// | | +---+ (task t.1) | | | |
163 | /// | (mid) | | | | | | |
164 | /// : | + <-+------------+ (scope `t` ends) | | |
165 | /// : | | | | |
166 | /// |<------+---+-----------------------------------+ (scope `s` ends) | |
2c00a5a8 XL |
167 | /// | |
168 | /// | (end) | |
169 | /// ``` | |
170 | /// | |
171 | /// The point here is that everything spawned into scope `s` will | |
172 | /// terminate (at latest) at the same point -- right before the | |
173 | /// original call to `rayon::scope` returns. This includes new | |
174 | /// subtasks created by other subtasks (e.g., task `s.1.1`). If a new | |
175 | /// scope is created (such as `t`), the things spawned into that scope | |
176 | /// will be joined before that scope returns, which in turn occurs | |
177 | /// before the creating task (task `s.1.1` in this case) finishes. | |
178 | /// | |
e74abb32 XL |
179 | /// There is no guaranteed order of execution for spawns in a scope, |
180 | /// given that other threads may steal tasks at any time. However, they | |
181 | /// are generally prioritized in a LIFO order on the thread from which | |
182 | /// they were spawned. So in this example, absent any stealing, we can | |
183 | /// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other | |
184 | /// threads always steal from the other end of the deque, like FIFO | |
185 | /// order. The idea is that "recent" tasks are most likely to be fresh | |
186 | /// in the local CPU's cache, while other threads can steal older | |
187 | /// "stale" tasks. For an alternate approach, consider | |
188 | /// [`scope_fifo()`] instead. | |
189 | /// | |
190 | /// [`scope_fifo()`]: fn.scope_fifo.html | |
191 | /// | |
2c00a5a8 XL |
192 | /// # Accessing stack data |
193 | /// | |
194 | /// In general, spawned tasks may access stack data in place that | |
195 | /// outlives the scope itself. Other data must be fully owned by the | |
196 | /// spawned task. | |
197 | /// | |
198 | /// ```rust | |
199 | /// # use rayon_core as rayon; | |
200 | /// let ok: Vec<i32> = vec![1, 2, 3]; | |
201 | /// rayon::scope(|s| { | |
202 | /// let bad: Vec<i32> = vec![4, 5, 6]; | |
203 | /// s.spawn(|_| { | |
204 | /// // We can access `ok` because it outlives the scope `s`. | |
205 | /// println!("ok: {:?}", ok); | |
206 | /// | |
207 | /// // If we just try to use `bad` here, the closure will borrow `bad` | |
208 | /// // (because we are just printing it out, and that only requires a | |
209 | /// // borrow), which will result in a compilation error. Read on | |
210 | /// // for options. | |
211 | /// // println!("bad: {:?}", bad); | |
212 | /// }); | |
213 | /// }); | |
214 | /// ``` | |
215 | /// | |
216 | /// As the comments in the example above suggest, to reference `bad` we must | |
217 | /// take ownership of it. One way to do this is to detach the closure | |
218 | /// from the surrounding stack frame, using the `move` keyword. This | |
219 | /// will cause it to take ownership of *all* the variables it touches, | |
220 | /// in this case including both `ok` *and* `bad`: | |
221 | /// | |
222 | /// ```rust | |
223 | /// # use rayon_core as rayon; | |
224 | /// let ok: Vec<i32> = vec![1, 2, 3]; | |
225 | /// rayon::scope(|s| { | |
226 | /// let bad: Vec<i32> = vec![4, 5, 6]; | |
227 | /// s.spawn(move |_| { | |
228 | /// println!("ok: {:?}", ok); | |
229 | /// println!("bad: {:?}", bad); | |
230 | /// }); | |
231 | /// | |
232 | /// // That closure is fine, but now we can't use `ok` anywhere else, | |
233 | /// // since it is owned by the previous task: | |
234 | /// // s.spawn(|_| println!("ok: {:?}", ok)); | |
235 | /// }); | |
236 | /// ``` | |
237 | /// | |
238 | /// While this works, it could be a problem if we want to use `ok` elsewhere. | |
239 | /// There are two choices. We can keep the closure as a `move` closure, but | |
240 | /// instead of referencing the variable `ok`, we create a shadowed variable that | |
241 | /// is a borrow of `ok` and capture *that*: | |
242 | /// | |
243 | /// ```rust | |
244 | /// # use rayon_core as rayon; | |
245 | /// let ok: Vec<i32> = vec![1, 2, 3]; | |
246 | /// rayon::scope(|s| { | |
247 | /// let bad: Vec<i32> = vec![4, 5, 6]; | |
248 | /// let ok: &Vec<i32> = &ok; // shadow the original `ok` | |
249 | /// s.spawn(move |_| { | |
250 | /// println!("ok: {:?}", ok); // captures the shadowed version | |
251 | /// println!("bad: {:?}", bad); | |
252 | /// }); | |
253 | /// | |
254 | /// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references | |
255 | /// // can be shared freely. Note that we need a `move` closure here though, | |
256 | /// // because otherwise we'd be trying to borrow the shadowed `ok`, | |
257 | /// // and that doesn't outlive `scope`. | |
258 | /// s.spawn(move |_| println!("ok: {:?}", ok)); | |
259 | /// }); | |
260 | /// ``` | |
261 | /// | |
262 | /// Another option is not to use the `move` keyword but instead to take ownership | |
263 | /// of individual variables: | |
264 | /// | |
265 | /// ```rust | |
266 | /// # use rayon_core as rayon; | |
267 | /// let ok: Vec<i32> = vec![1, 2, 3]; | |
268 | /// rayon::scope(|s| { | |
269 | /// let bad: Vec<i32> = vec![4, 5, 6]; | |
270 | /// s.spawn(|_| { | |
271 | /// // Transfer ownership of `bad` into a local variable (also named `bad`). | |
272 | /// // This will force the closure to take ownership of `bad` from the environment. | |
273 | /// let bad = bad; | |
274 | /// println!("ok: {:?}", ok); // `ok` is only borrowed. | |
275 | /// println!("bad: {:?}", bad); // refers to our local variable, above. | |
276 | /// }); | |
277 | /// | |
278 | /// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok` | |
279 | /// }); | |
280 | /// ``` | |
281 | /// | |
282 | /// # Panics | |
283 | /// | |
284 | /// If a panic occurs, either in the closure given to `scope()` or in | |
285 | /// any of the spawned jobs, that panic will be propagated and the | |
286 | /// call to `scope()` will panic. If multiple panics occur, it is | |
287 | /// non-deterministic which of their panic values will propagate. | |
288 | /// Regardless, once a task is spawned using `scope.spawn()`, it will | |
289 | /// execute, even if the spawning task should later panic. `scope()` | |
290 | /// returns once all spawned jobs have completed, and any panics are | |
291 | /// propagated at that point. | |
292 | pub fn scope<'scope, OP, R>(op: OP) -> R | |
532ac7d7 XL |
293 | where |
294 | OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, | |
295 | R: Send, | |
2c00a5a8 XL |
296 | { |
297 | in_worker(|owner_thread, _| { | |
e74abb32 XL |
298 | let scope = Scope::<'scope>::new(owner_thread); |
299 | unsafe { scope.base.complete(owner_thread, || op(&scope)) } | |
300 | }) | |
301 | } | |
302 | ||
303 | /// Creates a "fork-join" scope `s` with FIFO order, and invokes the | |
304 | /// closure with a reference to `s`. This closure can then spawn | |
305 | /// asynchronous tasks into `s`. Those tasks may run asynchronously with | |
306 | /// respect to the closure; they may themselves spawn additional tasks | |
307 | /// into `s`. When the closure returns, it will block until all tasks | |
308 | /// that have been spawned into `s` complete. | |
309 | /// | |
310 | /// # Task execution | |
311 | /// | |
312 | /// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a | |
313 | /// difference in the order of execution. Consider a similar example: | |
314 | /// | |
315 | /// [`scope()`]: fn.scope.html | |
316 | /// | |
317 | /// ```rust | |
318 | /// # use rayon_core as rayon; | |
319 | /// // point start | |
320 | /// rayon::scope_fifo(|s| { | |
321 | /// s.spawn_fifo(|s| { // task s.1 | |
322 | /// s.spawn_fifo(|s| { // task s.1.1 | |
323 | /// rayon::scope_fifo(|t| { | |
324 | /// t.spawn_fifo(|_| ()); // task t.1 | |
325 | /// t.spawn_fifo(|_| ()); // task t.2 | |
326 | /// }); | |
327 | /// }); | |
328 | /// }); | |
329 | /// s.spawn_fifo(|s| { // task s.2 | |
330 | /// }); | |
331 | /// // point mid | |
332 | /// }); | |
333 | /// // point end | |
334 | /// ``` | |
335 | /// | |
336 | /// The various tasks that are run will execute roughly like so: | |
337 | /// | |
338 | /// ```notrust | |
339 | /// | (start) | |
340 | /// | | |
341 | /// | (FIFO scope `s` created) | |
342 | /// +--------------------+ (task s.1) | |
343 | /// +-------+ (task s.2) | | |
344 | /// | | +---+ (task s.1.1) | |
345 | /// | | | | | |
346 | /// | | | | (FIFO scope `t` created) | |
347 | /// | | | +----------------+ (task t.1) | |
348 | /// | | | +---+ (task t.2) | | |
349 | /// | (mid) | | | | | | |
350 | /// : | | + <-+------------+ (scope `t` ends) | |
351 | /// : | | | | |
352 | /// |<------+------------+---+ (scope `s` ends) | |
353 | /// | | |
354 | /// | (end) | |
355 | /// ``` | |
356 | /// | |
357 | /// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on | |
358 | /// the thread from which they were spawned, as opposed to `scope()`'s | |
359 | /// LIFO. So in this example, we can expect `s.1` to execute before | |
360 | /// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in | |
361 | /// FIFO order, as usual. Overall, this has roughly the same order as | |
362 | /// the now-deprecated [`breadth_first`] option, except the effect is | |
363 | /// isolated to a particular scope. If spawns are intermingled from any | |
364 | /// combination of `scope()` and `scope_fifo()`, or from different | |
365 | /// threads, their order is only specified with respect to spawns in the | |
366 | /// same scope and thread. | |
367 | /// | |
368 | /// For more details on this design, see Rayon [RFC #1]. | |
369 | /// | |
370 | /// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first | |
371 | /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md | |
372 | /// | |
373 | /// # Panics | |
374 | /// | |
375 | /// If a panic occurs, either in the closure given to `scope_fifo()` or | |
376 | /// in any of the spawned jobs, that panic will be propagated and the | |
377 | /// call to `scope_fifo()` will panic. If multiple panics occur, it is | |
378 | /// non-deterministic which of their panic values will propagate. | |
379 | /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it | |
380 | /// will execute, even if the spawning task should later panic. | |
381 | /// `scope_fifo()` returns once all spawned jobs have completed, and any | |
382 | /// panics are propagated at that point. | |
383 | pub fn scope_fifo<'scope, OP, R>(op: OP) -> R | |
384 | where | |
385 | OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send, | |
386 | R: Send, | |
387 | { | |
388 | in_worker(|owner_thread, _| { | |
389 | let scope = ScopeFifo::<'scope>::new(owner_thread); | |
390 | unsafe { scope.base.complete(owner_thread, || op(&scope)) } | |
2c00a5a8 XL |
391 | }) |
392 | } | |
393 | ||
394 | impl<'scope> Scope<'scope> { | |
e74abb32 XL |
395 | fn new(owner_thread: &WorkerThread) -> Self { |
396 | Scope { | |
397 | base: ScopeBase::new(owner_thread), | |
398 | } | |
399 | } | |
400 | ||
2c00a5a8 XL |
401 | /// Spawns a job into the fork-join scope `self`. This job will |
402 | /// execute sometime before the fork-join scope completes. The | |
403 | /// job is specified as a closure, and this closure receives its | |
404 | /// own reference to the scope `self` as argument. This can be | |
405 | /// used to inject new jobs into `self`. | |
406 | /// | |
407 | /// # Returns | |
408 | /// | |
409 | /// Nothing. The spawned closures cannot pass back values to the | |
410 | /// caller directly, though they can write to local variables on | |
411 | /// the stack (if those variables outlive the scope) or | |
412 | /// communicate through shared channels. | |
413 | /// | |
414 | /// (The intention is to eventualy integrate with Rust futures to | |
415 | /// support spawns of functions that compute a value.) | |
416 | /// | |
417 | /// # Examples | |
418 | /// | |
419 | /// ```rust | |
420 | /// # use rayon_core as rayon; | |
421 | /// let mut value_a = None; | |
422 | /// let mut value_b = None; | |
423 | /// let mut value_c = None; | |
424 | /// rayon::scope(|s| { | |
425 | /// s.spawn(|s1| { | |
426 | /// // ^ this is the same scope as `s`; this handle `s1` | |
427 | /// // is intended for use by the spawned task, | |
428 | /// // since scope handles cannot cross thread boundaries. | |
429 | /// | |
430 | /// value_a = Some(22); | |
431 | /// | |
432 | /// // the scope `s` will not end until all these tasks are done | |
433 | /// s1.spawn(|_| { | |
434 | /// value_b = Some(44); | |
435 | /// }); | |
436 | /// }); | |
437 | /// | |
438 | /// s.spawn(|_| { | |
439 | /// value_c = Some(66); | |
440 | /// }); | |
441 | /// }); | |
442 | /// assert_eq!(value_a, Some(22)); | |
443 | /// assert_eq!(value_b, Some(44)); | |
444 | /// assert_eq!(value_c, Some(66)); | |
445 | /// ``` | |
446 | /// | |
447 | /// # See also | |
448 | /// | |
449 | /// The [`scope` function] has more extensive documentation about | |
450 | /// task spawning. | |
451 | /// | |
452 | /// [`scope` function]: fn.scope.html | |
453 | pub fn spawn<BODY>(&self, body: BODY) | |
532ac7d7 XL |
454 | where |
455 | BODY: FnOnce(&Scope<'scope>) + Send + 'scope, | |
2c00a5a8 | 456 | { |
e74abb32 | 457 | self.base.increment(); |
2c00a5a8 | 458 | unsafe { |
e74abb32 XL |
459 | let job_ref = Box::new(HeapJob::new(self.base.tlv, move || { |
460 | self.base.execute_job(move || body(self)) | |
461 | })) | |
462 | .as_job_ref(); | |
2c00a5a8 | 463 | |
e74abb32 XL |
464 | // Since `Scope` implements `Sync`, we can't be sure that we're still in a |
465 | // thread of this pool, so we can't just push to the local worker thread. | |
466 | self.base.registry.inject_or_push(job_ref); | |
2c00a5a8 XL |
467 | } |
468 | } | |
e74abb32 XL |
469 | } |
470 | ||
471 | impl<'scope> ScopeFifo<'scope> { | |
472 | fn new(owner_thread: &WorkerThread) -> Self { | |
473 | let num_threads = owner_thread.registry().num_threads(); | |
474 | ScopeFifo { | |
475 | base: ScopeBase::new(owner_thread), | |
476 | fifos: (0..num_threads).map(|_| JobFifo::new()).collect(), | |
477 | } | |
478 | } | |
479 | ||
480 | /// Spawns a job into the fork-join scope `self`. This job will | |
481 | /// execute sometime before the fork-join scope completes. The | |
482 | /// job is specified as a closure, and this closure receives its | |
483 | /// own reference to the scope `self` as argument. This can be | |
484 | /// used to inject new jobs into `self`. | |
485 | /// | |
486 | /// # See also | |
487 | /// | |
488 | /// This method is akin to [`Scope::spawn()`], but with a FIFO | |
489 | /// priority. The [`scope_fifo` function] has more details about | |
490 | /// this distinction. | |
491 | /// | |
492 | /// [`Scope::spawn()`]: struct.Scope.html#method.spawn | |
493 | /// [`scope_fifo` function]: fn.scope.html | |
494 | pub fn spawn_fifo<BODY>(&self, body: BODY) | |
495 | where | |
496 | BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, | |
497 | { | |
498 | self.base.increment(); | |
499 | unsafe { | |
500 | let job_ref = Box::new(HeapJob::new(self.base.tlv, move || { | |
501 | self.base.execute_job(move || body(self)) | |
502 | })) | |
503 | .as_job_ref(); | |
504 | ||
505 | // If we're in the pool, use our scope's private fifo for this thread to execute | |
506 | // in a locally-FIFO order. Otherwise, just use the pool's global injector. | |
507 | match self.base.registry.current_thread() { | |
508 | Some(worker) => { | |
509 | let fifo = &self.fifos[worker.index()]; | |
510 | worker.push(fifo.push(job_ref)); | |
511 | } | |
512 | None => self.base.registry.inject(&[job_ref]), | |
513 | } | |
514 | } | |
515 | } | |
516 | } | |
517 | ||
518 | impl<'scope> ScopeBase<'scope> { | |
519 | /// Create the base of a new scope for the given worker thread | |
520 | fn new(owner_thread: &WorkerThread) -> Self { | |
521 | ScopeBase { | |
522 | owner_thread_index: owner_thread.index(), | |
523 | registry: owner_thread.registry().clone(), | |
524 | panic: AtomicPtr::new(ptr::null_mut()), | |
525 | job_completed_latch: CountLatch::new(), | |
526 | marker: PhantomData, | |
527 | tlv: tlv::get(), | |
528 | } | |
529 | } | |
530 | ||
531 | fn increment(&self) { | |
532 | self.job_completed_latch.increment(); | |
533 | } | |
534 | ||
535 | /// Executes `func` as a job, either aborting or executing as | |
536 | /// appropriate. | |
537 | /// | |
538 | /// Unsafe because it must be executed on a worker thread. | |
539 | unsafe fn complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R | |
540 | where | |
541 | FUNC: FnOnce() -> R, | |
542 | { | |
543 | let result = self.execute_job_closure(func); | |
544 | self.steal_till_jobs_complete(owner_thread); | |
545 | // Restore the TLV if we ran some jobs while waiting | |
546 | tlv::set(self.tlv); | |
547 | result.unwrap() // only None if `op` panicked, and that would have been propagated | |
548 | } | |
2c00a5a8 XL |
549 | |
550 | /// Executes `func` as a job, either aborting or executing as | |
551 | /// appropriate. | |
552 | /// | |
553 | /// Unsafe because it must be executed on a worker thread. | |
554 | unsafe fn execute_job<FUNC>(&self, func: FUNC) | |
532ac7d7 | 555 | where |
e74abb32 | 556 | FUNC: FnOnce(), |
2c00a5a8 XL |
557 | { |
558 | let _: Option<()> = self.execute_job_closure(func); | |
559 | } | |
560 | ||
561 | /// Executes `func` as a job in scope. Adjusts the "job completed" | |
562 | /// counters and also catches any panic and stores it into | |
563 | /// `scope`. | |
564 | /// | |
565 | /// Unsafe because this must be executed on a worker thread. | |
566 | unsafe fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R> | |
532ac7d7 | 567 | where |
e74abb32 | 568 | FUNC: FnOnce() -> R, |
2c00a5a8 | 569 | { |
e74abb32 | 570 | match unwind::halt_unwinding(func) { |
532ac7d7 XL |
571 | Ok(r) => { |
572 | self.job_completed_ok(); | |
573 | Some(r) | |
574 | } | |
575 | Err(err) => { | |
576 | self.job_panicked(err); | |
577 | None | |
578 | } | |
2c00a5a8 XL |
579 | } |
580 | } | |
581 | ||
e74abb32 | 582 | unsafe fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { |
2c00a5a8 XL |
583 | // capture the first error we see, free the rest |
584 | let nil = ptr::null_mut(); | |
585 | let mut err = Box::new(err); // box up the fat ptr | |
532ac7d7 XL |
586 | if self |
587 | .panic | |
588 | .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed) | |
589 | .is_ok() | |
590 | { | |
591 | log!(JobPanickedErrorStored { | |
592 | owner_thread: self.owner_thread_index | |
593 | }); | |
2c00a5a8 XL |
594 | mem::forget(err); // ownership now transferred into self.panic |
595 | } else { | |
532ac7d7 XL |
596 | log!(JobPanickedErrorNotStored { |
597 | owner_thread: self.owner_thread_index | |
598 | }); | |
2c00a5a8 XL |
599 | } |
600 | ||
2c00a5a8 XL |
601 | self.job_completed_latch.set(); |
602 | } | |
603 | ||
604 | unsafe fn job_completed_ok(&self) { | |
532ac7d7 XL |
605 | log!(JobCompletedOk { |
606 | owner_thread: self.owner_thread_index | |
607 | }); | |
2c00a5a8 XL |
608 | self.job_completed_latch.set(); |
609 | } | |
610 | ||
611 | unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) { | |
612 | // wait for job counter to reach 0: | |
613 | owner_thread.wait_until(&self.job_completed_latch); | |
614 | ||
615 | // propagate panic, if any occurred; at this point, all | |
616 | // outstanding jobs have completed, so we can use a relaxed | |
617 | // ordering: | |
618 | let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed); | |
619 | if !panic.is_null() { | |
532ac7d7 XL |
620 | log!(ScopeCompletePanicked { |
621 | owner_thread: owner_thread.index() | |
622 | }); | |
48663c56 XL |
623 | // Restore the TLV if we ran some jobs while waiting |
624 | tlv::set(self.tlv); | |
e74abb32 | 625 | let value: Box<Box<dyn Any + Send + 'static>> = mem::transmute(panic); |
2c00a5a8 XL |
626 | unwind::resume_unwinding(*value); |
627 | } else { | |
532ac7d7 XL |
628 | log!(ScopeCompleteNoPanic { |
629 | owner_thread: owner_thread.index() | |
630 | }); | |
2c00a5a8 XL |
631 | } |
632 | } | |
633 | } | |
634 | ||
635 | impl<'scope> fmt::Debug for Scope<'scope> { | |
e74abb32 | 636 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
2c00a5a8 | 637 | fmt.debug_struct("Scope") |
e74abb32 XL |
638 | .field("pool_id", &self.base.registry.id()) |
639 | .field("owner_thread_index", &self.base.owner_thread_index) | |
640 | .field("panic", &self.base.panic) | |
641 | .field("job_completed_latch", &self.base.job_completed_latch) | |
642 | .finish() | |
643 | } | |
644 | } | |
645 | ||
646 | impl<'scope> fmt::Debug for ScopeFifo<'scope> { | |
647 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | |
648 | fmt.debug_struct("ScopeFifo") | |
649 | .field("num_fifos", &self.fifos.len()) | |
650 | .field("pool_id", &self.base.registry.id()) | |
651 | .field("owner_thread_index", &self.base.owner_thread_index) | |
652 | .field("panic", &self.base.panic) | |
653 | .field("job_completed_latch", &self.base.job_completed_latch) | |
2c00a5a8 XL |
654 | .finish() |
655 | } | |
656 | } |