]> git.proxmox.com Git - rustc.git/blame - vendor/gix-features/src/parallel/serial.rs
New upstream version 1.74.1+dfsg1
[rustc.git] / vendor / gix-features / src / parallel / serial.rs
CommitLineData
0a29b90c
FG
1use crate::parallel::Reduce;
2
#[cfg(not(feature = "parallel"))]
mod not_parallel {
    use std::marker::PhantomData;
    use std::sync::atomic::{AtomicBool, AtomicIsize};

    /// Runs `left` and then `right`, one after another, returning their output when both are done.
    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
        (left(), right())
    }

    /// A scope for spawning threads.
    ///
    /// The scope holds no data. Its markers merely tie the `'scope` and `'env` lifetimes to the type,
    /// keeping both of them invariant. As it consists solely of `PhantomData` over `Sync` types, it is
    /// `Send` and `Sync` automatically, which is why no `unsafe impl` is needed for it.
    pub struct Scope<'scope, 'env: 'scope> {
        _scope: PhantomData<&'scope mut &'scope ()>,
        _env: PhantomData<&'env mut &'env ()>,
    }

    /// A stand-in for a thread builder. It holds no configuration as no thread is ever created by it.
    pub struct ThreadBuilder;

    /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
    pub fn build_thread() -> ThreadBuilder {
        ThreadBuilder
    }

    impl ThreadBuilder {
        /// Accept a thread name and discard it, returning the builder unchanged.
        pub fn name(self, _new: String) -> Self {
            self
        }

        /// Run `f` right away on the calling thread by means of [`Scope::spawn`] and return the handle
        /// holding its result. This implementation always returns `Ok`.
        pub fn spawn_scoped<'scope, 'env, F, T>(
            &self,
            scope: &'scope Scope<'scope, 'env>,
            f: F,
        ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
        where
            F: FnOnce() -> T + 'scope,
            T: 'scope,
        {
            Ok(scope.spawn(f))
        }
    }

    impl<'scope, 'env> Scope<'scope, 'env> {
        /// Provided with this scope, let `f` start new threads that live within it.
        ///
        /// Note that `f` is called immediately and on the calling thread, with its return value being
        /// stored in the returned handle. A panic in `f` thus unwinds through this call.
        pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
        where
            F: FnOnce() -> T + 'scope,
            T: 'scope,
        {
            ScopedJoinHandle {
                result: f(),
                _marker: PhantomData,
            }
        }
    }

    /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
    /// Note that this implementation will run the spawned functions immediately.
    pub fn threads<'env, F, R>(f: F) -> R
    where
        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
    {
        f(&Scope {
            _scope: PhantomData,
            _env: PhantomData,
        })
    }

    /// A handle that can be used to join its scoped thread.
    ///
    /// This struct is created by the [`Scope::spawn`] method and the
    /// [`ThreadBuilder::spawn_scoped`] method.
    pub struct ScopedJoinHandle<'scope, T> {
        /// Holds the result of the inner closure.
        result: T,
        _marker: PhantomData<&'scope mut &'scope ()>,
    }

    impl<T> ScopedJoinHandle<'_, T> {
        /// Return the result of the closure that was run when this handle was created.
        /// This implementation always returns `Ok`.
        pub fn join(self) -> std::thread::Result<T> {
            Ok(self.result)
        }

        /// Always `true`, as the closure has already run to completion by the time the handle exists.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    /// Process each item of `input` in order on the current thread, aggregating into a single state.
    ///
    /// This is the serial counterpart of an experiment to have fine-grained per-item parallelization with
    /// built-in aggregation via thread state.
    ///
    /// * `_thread_limit` is ignored.
    /// * `new_thread_state(thread_number) -> State` is called once, with `0`, to create the only state.
    /// * `consume(&mut Item, &mut State, &threads_left, &should_interrupt)` is called once per item. Both atomics
    ///   are default-initialized (`0` and `false`) and never changed by this function. The first error it
    ///   returns aborts processing and is returned to the caller.
    /// * `periodic()` is called after each successfully consumed item. Processing stops early, without an error,
    ///   as soon as it returns `None`. The returned duration is otherwise unused.
    /// * `state_to_rval(State) -> R` converts the state into the single element of the returned vector.
    pub fn in_parallel_with_slice<I, S, R, E>(
        input: &mut [I],
        _thread_limit: Option<usize>,
        new_thread_state: impl FnOnce(usize) -> S + Clone,
        mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
        mut periodic: impl FnMut() -> Option<std::time::Duration>,
        state_to_rval: impl FnOnce(S) -> R + Clone,
    ) -> Result<Vec<R>, E> {
        let mut state = new_thread_state(0);
        let should_interrupt = AtomicBool::default();
        let threads_left = AtomicIsize::default();
        for item in input.iter_mut() {
            consume(item, &mut state, &threads_left, &should_interrupt)?;
            if periodic().is_none() {
                break;
            }
        }
        Ok(vec![state_to_rval(state)])
    }
}
114
115#[cfg(not(feature = "parallel"))]
116pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
117
118/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
119/// whose task is to aggregate these outputs into the final result returned by this function.
120///
121/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
122/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
123/// * For `reducer`, see the [`Reduce`] trait
124/// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
125/// similar to the parallel version.
126///
127/// **This serial version performing all calculations on the current thread.**
128pub fn in_parallel<I, S, O, R>(
129 input: impl Iterator<Item = I>,
130 _thread_limit: Option<usize>,
781aab86 131 new_thread_state: impl FnOnce(usize) -> S,
fe692bf9 132 mut consume: impl FnMut(I, &mut S) -> O,
0a29b90c
FG
133 mut reducer: R,
134) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
135where
136 R: Reduce<Input = O>,
137{
138 let mut state = new_thread_state(0);
139 for item in input {
140 drop(reducer.feed(consume(item, &mut state))?);
141 }
142 reducer.finalize()
143}
781aab86
FG
144
145/// Read items from `input` and `consume` them in multiple threads,
146/// whose output output is collected by a `reducer`. Its task is to
147/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
148/// Caall `finalize` to finish the computation, once per thread, if there was no error sending results earlier.
149///
150/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
151/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
152/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
153/// created by `new_thread_state(…)`.
154/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
155/// * For `reducer`, see the [`Reduce`] trait
156#[cfg(not(feature = "parallel"))]
157pub fn in_parallel_with_finalize<I, S, O, R>(
158 input: impl Iterator<Item = I>,
159 _thread_limit: Option<usize>,
160 new_thread_state: impl FnOnce(usize) -> S,
161 mut consume: impl FnMut(I, &mut S) -> O,
162 finalize: impl FnOnce(S) -> O + Send + Clone,
163 mut reducer: R,
164) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
165where
166 R: Reduce<Input = O>,
167{
168 let mut state = new_thread_state(0);
169 for item in input {
170 drop(reducer.feed(consume(item, &mut state))?);
171 }
172 reducer.feed(finalize(state))?;
173 reducer.finalize()
174}