]>
Commit | Line | Data |
---|---|---|
0a29b90c FG |
1 | use crate::parallel::Reduce; |
2 | ||
3 | #[cfg(not(feature = "parallel"))] | |
4 | mod not_parallel { | |
fe692bf9 FG |
5 | use std::sync::atomic::{AtomicBool, AtomicIsize}; |
6 | ||
0a29b90c FG |
7 | /// Runs `left` and then `right`, one after another, returning their output when both are done. |
8 | pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) { | |
9 | (left(), right()) | |
10 | } | |
11 | ||
12 | /// A scope for spawning threads. | |
fe692bf9 FG |
13 | pub struct Scope<'scope, 'env: 'scope> { |
14 | _scope: std::marker::PhantomData<&'scope mut &'scope ()>, | |
15 | _env: std::marker::PhantomData<&'env mut &'env ()>, | |
0a29b90c FG |
16 | } |
17 | ||
18 | pub struct ThreadBuilder; | |
19 | ||
20 | /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning. | |
21 | pub fn build_thread() -> ThreadBuilder { | |
22 | ThreadBuilder | |
23 | } | |
24 | ||
25 | #[allow(unsafe_code)] | |
fe692bf9 | 26 | unsafe impl Sync for Scope<'_, '_> {} |
0a29b90c FG |
27 | |
28 | impl ThreadBuilder { | |
29 | pub fn name(self, _new: String) -> Self { | |
30 | self | |
31 | } | |
fe692bf9 | 32 | pub fn spawn_scoped<'scope, 'env, F, T>( |
0a29b90c | 33 | &self, |
fe692bf9 | 34 | scope: &'scope Scope<'scope, 'env>, |
0a29b90c | 35 | f: F, |
fe692bf9 | 36 | ) -> std::io::Result<ScopedJoinHandle<'scope, T>> |
0a29b90c | 37 | where |
fe692bf9 FG |
38 | F: FnOnce() -> T + 'scope, |
39 | T: 'scope, | |
0a29b90c FG |
40 | { |
41 | Ok(scope.spawn(f)) | |
42 | } | |
43 | } | |
44 | ||
fe692bf9 FG |
45 | impl<'scope, 'env> Scope<'scope, 'env> { |
46 | /// Provided with this scope, let `f` start new threads that live within it. | |
47 | pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> | |
0a29b90c | 48 | where |
fe692bf9 FG |
49 | F: FnOnce() -> T + 'scope, |
50 | T: 'scope, | |
0a29b90c FG |
51 | { |
52 | ScopedJoinHandle { | |
53 | result: f(), | |
54 | _marker: Default::default(), | |
55 | } | |
56 | } | |
57 | } | |
58 | ||
59 | /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call. | |
60 | /// Note that this implementation will run the spawned functions immediately. | |
61 | pub fn threads<'env, F, R>(f: F) -> R | |
62 | where | |
fe692bf9 | 63 | F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R, |
0a29b90c FG |
64 | { |
65 | f(&Scope { | |
fe692bf9 FG |
66 | _scope: Default::default(), |
67 | _env: Default::default(), | |
0a29b90c FG |
68 | }) |
69 | } | |
70 | ||
71 | /// A handle that can be used to join its scoped thread. | |
72 | /// | |
73 | /// This struct is created by the [`Scope::spawn`] method and the | |
74 | /// [`ThreadBuilder::spawn_scoped`] method. | |
75 | pub struct ScopedJoinHandle<'scope, T> { | |
76 | /// Holds the result of the inner closure. | |
77 | result: T, | |
78 | _marker: std::marker::PhantomData<&'scope mut &'scope ()>, | |
79 | } | |
80 | ||
81 | impl<T> ScopedJoinHandle<'_, T> { | |
82 | pub fn join(self) -> std::thread::Result<T> { | |
83 | Ok(self.result) | |
84 | } | |
fe692bf9 FG |
85 | pub fn is_finished(&self) -> bool { |
86 | true | |
87 | } | |
0a29b90c FG |
88 | } |
89 | ||
90 | /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. | |
91 | /// This is only good for operations where near-random access isn't detrimental, so it's not usually great | |
92 | /// for file-io as it won't make use of sorted inputs well. | |
93 | // TODO: better docs | |
94 | pub fn in_parallel_with_slice<I, S, R, E>( | |
95 | input: &mut [I], | |
96 | _thread_limit: Option<usize>, | |
781aab86 | 97 | new_thread_state: impl FnOnce(usize) -> S + Clone, |
fe692bf9 | 98 | mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, |
0a29b90c FG |
99 | mut periodic: impl FnMut() -> Option<std::time::Duration>, |
100 | state_to_rval: impl FnOnce(S) -> R + Clone, | |
101 | ) -> Result<Vec<R>, E> { | |
102 | let mut state = new_thread_state(0); | |
fe692bf9 FG |
103 | let should_interrupt = &AtomicBool::default(); |
104 | let threads_left = &AtomicIsize::default(); | |
0a29b90c | 105 | for item in input { |
fe692bf9 | 106 | consume(item, &mut state, threads_left, should_interrupt)?; |
0a29b90c FG |
107 | if periodic().is_none() { |
108 | break; | |
109 | } | |
110 | } | |
111 | Ok(vec![state_to_rval(state)]) | |
112 | } | |
113 | } | |
114 | ||
115 | #[cfg(not(feature = "parallel"))] | |
116 | pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle}; | |
117 | ||
118 | /// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`, | |
119 | /// whose task is to aggregate these outputs into the final result returned by this function. | |
120 | /// | |
121 | /// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume` | |
122 | /// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state. | |
123 | /// * For `reducer`, see the [`Reduce`] trait | |
124 | /// * `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature | |
125 | /// similar to the parallel version. | |
126 | /// | |
127 | /// **This serial version performs all calculations on the current thread.** | |
128 | pub fn in_parallel<I, S, O, R>( | |
129 | input: impl Iterator<Item = I>, | |
130 | _thread_limit: Option<usize>, | |
781aab86 | 131 | new_thread_state: impl FnOnce(usize) -> S, |
fe692bf9 | 132 | mut consume: impl FnMut(I, &mut S) -> O, |
0a29b90c FG |
133 | mut reducer: R, |
134 | ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> | |
135 | where | |
136 | R: Reduce<Input = O>, | |
137 | { | |
138 | let mut state = new_thread_state(0); | |
139 | for item in input { | |
140 | drop(reducer.feed(consume(item, &mut state))?); | |
141 | } | |
142 | reducer.finalize() | |
143 | } | |
781aab86 FG |
144 | |
145 | /// Read items from `input` and `consume` them in multiple threads, | |
146 | /// whose output is collected by a `reducer`. Its task is to | |
147 | /// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe. | |
148 | /// Call `finalize` to finish the computation, once per thread, if there was no error sending results earlier. | |
149 | /// | |
150 | /// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used. | |
151 | /// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be passed to `consume` | |
152 | /// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially | |
153 | /// created by `new_thread_state(…)`. | |
154 | /// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`. | |
155 | /// * For `reducer`, see the [`Reduce`] trait | |
156 | #[cfg(not(feature = "parallel"))] | |
157 | pub fn in_parallel_with_finalize<I, S, O, R>( | |
158 | input: impl Iterator<Item = I>, | |
159 | _thread_limit: Option<usize>, | |
160 | new_thread_state: impl FnOnce(usize) -> S, | |
161 | mut consume: impl FnMut(I, &mut S) -> O, | |
162 | finalize: impl FnOnce(S) -> O + Send + Clone, | |
163 | mut reducer: R, | |
164 | ) -> Result<<R as Reduce>::Output, <R as Reduce>::Error> | |
165 | where | |
166 | R: Reduce<Input = O>, | |
167 | { | |
168 | let mut state = new_thread_state(0); | |
169 | for item in input { | |
170 | drop(reducer.feed(consume(item, &mut state))?); | |
171 | } | |
172 | reducer.feed(finalize(state))?; | |
173 | reducer.finalize() | |
174 | } |