]> git.proxmox.com Git - rustc.git/blob - vendor/gix-features/src/parallel/serial.rs
New upstream version 1.70.0+dfsg2
[rustc.git] / vendor / gix-features / src / parallel / serial.rs
1 use crate::parallel::Reduce;
2
#[cfg(not(feature = "parallel"))]
mod not_parallel {
    /// Runs `left` and then `right`, one after another, returning their output when both are done.
    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
        // Tuple fields are evaluated left to right, so `left` has finished before `right` starts.
        (left(), right())
    }

    /// A scope for spawning threads.
    ///
    /// This serial implementation never creates a thread: the scope holds no data besides a lifetime marker,
    /// and everything "spawned" into it runs immediately on the calling thread.
    ///
    /// No `unsafe impl Sync` is needed: the only field is a `PhantomData` over shared/exclusive references to
    /// `()`, which is `Sync` already, so the auto trait applies to `Scope` as well.
    pub struct Scope<'env> {
        _marker: std::marker::PhantomData<&'env mut &'env ()>,
    }

    /// A builder for "threads" to be spawned into a [`Scope`], created by [`build_thread()`].
    ///
    /// In this serial implementation it has no state and no configuration takes effect.
    pub struct ThreadBuilder;

    /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
    pub fn build_thread() -> ThreadBuilder {
        ThreadBuilder
    }

    impl ThreadBuilder {
        /// Accept a thread name for signature compatibility and ignore it, returning the builder unchanged.
        pub fn name(self, _new: String) -> Self {
            self
        }

        /// Run `f` right away via [`Scope::spawn`] and return a handle holding its result.
        ///
        /// This implementation always returns `Ok`, as no thread is created that could fail to start.
        pub fn spawn_scoped<'a, 'env, F, T>(
            &self,
            scope: &'a Scope<'env>,
            f: F,
        ) -> std::io::Result<ScopedJoinHandle<'a, T>>
        where
            F: FnOnce() -> T,
            F: Send + 'env,
            T: Send + 'env,
        {
            Ok(scope.spawn(f))
        }
    }

    impl<'env> Scope<'env> {
        /// Call `f` immediately on the current thread and keep its return value in the returned handle.
        ///
        /// As `f` runs before this method returns, a panic in `f` unwinds out of `spawn` itself.
        pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
        where
            F: FnOnce() -> T,
            F: Send + 'env,
            T: Send + 'env,
        {
            ScopedJoinHandle {
                result: f(),
                _marker: std::marker::PhantomData,
            }
        }
    }

    /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
    /// Note that this implementation will run the spawned functions immediately.
    pub fn threads<'env, F, R>(f: F) -> R
    where
        F: FnOnce(&Scope<'env>) -> R,
    {
        f(&Scope {
            _marker: std::marker::PhantomData,
        })
    }

    /// A handle that can be used to join its scoped thread.
    ///
    /// This struct is created by the [`Scope::spawn`] method and the
    /// [`ThreadBuilder::spawn_scoped`] method.
    pub struct ScopedJoinHandle<'scope, T> {
        /// Holds the result of the inner closure.
        result: T,
        _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
    }

    impl<T> ScopedJoinHandle<'_, T> {
        /// Return the result that was computed when the closure was spawned.
        ///
        /// This implementation always returns `Ok`, as the closure has already run to completion.
        pub fn join(self) -> std::thread::Result<T> {
            Ok(self.result)
        }
    }

    /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
    /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
    /// for file-io as it won't make use of sorted inputs well.
    ///
    /// This serial version processes all of `input` in order on the current thread.
    ///
    /// * `_thread_limit` is unused here.
    /// * `new_thread_state` is called exactly once, with `0`, to create the only state there is.
    /// * `consume(&mut Item, &mut State)` is called for each item; its first error is returned immediately.
    /// * `periodic` is called after each successfully consumed item. If it returns `None`, the remaining items
    ///   are skipped, which is not an error. The returned duration itself is not used.
    /// * `state_to_rval` converts the state into the single element of the returned `Vec`.
    pub fn in_parallel_with_slice<I, S, R, E>(
        input: &mut [I],
        _thread_limit: Option<usize>,
        mut new_thread_state: impl FnMut(usize) -> S + Clone,
        mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
        mut periodic: impl FnMut() -> Option<std::time::Duration>,
        state_to_rval: impl FnOnce(S) -> R + Clone,
    ) -> Result<Vec<R>, E> {
        let mut state = new_thread_state(0);
        for item in input {
            consume(item, &mut state)?;
            // `None` is the signal to stop early while still returning what was aggregated so far.
            if periodic().is_none() {
                break;
            }
        }
        Ok(vec![state_to_rval(state)])
    }
}
106
107 #[cfg(not(feature = "parallel"))]
108 pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};
109
110 /// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
111 /// whose task is to aggregate these outputs into the final result returned by this function.
112 ///
113 /// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
114 /// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
115 /// * For `reducer`, see the [`Reduce`] trait
116 /// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
117 /// similar to the parallel version.
118 ///
119 /// **This serial version performing all calculations on the current thread.**
120 pub fn in_parallel<I, S, O, R>(
121 input: impl Iterator<Item = I>,
122 _thread_limit: Option<usize>,
123 new_thread_state: impl Fn(usize) -> S,
124 consume: impl Fn(I, &mut S) -> O,
125 mut reducer: R,
126 ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
127 where
128 R: Reduce<Input = O>,
129 {
130 let mut state = new_thread_state(0);
131 for item in input {
132 drop(reducer.feed(consume(item, &mut state))?);
133 }
134 reducer.finalize()
135 }