]>
Commit | Line | Data |
---|---|---|
6a06907d XL |
1 | use crate::latch::Latch; |
2 | use crate::tlv; | |
353b0b11 | 3 | use crate::tlv::Tlv; |
6a06907d | 4 | use crate::unwind; |
5099ac24 | 5 | use crossbeam_deque::{Injector, Steal}; |
94b46f34 XL |
6 | use std::any::Any; |
7 | use std::cell::UnsafeCell; | |
8 | use std::mem; | |
353b0b11 | 9 | use std::sync::Arc; |
94b46f34 | 10 | |
e74abb32 | 11 | pub(super) enum JobResult<T> { |
94b46f34 XL |
12 | None, |
13 | Ok(T), | |
e74abb32 | 14 | Panic(Box<dyn Any + Send>), |
94b46f34 XL |
15 | } |
16 | ||
17 | /// A `Job` is used to advertise work for other threads that they may | |
18 | /// want to steal. In accordance with time honored tradition, jobs are | |
19 | /// arranged in a deque, so that thieves can take from the top of the | |
20 | /// deque while the main worker manages the bottom of the deque. This | |
21 | /// deque is managed by the `thread_pool` module. | |
e74abb32 | 22 | pub(super) trait Job { |
94b46f34 XL |
23 | /// Unsafe: this may be called from a different thread than the one |
24 | /// which scheduled the job, so the implementer must ensure the | |
25 | /// appropriate traits are met, whether `Send`, `Sync`, or both. | |
353b0b11 | 26 | unsafe fn execute(this: *const ()); |
94b46f34 XL |
27 | } |
28 | ||
29 | /// Effectively a Job trait object. Each JobRef **must** be executed | |
30 | /// exactly once, or else data may leak. | |
31 | /// | |
32 | /// Internally, we store the job's data in a `*const ()` pointer. The | |
33 | /// true type is something like `*const StackJob<...>`, but we hide | |
34 | /// it. We also carry the "execute fn" from the `Job` trait. | |
e74abb32 | 35 | pub(super) struct JobRef { |
94b46f34 XL |
36 | pointer: *const (), |
37 | execute_fn: unsafe fn(*const ()), | |
38 | } | |
39 | ||
40 | unsafe impl Send for JobRef {} | |
41 | unsafe impl Sync for JobRef {} | |
42 | ||
43 | impl JobRef { | |
44 | /// Unsafe: caller asserts that `data` will remain valid until the | |
45 | /// job is executed. | |
e74abb32 | 46 | pub(super) unsafe fn new<T>(data: *const T) -> JobRef |
532ac7d7 XL |
47 | where |
48 | T: Job, | |
94b46f34 | 49 | { |
94b46f34 | 50 | // erase types: |
94b46f34 | 51 | JobRef { |
e74abb32 | 52 | pointer: data as *const (), |
353b0b11 | 53 | execute_fn: <T as Job>::execute, |
94b46f34 XL |
54 | } |
55 | } | |
56 | ||
353b0b11 FG |
57 | /// Returns an opaque handle that can be saved and compared, |
58 | /// without making `JobRef` itself `Copy + Eq`. | |
59 | #[inline] | |
60 | pub(super) fn id(&self) -> impl Eq { | |
61 | (self.pointer, self.execute_fn) | |
62 | } | |
63 | ||
94b46f34 | 64 | #[inline] |
353b0b11 | 65 | pub(super) unsafe fn execute(self) { |
94b46f34 XL |
66 | (self.execute_fn)(self.pointer) |
67 | } | |
68 | } | |
69 | ||
70 | /// A job that will be owned by a stack slot. This means that when it | |
71 | /// executes it need not free any heap data, the cleanup occurs when | |
72 | /// the stack frame is later popped. The function parameter indicates | |
73 | /// `true` if the job was stolen -- executed on a different thread. | |
e74abb32 | 74 | pub(super) struct StackJob<L, F, R> |
532ac7d7 XL |
75 | where |
76 | L: Latch + Sync, | |
77 | F: FnOnce(bool) -> R + Send, | |
78 | R: Send, | |
94b46f34 | 79 | { |
e74abb32 | 80 | pub(super) latch: L, |
94b46f34 XL |
81 | func: UnsafeCell<Option<F>>, |
82 | result: UnsafeCell<JobResult<R>>, | |
353b0b11 | 83 | tlv: Tlv, |
94b46f34 XL |
84 | } |
85 | ||
86 | impl<L, F, R> StackJob<L, F, R> | |
532ac7d7 XL |
87 | where |
88 | L: Latch + Sync, | |
89 | F: FnOnce(bool) -> R + Send, | |
90 | R: Send, | |
94b46f34 | 91 | { |
353b0b11 | 92 | pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> { |
94b46f34 | 93 | StackJob { |
e74abb32 | 94 | latch, |
94b46f34 XL |
95 | func: UnsafeCell::new(Some(func)), |
96 | result: UnsafeCell::new(JobResult::None), | |
48663c56 | 97 | tlv, |
94b46f34 XL |
98 | } |
99 | } | |
100 | ||
e74abb32 | 101 | pub(super) unsafe fn as_job_ref(&self) -> JobRef { |
94b46f34 XL |
102 | JobRef::new(self) |
103 | } | |
104 | ||
e74abb32 | 105 | pub(super) unsafe fn run_inline(self, stolen: bool) -> R { |
94b46f34 XL |
106 | self.func.into_inner().unwrap()(stolen) |
107 | } | |
108 | ||
e74abb32 | 109 | pub(super) unsafe fn into_result(self) -> R { |
94b46f34 XL |
110 | self.result.into_inner().into_return_value() |
111 | } | |
112 | } | |
113 | ||
114 | impl<L, F, R> Job for StackJob<L, F, R> | |
532ac7d7 XL |
115 | where |
116 | L: Latch + Sync, | |
117 | F: FnOnce(bool) -> R + Send, | |
118 | R: Send, | |
94b46f34 | 119 | { |
353b0b11 FG |
120 | unsafe fn execute(this: *const ()) { |
121 | let this = &*(this as *const Self); | |
94b46f34 XL |
122 | tlv::set(this.tlv); |
123 | let abort = unwind::AbortIfPanic; | |
124 | let func = (*this.func.get()).take().unwrap(); | |
353b0b11 FG |
125 | (*this.result.get()) = JobResult::call(func); |
126 | Latch::set(&this.latch); | |
94b46f34 XL |
127 | mem::forget(abort); |
128 | } | |
129 | } | |
130 | ||
131 | /// Represents a job stored in the heap. Used to implement | |
132 | /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply | |
133 | /// invokes a closure, which then triggers the appropriate logic to | |
134 | /// signal that the job executed. | |
135 | /// | |
136 | /// (Probably `StackJob` should be refactored in a similar fashion.) | |
e74abb32 | 137 | pub(super) struct HeapJob<BODY> |
532ac7d7 XL |
138 | where |
139 | BODY: FnOnce() + Send, | |
94b46f34 | 140 | { |
353b0b11 FG |
141 | job: BODY, |
142 | tlv: Tlv, | |
94b46f34 XL |
143 | } |
144 | ||
145 | impl<BODY> HeapJob<BODY> | |
532ac7d7 XL |
146 | where |
147 | BODY: FnOnce() + Send, | |
94b46f34 | 148 | { |
353b0b11 FG |
149 | pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> { |
150 | Box::new(HeapJob { job, tlv }) | |
94b46f34 XL |
151 | } |
152 | ||
153 | /// Creates a `JobRef` from this job -- note that this hides all | |
154 | /// lifetimes, so it is up to you to ensure that this JobRef | |
155 | /// doesn't outlive any data that it closes over. | |
353b0b11 FG |
156 | pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef { |
157 | JobRef::new(Box::into_raw(self)) | |
158 | } | |
159 | ||
160 | /// Creates a static `JobRef` from this job. | |
161 | pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef | |
162 | where | |
163 | BODY: 'static, | |
164 | { | |
165 | unsafe { self.into_job_ref() } | |
94b46f34 XL |
166 | } |
167 | } | |
168 | ||
169 | impl<BODY> Job for HeapJob<BODY> | |
532ac7d7 XL |
170 | where |
171 | BODY: FnOnce() + Send, | |
94b46f34 | 172 | { |
353b0b11 FG |
173 | unsafe fn execute(this: *const ()) { |
174 | let this = Box::from_raw(this as *mut Self); | |
94b46f34 | 175 | tlv::set(this.tlv); |
353b0b11 FG |
176 | (this.job)(); |
177 | } | |
178 | } | |
179 | ||
180 | /// Represents a job stored in an `Arc` -- like `HeapJob`, but may | |
181 | /// be turned into multiple `JobRef`s and called multiple times. | |
182 | pub(super) struct ArcJob<BODY> | |
183 | where | |
184 | BODY: Fn() + Send + Sync, | |
185 | { | |
186 | job: BODY, | |
187 | } | |
188 | ||
189 | impl<BODY> ArcJob<BODY> | |
190 | where | |
191 | BODY: Fn() + Send + Sync, | |
192 | { | |
193 | pub(super) fn new(job: BODY) -> Arc<Self> { | |
194 | Arc::new(ArcJob { job }) | |
195 | } | |
196 | ||
197 | /// Creates a `JobRef` from this job -- note that this hides all | |
198 | /// lifetimes, so it is up to you to ensure that this JobRef | |
199 | /// doesn't outlive any data that it closes over. | |
200 | pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef { | |
201 | JobRef::new(Arc::into_raw(Arc::clone(this))) | |
202 | } | |
203 | ||
204 | /// Creates a static `JobRef` from this job. | |
205 | pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef | |
206 | where | |
207 | BODY: 'static, | |
208 | { | |
209 | unsafe { Self::as_job_ref(this) } | |
210 | } | |
211 | } | |
212 | ||
213 | impl<BODY> Job for ArcJob<BODY> | |
214 | where | |
215 | BODY: Fn() + Send + Sync, | |
216 | { | |
217 | unsafe fn execute(this: *const ()) { | |
218 | let this = Arc::from_raw(this as *mut Self); | |
219 | (this.job)(); | |
94b46f34 XL |
220 | } |
221 | } | |
222 | ||
223 | impl<T> JobResult<T> { | |
353b0b11 FG |
224 | fn call(func: impl FnOnce(bool) -> T) -> Self { |
225 | match unwind::halt_unwinding(|| func(true)) { | |
226 | Ok(x) => JobResult::Ok(x), | |
227 | Err(x) => JobResult::Panic(x), | |
228 | } | |
229 | } | |
230 | ||
94b46f34 XL |
231 | /// Convert the `JobResult` for a job that has finished (and hence |
232 | /// its JobResult is populated) into its return value. | |
233 | /// | |
234 | /// NB. This will panic if the job panicked. | |
e74abb32 | 235 | pub(super) fn into_return_value(self) -> T { |
94b46f34 XL |
236 | match self { |
237 | JobResult::None => unreachable!(), | |
238 | JobResult::Ok(x) => x, | |
239 | JobResult::Panic(x) => unwind::resume_unwinding(x), | |
240 | } | |
241 | } | |
242 | } | |
e74abb32 XL |
243 | |
244 | /// Indirect queue to provide FIFO job priority. | |
245 | pub(super) struct JobFifo { | |
5099ac24 | 246 | inner: Injector<JobRef>, |
e74abb32 XL |
247 | } |
248 | ||
249 | impl JobFifo { | |
250 | pub(super) fn new() -> Self { | |
251 | JobFifo { | |
5099ac24 | 252 | inner: Injector::new(), |
e74abb32 XL |
253 | } |
254 | } | |
255 | ||
256 | pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { | |
257 | // A little indirection ensures that spawns are always prioritized in FIFO order. The | |
258 | // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front | |
259 | // (FIFO), but either way they will end up popping from the front of this queue. | |
260 | self.inner.push(job_ref); | |
261 | JobRef::new(self) | |
262 | } | |
263 | } | |
264 | ||
265 | impl Job for JobFifo { | |
353b0b11 | 266 | unsafe fn execute(this: *const ()) { |
e74abb32 | 267 | // We "execute" a queue by executing its first job, FIFO. |
353b0b11 | 268 | let this = &*(this as *const Self); |
5099ac24 | 269 | loop { |
353b0b11 | 270 | match this.inner.steal() { |
5099ac24 FG |
271 | Steal::Success(job_ref) => break job_ref.execute(), |
272 | Steal::Empty => panic!("FIFO is empty"), | |
273 | Steal::Retry => {} | |
274 | } | |
275 | } | |
e74abb32 XL |
276 | } |
277 | } |