]> git.proxmox.com Git - rustc.git/blob - vendor/rayon-core/src/scope/mod.rs
f8d90ce204bf1350f274ad4b9316b1c63fe3b1ea
[rustc.git] / vendor / rayon-core / src / scope / mod.rs
1 //! Methods for custom fork-join scopes, created by the [`scope()`]
2 //! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
3 //!
4 //! [`scope()`]: fn.scope.html
5 //! [`in_place_scope()`]: fn.in_place_scope.html
6 //! [`join()`]: ../join/join.fn.html
7
8 use crate::job::{HeapJob, JobFifo};
9 use crate::latch::{CountLatch, CountLockLatch, Latch};
10 use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
11 use crate::unwind;
12 use std::any::Any;
13 use std::fmt;
14 use std::marker::PhantomData;
15 use std::mem;
16 use std::ptr;
17 use std::sync::atomic::{AtomicPtr, Ordering};
18 use std::sync::Arc;
19
20 #[cfg(test)]
21 mod test;
22
23 /// Represents a fork-join scope which can be used to spawn any number of tasks.
24 /// See [`scope()`] for more information.
25 ///
26 ///[`scope()`]: fn.scope.html
27 pub struct Scope<'scope> {
28 base: ScopeBase<'scope>,
29 }
30
31 /// Represents a fork-join scope which can be used to spawn any number of tasks.
32 /// Those spawned from the same thread are prioritized in relative FIFO order.
33 /// See [`scope_fifo()`] for more information.
34 ///
35 ///[`scope_fifo()`]: fn.scope_fifo.html
36 pub struct ScopeFifo<'scope> {
37 base: ScopeBase<'scope>,
38 fifos: Vec<JobFifo>,
39 }
40
41 enum ScopeLatch {
42 /// A latch for scopes created on a rayon thread which will participate in work-
43 /// stealing while it waits for completion. This thread is not necessarily part
44 /// of the same registry as the scope itself!
45 Stealing {
46 latch: CountLatch,
47 /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
48 /// with registry B, when a job completes in a thread of registry B, we may
49 /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A.
50 /// That means we need a reference to registry A (since at that point we will
51 /// only have a reference to registry B), so we stash it here.
52 registry: Arc<Registry>,
53 /// The index of the worker to wake in `registry`
54 worker_index: usize,
55 },
56
57 /// A latch for scopes created on a non-rayon thread which will block to wait.
58 Blocking { latch: CountLockLatch },
59 }
60
61 struct ScopeBase<'scope> {
62 /// thread registry where `scope()` was executed or where `in_place_scope()`
63 /// should spawn jobs.
64 registry: Arc<Registry>,
65
66 /// if some job panicked, the error is stored here; it will be
67 /// propagated to the one who created the scope
68 panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
69
70 /// latch to track job counts
71 job_completed_latch: ScopeLatch,
72
73 /// You can think of a scope as containing a list of closures to execute,
74 /// all of which outlive `'scope`. They're not actually required to be
75 /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
76 /// the closures are only *moved* across threads to be executed.
77 marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
78 }
79
80 /// Creates a "fork-join" scope `s` and invokes the closure with a
81 /// reference to `s`. This closure can then spawn asynchronous tasks
82 /// into `s`. Those tasks may run asynchronously with respect to the
83 /// closure; they may themselves spawn additional tasks into `s`. When
84 /// the closure returns, it will block until all tasks that have been
85 /// spawned into `s` complete.
86 ///
87 /// `scope()` is a more flexible building block compared to `join()`,
88 /// since a loop can be used to spawn any number of tasks without
89 /// recursing. However, that flexibility comes at a performance price:
90 /// tasks spawned using `scope()` must be allocated onto the heap,
91 /// whereas `join()` can make exclusive use of the stack. **Prefer
92 /// `join()` (or, even better, parallel iterators) where possible.**
93 ///
94 /// # Example
95 ///
96 /// The Rayon `join()` function launches two closures and waits for them
97 /// to stop. One could implement `join()` using a scope like so, although
98 /// it would be less efficient than the real implementation:
99 ///
100 /// ```rust
101 /// # use rayon_core as rayon;
102 /// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
103 /// where A: FnOnce() -> RA + Send,
104 /// B: FnOnce() -> RB + Send,
105 /// RA: Send,
106 /// RB: Send,
107 /// {
108 /// let mut result_a: Option<RA> = None;
109 /// let mut result_b: Option<RB> = None;
110 /// rayon::scope(|s| {
111 /// s.spawn(|_| result_a = Some(oper_a()));
112 /// s.spawn(|_| result_b = Some(oper_b()));
113 /// });
114 /// (result_a.unwrap(), result_b.unwrap())
115 /// }
116 /// ```
117 ///
118 /// # A note on threading
119 ///
120 /// The closure given to `scope()` executes in the Rayon thread-pool,
121 /// as do those given to `spawn()`. This means that you can't access
122 /// thread-local variables (well, you can, but they may have
123 /// unexpected values).
124 ///
125 /// # Task execution
126 ///
127 /// Task execution potentially starts as soon as `spawn()` is called.
128 /// The task will end sometime before `scope()` returns. Note that the
129 /// *closure* given to scope may return much earlier. In general
130 /// the lifetime of a scope created like `scope(body)` goes something like this:
131 ///
132 /// - Scope begins when `scope(body)` is called
133 /// - Scope body `body()` is invoked
134 /// - Scope tasks may be spawned
135 /// - Scope body returns
136 /// - Scope tasks execute, possibly spawning more tasks
137 /// - Once all tasks are done, scope ends and `scope()` returns
138 ///
139 /// To see how and when tasks are joined, consider this example:
140 ///
141 /// ```rust
142 /// # use rayon_core as rayon;
143 /// // point start
144 /// rayon::scope(|s| {
145 /// s.spawn(|s| { // task s.1
146 /// s.spawn(|s| { // task s.1.1
147 /// rayon::scope(|t| {
148 /// t.spawn(|_| ()); // task t.1
149 /// t.spawn(|_| ()); // task t.2
150 /// });
151 /// });
152 /// });
153 /// s.spawn(|s| { // task s.2
154 /// });
155 /// // point mid
156 /// });
157 /// // point end
158 /// ```
159 ///
160 /// The various tasks that are run will execute roughly like so:
161 ///
162 /// ```notrust
163 /// | (start)
164 /// |
165 /// | (scope `s` created)
166 /// +-----------------------------------------------+ (task s.2)
167 /// +-------+ (task s.1) |
168 /// | | |
169 /// | +---+ (task s.1.1) |
170 /// | | | |
171 /// | | | (scope `t` created) |
172 /// | | +----------------+ (task t.2) |
173 /// | | +---+ (task t.1) | |
174 /// | (mid) | | | | |
175 /// : | + <-+------------+ (scope `t` ends) |
176 /// : | | |
177 /// |<------+---+-----------------------------------+ (scope `s` ends)
178 /// |
179 /// | (end)
180 /// ```
181 ///
182 /// The point here is that everything spawned into scope `s` will
183 /// terminate (at latest) at the same point -- right before the
184 /// original call to `rayon::scope` returns. This includes new
185 /// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
186 /// scope is created (such as `t`), the things spawned into that scope
187 /// will be joined before that scope returns, which in turn occurs
188 /// before the creating task (task `s.1.1` in this case) finishes.
189 ///
190 /// There is no guaranteed order of execution for spawns in a scope,
191 /// given that other threads may steal tasks at any time. However, they
192 /// are generally prioritized in a LIFO order on the thread from which
193 /// they were spawned. So in this example, absent any stealing, we can
194 /// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
195 /// threads always steal from the other end of the deque, like FIFO
196 /// order. The idea is that "recent" tasks are most likely to be fresh
197 /// in the local CPU's cache, while other threads can steal older
198 /// "stale" tasks. For an alternate approach, consider
199 /// [`scope_fifo()`] instead.
200 ///
201 /// [`scope_fifo()`]: fn.scope_fifo.html
202 ///
203 /// # Accessing stack data
204 ///
205 /// In general, spawned tasks may access stack data in place that
206 /// outlives the scope itself. Other data must be fully owned by the
207 /// spawned task.
208 ///
209 /// ```rust
210 /// # use rayon_core as rayon;
211 /// let ok: Vec<i32> = vec![1, 2, 3];
212 /// rayon::scope(|s| {
213 /// let bad: Vec<i32> = vec![4, 5, 6];
214 /// s.spawn(|_| {
215 /// // We can access `ok` because it outlives the scope `s`.
216 /// println!("ok: {:?}", ok);
217 ///
218 /// // If we just try to use `bad` here, the closure will borrow `bad`
219 /// // (because we are just printing it out, and that only requires a
220 /// // borrow), which will result in a compilation error. Read on
221 /// // for options.
222 /// // println!("bad: {:?}", bad);
223 /// });
224 /// });
225 /// ```
226 ///
227 /// As the comments in the example above suggest, to reference `bad` we must
228 /// take ownership of it. One way to do this is to detach the closure
229 /// from the surrounding stack frame, using the `move` keyword. This
230 /// will cause it to take ownership of *all* the variables it touches,
231 /// in this case including both `ok` *and* `bad`:
232 ///
233 /// ```rust
234 /// # use rayon_core as rayon;
235 /// let ok: Vec<i32> = vec![1, 2, 3];
236 /// rayon::scope(|s| {
237 /// let bad: Vec<i32> = vec![4, 5, 6];
238 /// s.spawn(move |_| {
239 /// println!("ok: {:?}", ok);
240 /// println!("bad: {:?}", bad);
241 /// });
242 ///
243 /// // That closure is fine, but now we can't use `ok` anywhere else,
244 /// // since it is owned by the previous task:
245 /// // s.spawn(|_| println!("ok: {:?}", ok));
246 /// });
247 /// ```
248 ///
249 /// While this works, it could be a problem if we want to use `ok` elsewhere.
250 /// There are two choices. We can keep the closure as a `move` closure, but
251 /// instead of referencing the variable `ok`, we create a shadowed variable that
252 /// is a borrow of `ok` and capture *that*:
253 ///
254 /// ```rust
255 /// # use rayon_core as rayon;
256 /// let ok: Vec<i32> = vec![1, 2, 3];
257 /// rayon::scope(|s| {
258 /// let bad: Vec<i32> = vec![4, 5, 6];
259 /// let ok: &Vec<i32> = &ok; // shadow the original `ok`
260 /// s.spawn(move |_| {
261 /// println!("ok: {:?}", ok); // captures the shadowed version
262 /// println!("bad: {:?}", bad);
263 /// });
264 ///
265 /// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
266 /// // can be shared freely. Note that we need a `move` closure here though,
267 /// // because otherwise we'd be trying to borrow the shadowed `ok`,
268 /// // and that doesn't outlive `scope`.
269 /// s.spawn(move |_| println!("ok: {:?}", ok));
270 /// });
271 /// ```
272 ///
273 /// Another option is not to use the `move` keyword but instead to take ownership
274 /// of individual variables:
275 ///
276 /// ```rust
277 /// # use rayon_core as rayon;
278 /// let ok: Vec<i32> = vec![1, 2, 3];
279 /// rayon::scope(|s| {
280 /// let bad: Vec<i32> = vec![4, 5, 6];
281 /// s.spawn(|_| {
282 /// // Transfer ownership of `bad` into a local variable (also named `bad`).
283 /// // This will force the closure to take ownership of `bad` from the environment.
284 /// let bad = bad;
285 /// println!("ok: {:?}", ok); // `ok` is only borrowed.
286 /// println!("bad: {:?}", bad); // refers to our local variable, above.
287 /// });
288 ///
289 /// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
290 /// });
291 /// ```
292 ///
293 /// # Panics
294 ///
295 /// If a panic occurs, either in the closure given to `scope()` or in
296 /// any of the spawned jobs, that panic will be propagated and the
297 /// call to `scope()` will panic. If multiple panics occur, it is
298 /// non-deterministic which of their panic values will propagate.
299 /// Regardless, once a task is spawned using `scope.spawn()`, it will
300 /// execute, even if the spawning task should later panic. `scope()`
301 /// returns once all spawned jobs have completed, and any panics are
302 /// propagated at that point.
303 pub fn scope<'scope, OP, R>(op: OP) -> R
304 where
305 OP: FnOnce(&Scope<'scope>) -> R + Send,
306 R: Send,
307 {
308 in_worker(|owner_thread, _| {
309 let scope = Scope::<'scope>::new(Some(owner_thread), None);
310 scope.base.complete(Some(owner_thread), || op(&scope))
311 })
312 }
313
314 /// Creates a "fork-join" scope `s` with FIFO order, and invokes the
315 /// closure with a reference to `s`. This closure can then spawn
316 /// asynchronous tasks into `s`. Those tasks may run asynchronously with
317 /// respect to the closure; they may themselves spawn additional tasks
318 /// into `s`. When the closure returns, it will block until all tasks
319 /// that have been spawned into `s` complete.
320 ///
321 /// # Task execution
322 ///
323 /// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
324 /// difference in the order of execution. Consider a similar example:
325 ///
326 /// [`scope()`]: fn.scope.html
327 ///
328 /// ```rust
329 /// # use rayon_core as rayon;
330 /// // point start
331 /// rayon::scope_fifo(|s| {
332 /// s.spawn_fifo(|s| { // task s.1
333 /// s.spawn_fifo(|s| { // task s.1.1
334 /// rayon::scope_fifo(|t| {
335 /// t.spawn_fifo(|_| ()); // task t.1
336 /// t.spawn_fifo(|_| ()); // task t.2
337 /// });
338 /// });
339 /// });
340 /// s.spawn_fifo(|s| { // task s.2
341 /// });
342 /// // point mid
343 /// });
344 /// // point end
345 /// ```
346 ///
347 /// The various tasks that are run will execute roughly like so:
348 ///
349 /// ```notrust
350 /// | (start)
351 /// |
352 /// | (FIFO scope `s` created)
353 /// +--------------------+ (task s.1)
354 /// +-------+ (task s.2) |
355 /// | | +---+ (task s.1.1)
356 /// | | | |
357 /// | | | | (FIFO scope `t` created)
358 /// | | | +----------------+ (task t.1)
359 /// | | | +---+ (task t.2) |
360 /// | (mid) | | | | |
361 /// : | | + <-+------------+ (scope `t` ends)
362 /// : | | |
363 /// |<------+------------+---+ (scope `s` ends)
364 /// |
365 /// | (end)
366 /// ```
367 ///
368 /// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
369 /// the thread from which they were spawned, as opposed to `scope()`'s
370 /// LIFO. So in this example, we can expect `s.1` to execute before
371 /// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
372 /// FIFO order, as usual. Overall, this has roughly the same order as
373 /// the now-deprecated [`breadth_first`] option, except the effect is
374 /// isolated to a particular scope. If spawns are intermingled from any
375 /// combination of `scope()` and `scope_fifo()`, or from different
376 /// threads, their order is only specified with respect to spawns in the
377 /// same scope and thread.
378 ///
379 /// For more details on this design, see Rayon [RFC #1].
380 ///
381 /// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
382 /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
383 ///
384 /// # Panics
385 ///
386 /// If a panic occurs, either in the closure given to `scope_fifo()` or
387 /// in any of the spawned jobs, that panic will be propagated and the
388 /// call to `scope_fifo()` will panic. If multiple panics occur, it is
389 /// non-deterministic which of their panic values will propagate.
390 /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
391 /// will execute, even if the spawning task should later panic.
392 /// `scope_fifo()` returns once all spawned jobs have completed, and any
393 /// panics are propagated at that point.
394 pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
395 where
396 OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
397 R: Send,
398 {
399 in_worker(|owner_thread, _| {
400 let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
401 scope.base.complete(Some(owner_thread), || op(&scope))
402 })
403 }
404
405 /// Creates a "fork-join" scope `s` and invokes the closure with a
406 /// reference to `s`. This closure can then spawn asynchronous tasks
407 /// into `s`. Those tasks may run asynchronously with respect to the
408 /// closure; they may themselves spawn additional tasks into `s`. When
409 /// the closure returns, it will block until all tasks that have been
410 /// spawned into `s` complete.
411 ///
412 /// This is just like `scope()` except the closure runs on the same thread
413 /// that calls `in_place_scope()`. Only work that it spawns runs in the
414 /// thread pool.
415 ///
416 /// # Panics
417 ///
418 /// If a panic occurs, either in the closure given to `in_place_scope()` or in
419 /// any of the spawned jobs, that panic will be propagated and the
420 /// call to `in_place_scope()` will panic. If multiple panics occur, it is
421 /// non-deterministic which of their panic values will propagate.
422 /// Regardless, once a task is spawned using `scope.spawn()`, it will
423 /// execute, even if the spawning task should later panic. `in_place_scope()`
424 /// returns once all spawned jobs have completed, and any panics are
425 /// propagated at that point.
426 pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
427 where
428 OP: FnOnce(&Scope<'scope>) -> R,
429 {
430 do_in_place_scope(None, op)
431 }
432
433 pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
434 where
435 OP: FnOnce(&Scope<'scope>) -> R,
436 {
437 let thread = unsafe { WorkerThread::current().as_ref() };
438 let scope = Scope::<'scope>::new(thread, registry);
439 scope.base.complete(thread, || op(&scope))
440 }
441
442 /// Creates a "fork-join" scope `s` with FIFO order, and invokes the
443 /// closure with a reference to `s`. This closure can then spawn
444 /// asynchronous tasks into `s`. Those tasks may run asynchronously with
445 /// respect to the closure; they may themselves spawn additional tasks
446 /// into `s`. When the closure returns, it will block until all tasks
447 /// that have been spawned into `s` complete.
448 ///
449 /// This is just like `scope_fifo()` except the closure runs on the same thread
450 /// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
451 /// thread pool.
452 ///
453 /// # Panics
454 ///
455 /// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
456 /// any of the spawned jobs, that panic will be propagated and the
457 /// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is
458 /// non-deterministic which of their panic values will propagate.
459 /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
460 /// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
461 /// returns once all spawned jobs have completed, and any panics are
462 /// propagated at that point.
463 pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
464 where
465 OP: FnOnce(&ScopeFifo<'scope>) -> R,
466 {
467 do_in_place_scope_fifo(None, op)
468 }
469
470 pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
471 where
472 OP: FnOnce(&ScopeFifo<'scope>) -> R,
473 {
474 let thread = unsafe { WorkerThread::current().as_ref() };
475 let scope = ScopeFifo::<'scope>::new(thread, registry);
476 scope.base.complete(thread, || op(&scope))
477 }
478
479 impl<'scope> Scope<'scope> {
480 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
481 let base = ScopeBase::new(owner, registry);
482 Scope { base }
483 }
484
485 /// Spawns a job into the fork-join scope `self`. This job will
486 /// execute sometime before the fork-join scope completes. The
487 /// job is specified as a closure, and this closure receives its
488 /// own reference to the scope `self` as argument. This can be
489 /// used to inject new jobs into `self`.
490 ///
491 /// # Returns
492 ///
493 /// Nothing. The spawned closures cannot pass back values to the
494 /// caller directly, though they can write to local variables on
495 /// the stack (if those variables outlive the scope) or
496 /// communicate through shared channels.
497 ///
498 /// (The intention is to eventually integrate with Rust futures to
499 /// support spawns of functions that compute a value.)
500 ///
501 /// # Examples
502 ///
503 /// ```rust
504 /// # use rayon_core as rayon;
505 /// let mut value_a = None;
506 /// let mut value_b = None;
507 /// let mut value_c = None;
508 /// rayon::scope(|s| {
509 /// s.spawn(|s1| {
510 /// // ^ this is the same scope as `s`; this handle `s1`
511 /// // is intended for use by the spawned task,
512 /// // since scope handles cannot cross thread boundaries.
513 ///
514 /// value_a = Some(22);
515 ///
516 /// // the scope `s` will not end until all these tasks are done
517 /// s1.spawn(|_| {
518 /// value_b = Some(44);
519 /// });
520 /// });
521 ///
522 /// s.spawn(|_| {
523 /// value_c = Some(66);
524 /// });
525 /// });
526 /// assert_eq!(value_a, Some(22));
527 /// assert_eq!(value_b, Some(44));
528 /// assert_eq!(value_c, Some(66));
529 /// ```
530 ///
531 /// # See also
532 ///
533 /// The [`scope` function] has more extensive documentation about
534 /// task spawning.
535 ///
536 /// [`scope` function]: fn.scope.html
537 pub fn spawn<BODY>(&self, body: BODY)
538 where
539 BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
540 {
541 self.base.increment();
542 unsafe {
543 let job_ref = Box::new(HeapJob::new(move || {
544 self.base.execute_job(move || body(self))
545 }))
546 .as_job_ref();
547
548 // Since `Scope` implements `Sync`, we can't be sure that we're still in a
549 // thread of this pool, so we can't just push to the local worker thread.
550 // Also, this might be an in-place scope.
551 self.base.registry.inject_or_push(job_ref);
552 }
553 }
554 }
555
556 impl<'scope> ScopeFifo<'scope> {
557 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
558 let base = ScopeBase::new(owner, registry);
559 let num_threads = base.registry.num_threads();
560 let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
561 ScopeFifo { base, fifos }
562 }
563
564 /// Spawns a job into the fork-join scope `self`. This job will
565 /// execute sometime before the fork-join scope completes. The
566 /// job is specified as a closure, and this closure receives its
567 /// own reference to the scope `self` as argument. This can be
568 /// used to inject new jobs into `self`.
569 ///
570 /// # See also
571 ///
572 /// This method is akin to [`Scope::spawn()`], but with a FIFO
573 /// priority. The [`scope_fifo` function] has more details about
574 /// this distinction.
575 ///
576 /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
577 /// [`scope_fifo` function]: fn.scope_fifo.html
578 pub fn spawn_fifo<BODY>(&self, body: BODY)
579 where
580 BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
581 {
582 self.base.increment();
583 unsafe {
584 let job_ref = Box::new(HeapJob::new(move || {
585 self.base.execute_job(move || body(self))
586 }))
587 .as_job_ref();
588
589 // If we're in the pool, use our scope's private fifo for this thread to execute
590 // in a locally-FIFO order. Otherwise, just use the pool's global injector.
591 match self.base.registry.current_thread() {
592 Some(worker) => {
593 let fifo = &self.fifos[worker.index()];
594 worker.push(fifo.push(job_ref));
595 }
596 None => self.base.registry.inject(&[job_ref]),
597 }
598 }
599 }
600 }
601
602 impl<'scope> ScopeBase<'scope> {
603 /// Creates the base of a new scope for the given registry
604 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
605 let registry = registry.unwrap_or_else(|| match owner {
606 Some(owner) => owner.registry(),
607 None => global_registry(),
608 });
609
610 ScopeBase {
611 registry: Arc::clone(registry),
612 panic: AtomicPtr::new(ptr::null_mut()),
613 job_completed_latch: ScopeLatch::new(owner),
614 marker: PhantomData,
615 }
616 }
617
618 fn increment(&self) {
619 self.job_completed_latch.increment();
620 }
621
622 /// Executes `func` as a job, either aborting or executing as
623 /// appropriate.
624 fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
625 where
626 FUNC: FnOnce() -> R,
627 {
628 let result = self.execute_job_closure(func);
629 self.job_completed_latch.wait(owner);
630 self.maybe_propagate_panic();
631 result.unwrap() // only None if `op` panicked, and that would have been propagated
632 }
633
634 /// Executes `func` as a job, either aborting or executing as
635 /// appropriate.
636 fn execute_job<FUNC>(&self, func: FUNC)
637 where
638 FUNC: FnOnce(),
639 {
640 let _: Option<()> = self.execute_job_closure(func);
641 }
642
643 /// Executes `func` as a job in scope. Adjusts the "job completed"
644 /// counters and also catches any panic and stores it into
645 /// `scope`.
646 fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
647 where
648 FUNC: FnOnce() -> R,
649 {
650 match unwind::halt_unwinding(func) {
651 Ok(r) => {
652 self.job_completed_latch.set();
653 Some(r)
654 }
655 Err(err) => {
656 self.job_panicked(err);
657 self.job_completed_latch.set();
658 None
659 }
660 }
661 }
662
663 fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
664 // capture the first error we see, free the rest
665 let nil = ptr::null_mut();
666 let mut err = Box::new(err); // box up the fat ptr
667 if self
668 .panic
669 .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
670 .is_ok()
671 {
672 mem::forget(err); // ownership now transferred into self.panic
673 }
674 }
675
676 fn maybe_propagate_panic(&self) {
677 // propagate panic, if any occurred; at this point, all
678 // outstanding jobs have completed, so we can use a relaxed
679 // ordering:
680 let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
681 if !panic.is_null() {
682 let value = unsafe { Box::from_raw(panic) };
683 unwind::resume_unwinding(*value);
684 }
685 }
686 }
687
688 impl ScopeLatch {
689 fn new(owner: Option<&WorkerThread>) -> Self {
690 match owner {
691 Some(owner) => ScopeLatch::Stealing {
692 latch: CountLatch::new(),
693 registry: Arc::clone(owner.registry()),
694 worker_index: owner.index(),
695 },
696 None => ScopeLatch::Blocking {
697 latch: CountLockLatch::new(),
698 },
699 }
700 }
701
702 fn increment(&self) {
703 match self {
704 ScopeLatch::Stealing { latch, .. } => latch.increment(),
705 ScopeLatch::Blocking { latch } => latch.increment(),
706 }
707 }
708
709 fn set(&self) {
710 match self {
711 ScopeLatch::Stealing {
712 latch,
713 registry,
714 worker_index,
715 } => latch.set_and_tickle_one(registry, *worker_index),
716 ScopeLatch::Blocking { latch } => latch.set(),
717 }
718 }
719
720 fn wait(&self, owner: Option<&WorkerThread>) {
721 match self {
722 ScopeLatch::Stealing {
723 latch,
724 registry,
725 worker_index,
726 } => unsafe {
727 let owner = owner.expect("owner thread");
728 debug_assert_eq!(registry.id(), owner.registry().id());
729 debug_assert_eq!(*worker_index, owner.index());
730 owner.wait_until(latch);
731 },
732 ScopeLatch::Blocking { latch } => latch.wait(),
733 }
734 }
735 }
736
737 impl<'scope> fmt::Debug for Scope<'scope> {
738 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
739 fmt.debug_struct("Scope")
740 .field("pool_id", &self.base.registry.id())
741 .field("panic", &self.base.panic)
742 .field("job_completed_latch", &self.base.job_completed_latch)
743 .finish()
744 }
745 }
746
747 impl<'scope> fmt::Debug for ScopeFifo<'scope> {
748 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
749 fmt.debug_struct("ScopeFifo")
750 .field("num_fifos", &self.fifos.len())
751 .field("pool_id", &self.base.registry.id())
752 .field("panic", &self.base.panic)
753 .field("job_completed_latch", &self.base.job_completed_latch)
754 .finish()
755 }
756 }
757
758 impl fmt::Debug for ScopeLatch {
759 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
760 match self {
761 ScopeLatch::Stealing { latch, .. } => fmt
762 .debug_tuple("ScopeLatch::Stealing")
763 .field(latch)
764 .finish(),
765 ScopeLatch::Blocking { latch } => fmt
766 .debug_tuple("ScopeLatch::Blocking")
767 .field(latch)
768 .finish(),
769 }
770 }
771 }