]> git.proxmox.com Git - rustc.git/blob - vendor/rustc-rayon-core/src/scope/mod.rs
New upstream version 1.63.0+dfsg1
[rustc.git] / vendor / rustc-rayon-core / src / scope / mod.rs
1 //! Methods for custom fork-join scopes, created by the [`scope()`]
2 //! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
3 //!
4 //! [`scope()`]: fn.scope.html
5 //! [`in_place_scope()`]: fn.in_place_scope.html
6 //! [`join()`]: ../join/join.fn.html
7
8 use crate::job::{HeapJob, JobFifo};
9 use crate::latch::{CountLatch, CountLockLatch, Latch};
10 use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
11 use crate::tlv;
12 use crate::unwind;
13 use std::any::Any;
14 use std::fmt;
15 use std::marker::PhantomData;
16 use std::mem;
17 use std::ptr;
18 use std::sync::atomic::{AtomicPtr, Ordering};
19 use std::sync::Arc;
20
21 #[cfg(test)]
22 mod test;
23
24 /// Represents a fork-join scope which can be used to spawn any number of tasks.
25 /// See [`scope()`] for more information.
26 ///
27 ///[`scope()`]: fn.scope.html
28 pub struct Scope<'scope> {
29 base: ScopeBase<'scope>,
30 }
31
32 /// Represents a fork-join scope which can be used to spawn any number of tasks.
33 /// Those spawned from the same thread are prioritized in relative FIFO order.
34 /// See [`scope_fifo()`] for more information.
35 ///
36 ///[`scope_fifo()`]: fn.scope_fifo.html
37 pub struct ScopeFifo<'scope> {
38 base: ScopeBase<'scope>,
39 fifos: Vec<JobFifo>,
40 }
41
42 enum ScopeLatch {
43 /// A latch for scopes created on a rayon thread which will participate in work-
44 /// stealing while it waits for completion. This thread is not necessarily part
45 /// of the same registry as the scope itself!
46 Stealing {
47 latch: CountLatch,
48 /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
49 /// with registry B, when a job completes in a thread of registry B, we may
50 /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A.
51 /// That means we need a reference to registry A (since at that point we will
52 /// only have a reference to registry B), so we stash it here.
53 registry: Arc<Registry>,
54 /// The index of the worker to wake in `registry`
55 worker_index: usize,
56 },
57
58 /// A latch for scopes created on a non-rayon thread which will block to wait.
59 Blocking { latch: CountLockLatch },
60 }
61
62 struct ScopeBase<'scope> {
63 /// thread registry where `scope()` was executed or where `in_place_scope()`
64 /// should spawn jobs.
65 registry: Arc<Registry>,
66
67 /// if some job panicked, the error is stored here; it will be
68 /// propagated to the one who created the scope
69 panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
70
71 /// latch to track job counts
72 job_completed_latch: ScopeLatch,
73
74 /// You can think of a scope as containing a list of closures to execute,
75 /// all of which outlive `'scope`. They're not actually required to be
76 /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
77 /// the closures are only *moved* across threads to be executed.
78 marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
79
80 /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
81 tlv: usize,
82 }
83
84 /// Creates a "fork-join" scope `s` and invokes the closure with a
85 /// reference to `s`. This closure can then spawn asynchronous tasks
86 /// into `s`. Those tasks may run asynchronously with respect to the
87 /// closure; they may themselves spawn additional tasks into `s`. When
88 /// the closure returns, it will block until all tasks that have been
89 /// spawned into `s` complete.
90 ///
91 /// `scope()` is a more flexible building block compared to `join()`,
92 /// since a loop can be used to spawn any number of tasks without
93 /// recursing. However, that flexibility comes at a performance price:
94 /// tasks spawned using `scope()` must be allocated onto the heap,
95 /// whereas `join()` can make exclusive use of the stack. **Prefer
96 /// `join()` (or, even better, parallel iterators) where possible.**
97 ///
98 /// # Example
99 ///
100 /// The Rayon `join()` function launches two closures and waits for them
101 /// to stop. One could implement `join()` using a scope like so, although
102 /// it would be less efficient than the real implementation:
103 ///
104 /// ```rust
105 /// # use rayon_core as rayon;
106 /// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
107 /// where A: FnOnce() -> RA + Send,
108 /// B: FnOnce() -> RB + Send,
109 /// RA: Send,
110 /// RB: Send,
111 /// {
112 /// let mut result_a: Option<RA> = None;
113 /// let mut result_b: Option<RB> = None;
114 /// rayon::scope(|s| {
115 /// s.spawn(|_| result_a = Some(oper_a()));
116 /// s.spawn(|_| result_b = Some(oper_b()));
117 /// });
118 /// (result_a.unwrap(), result_b.unwrap())
119 /// }
120 /// ```
121 ///
122 /// # A note on threading
123 ///
124 /// The closure given to `scope()` executes in the Rayon thread-pool,
125 /// as do those given to `spawn()`. This means that you can't access
126 /// thread-local variables (well, you can, but they may have
127 /// unexpected values).
128 ///
129 /// # Task execution
130 ///
131 /// Task execution potentially starts as soon as `spawn()` is called.
132 /// The task will end sometime before `scope()` returns. Note that the
133 /// *closure* given to scope may return much earlier. In general
134 /// the lifetime of a scope created like `scope(body)` goes something like this:
135 ///
136 /// - Scope begins when `scope(body)` is called
137 /// - Scope body `body()` is invoked
138 /// - Scope tasks may be spawned
139 /// - Scope body returns
140 /// - Scope tasks execute, possibly spawning more tasks
141 /// - Once all tasks are done, scope ends and `scope()` returns
142 ///
143 /// To see how and when tasks are joined, consider this example:
144 ///
145 /// ```rust
146 /// # use rayon_core as rayon;
147 /// // point start
148 /// rayon::scope(|s| {
149 /// s.spawn(|s| { // task s.1
150 /// s.spawn(|s| { // task s.1.1
151 /// rayon::scope(|t| {
152 /// t.spawn(|_| ()); // task t.1
153 /// t.spawn(|_| ()); // task t.2
154 /// });
155 /// });
156 /// });
157 /// s.spawn(|s| { // task s.2
158 /// });
159 /// // point mid
160 /// });
161 /// // point end
162 /// ```
163 ///
164 /// The various tasks that are run will execute roughly like so:
165 ///
166 /// ```notrust
167 /// | (start)
168 /// |
169 /// | (scope `s` created)
170 /// +-----------------------------------------------+ (task s.2)
171 /// +-------+ (task s.1) |
172 /// | | |
173 /// | +---+ (task s.1.1) |
174 /// | | | |
175 /// | | | (scope `t` created) |
176 /// | | +----------------+ (task t.2) |
177 /// | | +---+ (task t.1) | |
178 /// | (mid) | | | | |
179 /// : | + <-+------------+ (scope `t` ends) |
180 /// : | | |
181 /// |<------+---+-----------------------------------+ (scope `s` ends)
182 /// |
183 /// | (end)
184 /// ```
185 ///
186 /// The point here is that everything spawned into scope `s` will
187 /// terminate (at latest) at the same point -- right before the
188 /// original call to `rayon::scope` returns. This includes new
189 /// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
190 /// scope is created (such as `t`), the things spawned into that scope
191 /// will be joined before that scope returns, which in turn occurs
192 /// before the creating task (task `s.1.1` in this case) finishes.
193 ///
194 /// There is no guaranteed order of execution for spawns in a scope,
195 /// given that other threads may steal tasks at any time. However, they
196 /// are generally prioritized in a LIFO order on the thread from which
197 /// they were spawned. So in this example, absent any stealing, we can
198 /// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
199 /// threads always steal from the other end of the deque, like FIFO
200 /// order. The idea is that "recent" tasks are most likely to be fresh
201 /// in the local CPU's cache, while other threads can steal older
202 /// "stale" tasks. For an alternate approach, consider
203 /// [`scope_fifo()`] instead.
204 ///
205 /// [`scope_fifo()`]: fn.scope_fifo.html
206 ///
207 /// # Accessing stack data
208 ///
209 /// In general, spawned tasks may access stack data in place that
210 /// outlives the scope itself. Other data must be fully owned by the
211 /// spawned task.
212 ///
213 /// ```rust
214 /// # use rayon_core as rayon;
215 /// let ok: Vec<i32> = vec![1, 2, 3];
216 /// rayon::scope(|s| {
217 /// let bad: Vec<i32> = vec![4, 5, 6];
218 /// s.spawn(|_| {
219 /// // We can access `ok` because it outlives the scope `s`.
220 /// println!("ok: {:?}", ok);
221 ///
222 /// // If we just try to use `bad` here, the closure will borrow `bad`
223 /// // (because we are just printing it out, and that only requires a
224 /// // borrow), which will result in a compilation error. Read on
225 /// // for options.
226 /// // println!("bad: {:?}", bad);
227 /// });
228 /// });
229 /// ```
230 ///
231 /// As the comments in the example above suggest, to reference `bad` we must
232 /// take ownership of it. One way to do this is to detach the closure
233 /// from the surrounding stack frame, using the `move` keyword. This
234 /// will cause it to take ownership of *all* the variables it touches,
235 /// in this case including both `ok` *and* `bad`:
236 ///
237 /// ```rust
238 /// # use rayon_core as rayon;
239 /// let ok: Vec<i32> = vec![1, 2, 3];
240 /// rayon::scope(|s| {
241 /// let bad: Vec<i32> = vec![4, 5, 6];
242 /// s.spawn(move |_| {
243 /// println!("ok: {:?}", ok);
244 /// println!("bad: {:?}", bad);
245 /// });
246 ///
247 /// // That closure is fine, but now we can't use `ok` anywhere else,
248 /// // since it is owned by the previous task:
249 /// // s.spawn(|_| println!("ok: {:?}", ok));
250 /// });
251 /// ```
252 ///
253 /// While this works, it could be a problem if we want to use `ok` elsewhere.
254 /// There are two choices. We can keep the closure as a `move` closure, but
255 /// instead of referencing the variable `ok`, we create a shadowed variable that
256 /// is a borrow of `ok` and capture *that*:
257 ///
258 /// ```rust
259 /// # use rayon_core as rayon;
260 /// let ok: Vec<i32> = vec![1, 2, 3];
261 /// rayon::scope(|s| {
262 /// let bad: Vec<i32> = vec![4, 5, 6];
263 /// let ok: &Vec<i32> = &ok; // shadow the original `ok`
264 /// s.spawn(move |_| {
265 /// println!("ok: {:?}", ok); // captures the shadowed version
266 /// println!("bad: {:?}", bad);
267 /// });
268 ///
269 /// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
270 /// // can be shared freely. Note that we need a `move` closure here though,
271 /// // because otherwise we'd be trying to borrow the shadowed `ok`,
272 /// // and that doesn't outlive `scope`.
273 /// s.spawn(move |_| println!("ok: {:?}", ok));
274 /// });
275 /// ```
276 ///
277 /// Another option is not to use the `move` keyword but instead to take ownership
278 /// of individual variables:
279 ///
280 /// ```rust
281 /// # use rayon_core as rayon;
282 /// let ok: Vec<i32> = vec![1, 2, 3];
283 /// rayon::scope(|s| {
284 /// let bad: Vec<i32> = vec![4, 5, 6];
285 /// s.spawn(|_| {
286 /// // Transfer ownership of `bad` into a local variable (also named `bad`).
287 /// // This will force the closure to take ownership of `bad` from the environment.
288 /// let bad = bad;
289 /// println!("ok: {:?}", ok); // `ok` is only borrowed.
290 /// println!("bad: {:?}", bad); // refers to our local variable, above.
291 /// });
292 ///
293 /// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
294 /// });
295 /// ```
296 ///
297 /// # Panics
298 ///
299 /// If a panic occurs, either in the closure given to `scope()` or in
300 /// any of the spawned jobs, that panic will be propagated and the
301 /// call to `scope()` will panic. If multiple panics occur, it is
302 /// non-deterministic which of their panic values will propagate.
303 /// Regardless, once a task is spawned using `scope.spawn()`, it will
304 /// execute, even if the spawning task should later panic. `scope()`
305 /// returns once all spawned jobs have completed, and any panics are
306 /// propagated at that point.
307 pub fn scope<'scope, OP, R>(op: OP) -> R
308 where
309 OP: FnOnce(&Scope<'scope>) -> R + Send,
310 R: Send,
311 {
312 in_worker(|owner_thread, _| {
313 let scope = Scope::<'scope>::new(Some(owner_thread), None);
314 scope.base.complete(Some(owner_thread), || op(&scope))
315 })
316 }
317
318 /// Creates a "fork-join" scope `s` with FIFO order, and invokes the
319 /// closure with a reference to `s`. This closure can then spawn
320 /// asynchronous tasks into `s`. Those tasks may run asynchronously with
321 /// respect to the closure; they may themselves spawn additional tasks
322 /// into `s`. When the closure returns, it will block until all tasks
323 /// that have been spawned into `s` complete.
324 ///
325 /// # Task execution
326 ///
327 /// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
328 /// difference in the order of execution. Consider a similar example:
329 ///
330 /// [`scope()`]: fn.scope.html
331 ///
332 /// ```rust
333 /// # use rayon_core as rayon;
334 /// // point start
335 /// rayon::scope_fifo(|s| {
336 /// s.spawn_fifo(|s| { // task s.1
337 /// s.spawn_fifo(|s| { // task s.1.1
338 /// rayon::scope_fifo(|t| {
339 /// t.spawn_fifo(|_| ()); // task t.1
340 /// t.spawn_fifo(|_| ()); // task t.2
341 /// });
342 /// });
343 /// });
344 /// s.spawn_fifo(|s| { // task s.2
345 /// });
346 /// // point mid
347 /// });
348 /// // point end
349 /// ```
350 ///
351 /// The various tasks that are run will execute roughly like so:
352 ///
353 /// ```notrust
354 /// | (start)
355 /// |
356 /// | (FIFO scope `s` created)
357 /// +--------------------+ (task s.1)
358 /// +-------+ (task s.2) |
359 /// | | +---+ (task s.1.1)
360 /// | | | |
361 /// | | | | (FIFO scope `t` created)
362 /// | | | +----------------+ (task t.1)
363 /// | | | +---+ (task t.2) |
364 /// | (mid) | | | | |
365 /// : | | + <-+------------+ (scope `t` ends)
366 /// : | | |
367 /// |<------+------------+---+ (scope `s` ends)
368 /// |
369 /// | (end)
370 /// ```
371 ///
372 /// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
373 /// the thread from which they were spawned, as opposed to `scope()`'s
374 /// LIFO. So in this example, we can expect `s.1` to execute before
375 /// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
376 /// FIFO order, as usual. Overall, this has roughly the same order as
377 /// the now-deprecated [`breadth_first`] option, except the effect is
378 /// isolated to a particular scope. If spawns are intermingled from any
379 /// combination of `scope()` and `scope_fifo()`, or from different
380 /// threads, their order is only specified with respect to spawns in the
381 /// same scope and thread.
382 ///
383 /// For more details on this design, see Rayon [RFC #1].
384 ///
385 /// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
386 /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
387 ///
388 /// # Panics
389 ///
390 /// If a panic occurs, either in the closure given to `scope_fifo()` or
391 /// in any of the spawned jobs, that panic will be propagated and the
392 /// call to `scope_fifo()` will panic. If multiple panics occur, it is
393 /// non-deterministic which of their panic values will propagate.
394 /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
395 /// will execute, even if the spawning task should later panic.
396 /// `scope_fifo()` returns once all spawned jobs have completed, and any
397 /// panics are propagated at that point.
398 pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
399 where
400 OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
401 R: Send,
402 {
403 in_worker(|owner_thread, _| {
404 let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
405 scope.base.complete(Some(owner_thread), || op(&scope))
406 })
407 }
408
409 /// Creates a "fork-join" scope `s` and invokes the closure with a
410 /// reference to `s`. This closure can then spawn asynchronous tasks
411 /// into `s`. Those tasks may run asynchronously with respect to the
412 /// closure; they may themselves spawn additional tasks into `s`. When
413 /// the closure returns, it will block until all tasks that have been
414 /// spawned into `s` complete.
415 ///
416 /// This is just like `scope()` except the closure runs on the same thread
417 /// that calls `in_place_scope()`. Only work that it spawns runs in the
418 /// thread pool.
419 ///
420 /// # Panics
421 ///
422 /// If a panic occurs, either in the closure given to `in_place_scope()` or in
423 /// any of the spawned jobs, that panic will be propagated and the
424 /// call to `in_place_scope()` will panic. If multiple panics occur, it is
425 /// non-deterministic which of their panic values will propagate.
426 /// Regardless, once a task is spawned using `scope.spawn()`, it will
427 /// execute, even if the spawning task should later panic. `in_place_scope()`
428 /// returns once all spawned jobs have completed, and any panics are
429 /// propagated at that point.
430 pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
431 where
432 OP: FnOnce(&Scope<'scope>) -> R,
433 {
434 do_in_place_scope(None, op)
435 }
436
437 pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
438 where
439 OP: FnOnce(&Scope<'scope>) -> R,
440 {
441 let thread = unsafe { WorkerThread::current().as_ref() };
442 let scope = Scope::<'scope>::new(thread, registry);
443 scope.base.complete(thread, || op(&scope))
444 }
445
446 /// Creates a "fork-join" scope `s` with FIFO order, and invokes the
447 /// closure with a reference to `s`. This closure can then spawn
448 /// asynchronous tasks into `s`. Those tasks may run asynchronously with
449 /// respect to the closure; they may themselves spawn additional tasks
450 /// into `s`. When the closure returns, it will block until all tasks
451 /// that have been spawned into `s` complete.
452 ///
453 /// This is just like `scope_fifo()` except the closure runs on the same thread
454 /// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
455 /// thread pool.
456 ///
457 /// # Panics
458 ///
459 /// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
460 /// any of the spawned jobs, that panic will be propagated and the
461 /// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is
462 /// non-deterministic which of their panic values will propagate.
463 /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
464 /// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
465 /// returns once all spawned jobs have completed, and any panics are
466 /// propagated at that point.
467 pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
468 where
469 OP: FnOnce(&ScopeFifo<'scope>) -> R,
470 {
471 do_in_place_scope_fifo(None, op)
472 }
473
474 pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
475 where
476 OP: FnOnce(&ScopeFifo<'scope>) -> R,
477 {
478 let thread = unsafe { WorkerThread::current().as_ref() };
479 let scope = ScopeFifo::<'scope>::new(thread, registry);
480 scope.base.complete(thread, || op(&scope))
481 }
482
483 impl<'scope> Scope<'scope> {
484 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
485 let base = ScopeBase::new(owner, registry);
486 Scope { base }
487 }
488
489 /// Spawns a job into the fork-join scope `self`. This job will
490 /// execute sometime before the fork-join scope completes. The
491 /// job is specified as a closure, and this closure receives its
492 /// own reference to the scope `self` as argument. This can be
493 /// used to inject new jobs into `self`.
494 ///
495 /// # Returns
496 ///
497 /// Nothing. The spawned closures cannot pass back values to the
498 /// caller directly, though they can write to local variables on
499 /// the stack (if those variables outlive the scope) or
500 /// communicate through shared channels.
501 ///
502 /// (The intention is to eventually integrate with Rust futures to
503 /// support spawns of functions that compute a value.)
504 ///
505 /// # Examples
506 ///
507 /// ```rust
508 /// # use rayon_core as rayon;
509 /// let mut value_a = None;
510 /// let mut value_b = None;
511 /// let mut value_c = None;
512 /// rayon::scope(|s| {
513 /// s.spawn(|s1| {
514 /// // ^ this is the same scope as `s`; this handle `s1`
515 /// // is intended for use by the spawned task,
516 /// // since scope handles cannot cross thread boundaries.
517 ///
518 /// value_a = Some(22);
519 ///
520 /// // the scope `s` will not end until all these tasks are done
521 /// s1.spawn(|_| {
522 /// value_b = Some(44);
523 /// });
524 /// });
525 ///
526 /// s.spawn(|_| {
527 /// value_c = Some(66);
528 /// });
529 /// });
530 /// assert_eq!(value_a, Some(22));
531 /// assert_eq!(value_b, Some(44));
532 /// assert_eq!(value_c, Some(66));
533 /// ```
534 ///
535 /// # See also
536 ///
537 /// The [`scope` function] has more extensive documentation about
538 /// task spawning.
539 ///
540 /// [`scope` function]: fn.scope.html
541 pub fn spawn<BODY>(&self, body: BODY)
542 where
543 BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
544 {
545 self.base.increment();
546 unsafe {
547 let job_ref = Box::new(HeapJob::new(self.base.tlv, move || {
548 self.base.execute_job(move || body(self))
549 }))
550 .as_job_ref();
551
552 // Since `Scope` implements `Sync`, we can't be sure that we're still in a
553 // thread of this pool, so we can't just push to the local worker thread.
554 // Also, this might be an in-place scope.
555 self.base.registry.inject_or_push(job_ref);
556 }
557 }
558 }
559
560 impl<'scope> ScopeFifo<'scope> {
561 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
562 let base = ScopeBase::new(owner, registry);
563 let num_threads = base.registry.num_threads();
564 let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
565 ScopeFifo { base, fifos }
566 }
567
568 /// Spawns a job into the fork-join scope `self`. This job will
569 /// execute sometime before the fork-join scope completes. The
570 /// job is specified as a closure, and this closure receives its
571 /// own reference to the scope `self` as argument. This can be
572 /// used to inject new jobs into `self`.
573 ///
574 /// # See also
575 ///
576 /// This method is akin to [`Scope::spawn()`], but with a FIFO
577 /// priority. The [`scope_fifo` function] has more details about
578 /// this distinction.
579 ///
580 /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
581 /// [`scope_fifo` function]: fn.scope_fifo.html
582 pub fn spawn_fifo<BODY>(&self, body: BODY)
583 where
584 BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
585 {
586 self.base.increment();
587 unsafe {
588 let job_ref = Box::new(HeapJob::new(self.base.tlv, move || {
589 self.base.execute_job(move || body(self))
590 }))
591 .as_job_ref();
592
593 // If we're in the pool, use our scope's private fifo for this thread to execute
594 // in a locally-FIFO order. Otherwise, just use the pool's global injector.
595 match self.base.registry.current_thread() {
596 Some(worker) => {
597 let fifo = &self.fifos[worker.index()];
598 worker.push(fifo.push(job_ref));
599 }
600 None => self.base.registry.inject(&[job_ref]),
601 }
602 }
603 }
604 }
605
606 impl<'scope> ScopeBase<'scope> {
607 /// Creates the base of a new scope for the given registry
608 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
609 let registry = registry.unwrap_or_else(|| match owner {
610 Some(owner) => owner.registry(),
611 None => global_registry(),
612 });
613
614 ScopeBase {
615 registry: Arc::clone(registry),
616 panic: AtomicPtr::new(ptr::null_mut()),
617 job_completed_latch: ScopeLatch::new(owner),
618 marker: PhantomData,
619 tlv: tlv::get(),
620 }
621 }
622
623 fn increment(&self) {
624 self.job_completed_latch.increment();
625 }
626
627 /// Executes `func` as a job, either aborting or executing as
628 /// appropriate.
629 fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
630 where
631 FUNC: FnOnce() -> R,
632 {
633 let result = self.execute_job_closure(func);
634 self.job_completed_latch.wait(owner);
635
636 // Restore the TLV if we ran some jobs while waiting
637 tlv::set(self.tlv);
638
639 self.maybe_propagate_panic();
640 result.unwrap() // only None if `op` panicked, and that would have been propagated
641 }
642
643 /// Executes `func` as a job, either aborting or executing as
644 /// appropriate.
645 fn execute_job<FUNC>(&self, func: FUNC)
646 where
647 FUNC: FnOnce(),
648 {
649 let _: Option<()> = self.execute_job_closure(func);
650 }
651
652 /// Executes `func` as a job in scope. Adjusts the "job completed"
653 /// counters and also catches any panic and stores it into
654 /// `scope`.
655 fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
656 where
657 FUNC: FnOnce() -> R,
658 {
659 match unwind::halt_unwinding(func) {
660 Ok(r) => {
661 self.job_completed_latch.set();
662 Some(r)
663 }
664 Err(err) => {
665 self.job_panicked(err);
666 self.job_completed_latch.set();
667 None
668 }
669 }
670 }
671
672 fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
673 // capture the first error we see, free the rest
674 let nil = ptr::null_mut();
675 let mut err = Box::new(err); // box up the fat ptr
676 if self
677 .panic
678 .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
679 .is_ok()
680 {
681 mem::forget(err); // ownership now transferred into self.panic
682 }
683 }
684
685 fn maybe_propagate_panic(&self) {
686 // propagate panic, if any occurred; at this point, all
687 // outstanding jobs have completed, so we can use a relaxed
688 // ordering:
689 let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
690 if !panic.is_null() {
691 let value = unsafe { Box::from_raw(panic) };
692
693 // Restore the TLV if we ran some jobs while waiting
694 tlv::set(self.tlv);
695
696 unwind::resume_unwinding(*value);
697 }
698 }
699 }
700
701 impl ScopeLatch {
702 fn new(owner: Option<&WorkerThread>) -> Self {
703 match owner {
704 Some(owner) => ScopeLatch::Stealing {
705 latch: CountLatch::new(),
706 registry: Arc::clone(owner.registry()),
707 worker_index: owner.index(),
708 },
709 None => ScopeLatch::Blocking {
710 latch: CountLockLatch::new(),
711 },
712 }
713 }
714
715 fn increment(&self) {
716 match self {
717 ScopeLatch::Stealing { latch, .. } => latch.increment(),
718 ScopeLatch::Blocking { latch } => latch.increment(),
719 }
720 }
721
722 fn set(&self) {
723 match self {
724 ScopeLatch::Stealing {
725 latch,
726 registry,
727 worker_index,
728 } => latch.set_and_tickle_one(registry, *worker_index),
729 ScopeLatch::Blocking { latch } => latch.set(),
730 }
731 }
732
733 fn wait(&self, owner: Option<&WorkerThread>) {
734 match self {
735 ScopeLatch::Stealing {
736 latch,
737 registry,
738 worker_index,
739 } => unsafe {
740 let owner = owner.expect("owner thread");
741 debug_assert_eq!(registry.id(), owner.registry().id());
742 debug_assert_eq!(*worker_index, owner.index());
743 owner.wait_until(latch);
744 },
745 ScopeLatch::Blocking { latch } => latch.wait(),
746 }
747 }
748 }
749
750 impl<'scope> fmt::Debug for Scope<'scope> {
751 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
752 fmt.debug_struct("Scope")
753 .field("pool_id", &self.base.registry.id())
754 .field("panic", &self.base.panic)
755 .field("job_completed_latch", &self.base.job_completed_latch)
756 .finish()
757 }
758 }
759
760 impl<'scope> fmt::Debug for ScopeFifo<'scope> {
761 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
762 fmt.debug_struct("ScopeFifo")
763 .field("num_fifos", &self.fifos.len())
764 .field("pool_id", &self.base.registry.id())
765 .field("panic", &self.base.panic)
766 .field("job_completed_latch", &self.base.job_completed_latch)
767 .finish()
768 }
769 }
770
771 impl fmt::Debug for ScopeLatch {
772 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
773 match self {
774 ScopeLatch::Stealing { latch, .. } => fmt
775 .debug_tuple("ScopeLatch::Stealing")
776 .field(latch)
777 .finish(),
778 ScopeLatch::Blocking { latch } => fmt
779 .debug_tuple("ScopeLatch::Blocking")
780 .field(latch)
781 .finish(),
782 }
783 }
784 }