]>
Commit | Line | Data |
---|---|---|
6a06907d XL |
1 | use crate::latch::Latch; |
2 | use crate::tlv; | |
3 | use crate::unwind; | |
e74abb32 | 4 | use crossbeam_queue::SegQueue; |
94b46f34 XL |
5 | use std::any::Any; |
6 | use std::cell::UnsafeCell; | |
7 | use std::mem; | |
94b46f34 | 8 | |
e74abb32 | 9 | pub(super) enum JobResult<T> { |
94b46f34 XL |
10 | None, |
11 | Ok(T), | |
e74abb32 | 12 | Panic(Box<dyn Any + Send>), |
94b46f34 XL |
13 | } |
14 | ||
15 | /// A `Job` is used to advertise work for other threads that they may | |
16 | /// want to steal. In accordance with time honored tradition, jobs are | |
17 | /// arranged in a deque, so that thieves can take from the top of the | |
18 | /// deque while the main worker manages the bottom of the deque. This | |
19 | /// deque is managed by the `thread_pool` module. | |
e74abb32 | 20 | pub(super) trait Job { |
94b46f34 XL |
21 | /// Unsafe: this may be called from a different thread than the one |
22 | /// which scheduled the job, so the implementer must ensure the | |
23 | /// appropriate traits are met, whether `Send`, `Sync`, or both. | |
24 | unsafe fn execute(this: *const Self); | |
25 | } | |
26 | ||
27 | /// Effectively a Job trait object. Each JobRef **must** be executed | |
28 | /// exactly once, or else data may leak. | |
29 | /// | |
30 | /// Internally, we store the job's data in a `*const ()` pointer. The | |
31 | /// true type is something like `*const StackJob<...>`, but we hide | |
32 | /// it. We also carry the "execute fn" from the `Job` trait. | |
33 | #[derive(Copy, Clone, Debug, PartialEq, Eq)] | |
e74abb32 | 34 | pub(super) struct JobRef { |
94b46f34 XL |
35 | pointer: *const (), |
36 | execute_fn: unsafe fn(*const ()), | |
37 | } | |
38 | ||
39 | unsafe impl Send for JobRef {} | |
40 | unsafe impl Sync for JobRef {} | |
41 | ||
42 | impl JobRef { | |
43 | /// Unsafe: caller asserts that `data` will remain valid until the | |
44 | /// job is executed. | |
e74abb32 | 45 | pub(super) unsafe fn new<T>(data: *const T) -> JobRef |
532ac7d7 XL |
46 | where |
47 | T: Job, | |
94b46f34 XL |
48 | { |
49 | let fn_ptr: unsafe fn(*const T) = <T as Job>::execute; | |
50 | ||
51 | // erase types: | |
94b46f34 | 52 | JobRef { |
e74abb32 XL |
53 | pointer: data as *const (), |
54 | execute_fn: mem::transmute(fn_ptr), | |
94b46f34 XL |
55 | } |
56 | } | |
57 | ||
58 | #[inline] | |
e74abb32 | 59 | pub(super) unsafe fn execute(&self) { |
94b46f34 XL |
60 | (self.execute_fn)(self.pointer) |
61 | } | |
62 | } | |
63 | ||
64 | /// A job that will be owned by a stack slot. This means that when it | |
65 | /// executes it need not free any heap data, the cleanup occurs when | |
66 | /// the stack frame is later popped. The function parameter indicates | |
67 | /// `true` if the job was stolen -- executed on a different thread. | |
e74abb32 | 68 | pub(super) struct StackJob<L, F, R> |
532ac7d7 XL |
69 | where |
70 | L: Latch + Sync, | |
71 | F: FnOnce(bool) -> R + Send, | |
72 | R: Send, | |
94b46f34 | 73 | { |
e74abb32 | 74 | pub(super) latch: L, |
94b46f34 XL |
75 | func: UnsafeCell<Option<F>>, |
76 | result: UnsafeCell<JobResult<R>>, | |
94b46f34 XL |
77 | tlv: usize, |
78 | } | |
79 | ||
80 | impl<L, F, R> StackJob<L, F, R> | |
532ac7d7 XL |
81 | where |
82 | L: Latch + Sync, | |
83 | F: FnOnce(bool) -> R + Send, | |
84 | R: Send, | |
94b46f34 | 85 | { |
e74abb32 | 86 | pub(super) fn new(tlv: usize, func: F, latch: L) -> StackJob<L, F, R> { |
94b46f34 | 87 | StackJob { |
e74abb32 | 88 | latch, |
94b46f34 XL |
89 | func: UnsafeCell::new(Some(func)), |
90 | result: UnsafeCell::new(JobResult::None), | |
48663c56 | 91 | tlv, |
94b46f34 XL |
92 | } |
93 | } | |
94 | ||
e74abb32 | 95 | pub(super) unsafe fn as_job_ref(&self) -> JobRef { |
94b46f34 XL |
96 | JobRef::new(self) |
97 | } | |
98 | ||
e74abb32 | 99 | pub(super) unsafe fn run_inline(self, stolen: bool) -> R { |
94b46f34 XL |
100 | self.func.into_inner().unwrap()(stolen) |
101 | } | |
102 | ||
e74abb32 | 103 | pub(super) unsafe fn into_result(self) -> R { |
94b46f34 XL |
104 | self.result.into_inner().into_return_value() |
105 | } | |
106 | } | |
107 | ||
108 | impl<L, F, R> Job for StackJob<L, F, R> | |
532ac7d7 XL |
109 | where |
110 | L: Latch + Sync, | |
111 | F: FnOnce(bool) -> R + Send, | |
112 | R: Send, | |
94b46f34 XL |
113 | { |
114 | unsafe fn execute(this: *const Self) { | |
e74abb32 XL |
115 | fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R { |
116 | move || func(true) | |
117 | } | |
118 | ||
94b46f34 | 119 | let this = &*this; |
94b46f34 XL |
120 | tlv::set(this.tlv); |
121 | let abort = unwind::AbortIfPanic; | |
122 | let func = (*this.func.get()).take().unwrap(); | |
e74abb32 | 123 | (*this.result.get()) = match unwind::halt_unwinding(call(func)) { |
94b46f34 XL |
124 | Ok(x) => JobResult::Ok(x), |
125 | Err(x) => JobResult::Panic(x), | |
126 | }; | |
127 | this.latch.set(); | |
128 | mem::forget(abort); | |
129 | } | |
130 | } | |
131 | ||
132 | /// Represents a job stored in the heap. Used to implement | |
133 | /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply | |
134 | /// invokes a closure, which then triggers the appropriate logic to | |
135 | /// signal that the job executed. | |
136 | /// | |
137 | /// (Probably `StackJob` should be refactored in a similar fashion.) | |
e74abb32 | 138 | pub(super) struct HeapJob<BODY> |
532ac7d7 XL |
139 | where |
140 | BODY: FnOnce() + Send, | |
94b46f34 XL |
141 | { |
142 | job: UnsafeCell<Option<BODY>>, | |
94b46f34 XL |
143 | tlv: usize, |
144 | } | |
145 | ||
146 | impl<BODY> HeapJob<BODY> | |
532ac7d7 XL |
147 | where |
148 | BODY: FnOnce() + Send, | |
94b46f34 | 149 | { |
e74abb32 | 150 | pub(super) fn new(tlv: usize, func: BODY) -> Self { |
94b46f34 XL |
151 | HeapJob { |
152 | job: UnsafeCell::new(Some(func)), | |
48663c56 | 153 | tlv, |
94b46f34 XL |
154 | } |
155 | } | |
156 | ||
157 | /// Creates a `JobRef` from this job -- note that this hides all | |
158 | /// lifetimes, so it is up to you to ensure that this JobRef | |
159 | /// doesn't outlive any data that it closes over. | |
e74abb32 | 160 | pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef { |
94b46f34 XL |
161 | let this: *const Self = mem::transmute(self); |
162 | JobRef::new(this) | |
163 | } | |
164 | } | |
165 | ||
166 | impl<BODY> Job for HeapJob<BODY> | |
532ac7d7 XL |
167 | where |
168 | BODY: FnOnce() + Send, | |
94b46f34 XL |
169 | { |
170 | unsafe fn execute(this: *const Self) { | |
171 | let this: Box<Self> = mem::transmute(this); | |
94b46f34 XL |
172 | tlv::set(this.tlv); |
173 | let job = (*this.job.get()).take().unwrap(); | |
174 | job(); | |
175 | } | |
176 | } | |
177 | ||
178 | impl<T> JobResult<T> { | |
179 | /// Convert the `JobResult` for a job that has finished (and hence | |
180 | /// its JobResult is populated) into its return value. | |
181 | /// | |
182 | /// NB. This will panic if the job panicked. | |
e74abb32 | 183 | pub(super) fn into_return_value(self) -> T { |
94b46f34 XL |
184 | match self { |
185 | JobResult::None => unreachable!(), | |
186 | JobResult::Ok(x) => x, | |
187 | JobResult::Panic(x) => unwind::resume_unwinding(x), | |
188 | } | |
189 | } | |
190 | } | |
e74abb32 XL |
191 | |
192 | /// Indirect queue to provide FIFO job priority. | |
193 | pub(super) struct JobFifo { | |
194 | inner: SegQueue<JobRef>, | |
195 | } | |
196 | ||
197 | impl JobFifo { | |
198 | pub(super) fn new() -> Self { | |
199 | JobFifo { | |
200 | inner: SegQueue::new(), | |
201 | } | |
202 | } | |
203 | ||
204 | pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { | |
205 | // A little indirection ensures that spawns are always prioritized in FIFO order. The | |
206 | // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front | |
207 | // (FIFO), but either way they will end up popping from the front of this queue. | |
208 | self.inner.push(job_ref); | |
209 | JobRef::new(self) | |
210 | } | |
211 | } | |
212 | ||
213 | impl Job for JobFifo { | |
214 | unsafe fn execute(this: *const Self) { | |
215 | // We "execute" a queue by executing its first job, FIFO. | |
216 | (*this).inner.pop().expect("job in fifo queue").execute() | |
217 | } | |
218 | } |