1 use crate::latch
::Latch
;
5 use crossbeam_deque
::{Injector, Steal}
;
7 use std
::cell
::UnsafeCell
;
11 pub(super) enum JobResult
<T
> {
14 Panic(Box
<dyn Any
+ Send
>),
17 /// A `Job` is used to advertise work for other threads that they may
18 /// want to steal. In accordance with time honored tradition, jobs are
19 /// arranged in a deque, so that thieves can take from the top of the
20 /// deque while the main worker manages the bottom of the deque. This
21 /// deque is managed by the `thread_pool` module.
22 pub(super) trait Job
{
23 /// Unsafe: this may be called from a different thread than the one
24 /// which scheduled the job, so the implementer must ensure the
25 /// appropriate traits are met, whether `Send`, `Sync`, or both.
26 unsafe fn execute(this
: *const ());
29 /// Effectively a Job trait object. Each JobRef **must** be executed
30 /// exactly once, or else data may leak.
32 /// Internally, we store the job's data in a `*const ()` pointer. The
33 /// true type is something like `*const StackJob<...>`, but we hide
34 /// it. We also carry the "execute fn" from the `Job` trait.
35 pub(super) struct JobRef
{
37 execute_fn
: unsafe fn(*const ()),
40 unsafe impl Send
for JobRef {}
41 unsafe impl Sync
for JobRef {}
44 /// Unsafe: caller asserts that `data` will remain valid until the
46 pub(super) unsafe fn new
<T
>(data
: *const T
) -> JobRef
52 pointer
: data
as *const (),
53 execute_fn
: <T
as Job
>::execute
,
57 /// Returns an opaque handle that can be saved and compared,
58 /// without making `JobRef` itself `Copy + Eq`.
60 pub(super) fn id(&self) -> impl Eq
{
61 (self.pointer
, self.execute_fn
)
65 pub(super) unsafe fn execute(self) {
66 (self.execute_fn
)(self.pointer
)
70 /// A job that will be owned by a stack slot. This means that when it
71 /// executes it need not free any heap data, the cleanup occurs when
72 /// the stack frame is later popped. The function parameter indicates
73 /// `true` if the job was stolen -- executed on a different thread.
74 pub(super) struct StackJob
<L
, F
, R
>
77 F
: FnOnce(bool
) -> R
+ Send
,
81 func
: UnsafeCell
<Option
<F
>>,
82 result
: UnsafeCell
<JobResult
<R
>>,
86 impl<L
, F
, R
> StackJob
<L
, F
, R
>
89 F
: FnOnce(bool
) -> R
+ Send
,
92 pub(super) fn new(tlv
: Tlv
, func
: F
, latch
: L
) -> StackJob
<L
, F
, R
> {
95 func
: UnsafeCell
::new(Some(func
)),
96 result
: UnsafeCell
::new(JobResult
::None
),
101 pub(super) unsafe fn as_job_ref(&self) -> JobRef
{
105 pub(super) unsafe fn run_inline(self, stolen
: bool
) -> R
{
106 self.func
.into_inner().unwrap()(stolen
)
109 pub(super) unsafe fn into_result(self) -> R
{
110 self.result
.into_inner().into_return_value()
114 impl<L
, F
, R
> Job
for StackJob
<L
, F
, R
>
117 F
: FnOnce(bool
) -> R
+ Send
,
120 unsafe fn execute(this
: *const ()) {
121 let this
= &*(this
as *const Self);
123 let abort
= unwind
::AbortIfPanic
;
124 let func
= (*this
.func
.get()).take().unwrap();
125 (*this
.result
.get()) = JobResult
::call(func
);
126 Latch
::set(&this
.latch
);
131 /// Represents a job stored in the heap. Used to implement
132 /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
133 /// invokes a closure, which then triggers the appropriate logic to
134 /// signal that the job executed.
136 /// (Probably `StackJob` should be refactored in a similar fashion.)
137 pub(super) struct HeapJob
<BODY
>
139 BODY
: FnOnce() + Send
,
145 impl<BODY
> HeapJob
<BODY
>
147 BODY
: FnOnce() + Send
,
149 pub(super) fn new(tlv
: Tlv
, job
: BODY
) -> Box
<Self> {
150 Box
::new(HeapJob { job, tlv }
)
153 /// Creates a `JobRef` from this job -- note that this hides all
154 /// lifetimes, so it is up to you to ensure that this JobRef
155 /// doesn't outlive any data that it closes over.
156 pub(super) unsafe fn into_job_ref(self: Box
<Self>) -> JobRef
{
157 JobRef
::new(Box
::into_raw(self))
160 /// Creates a static `JobRef` from this job.
161 pub(super) fn into_static_job_ref(self: Box
<Self>) -> JobRef
165 unsafe { self.into_job_ref() }
169 impl<BODY
> Job
for HeapJob
<BODY
>
171 BODY
: FnOnce() + Send
,
173 unsafe fn execute(this
: *const ()) {
174 let this
= Box
::from_raw(this
as *mut Self);
180 /// Represents a job stored in an `Arc` -- like `HeapJob`, but may
181 /// be turned into multiple `JobRef`s and called multiple times.
182 pub(super) struct ArcJob
<BODY
>
184 BODY
: Fn() + Send
+ Sync
,
189 impl<BODY
> ArcJob
<BODY
>
191 BODY
: Fn() + Send
+ Sync
,
193 pub(super) fn new(job
: BODY
) -> Arc
<Self> {
194 Arc
::new(ArcJob { job }
)
197 /// Creates a `JobRef` from this job -- note that this hides all
198 /// lifetimes, so it is up to you to ensure that this JobRef
199 /// doesn't outlive any data that it closes over.
200 pub(super) unsafe fn as_job_ref(this
: &Arc
<Self>) -> JobRef
{
201 JobRef
::new(Arc
::into_raw(Arc
::clone(this
)))
204 /// Creates a static `JobRef` from this job.
205 pub(super) fn as_static_job_ref(this
: &Arc
<Self>) -> JobRef
209 unsafe { Self::as_job_ref(this) }
213 impl<BODY
> Job
for ArcJob
<BODY
>
215 BODY
: Fn() + Send
+ Sync
,
217 unsafe fn execute(this
: *const ()) {
218 let this
= Arc
::from_raw(this
as *mut Self);
223 impl<T
> JobResult
<T
> {
224 fn call(func
: impl FnOnce(bool
) -> T
) -> Self {
225 match unwind
::halt_unwinding(|| func(true)) {
226 Ok(x
) => JobResult
::Ok(x
),
227 Err(x
) => JobResult
::Panic(x
),
231 /// Convert the `JobResult` for a job that has finished (and hence
232 /// its JobResult is populated) into its return value.
234 /// NB. This will panic if the job panicked.
235 pub(super) fn into_return_value(self) -> T
{
237 JobResult
::None
=> unreachable
!(),
238 JobResult
::Ok(x
) => x
,
239 JobResult
::Panic(x
) => unwind
::resume_unwinding(x
),
244 /// Indirect queue to provide FIFO job priority.
245 pub(super) struct JobFifo
{
246 inner
: Injector
<JobRef
>,
250 pub(super) fn new() -> Self {
252 inner
: Injector
::new(),
256 pub(super) unsafe fn push(&self, job_ref
: JobRef
) -> JobRef
{
257 // A little indirection ensures that spawns are always prioritized in FIFO order. The
258 // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
259 // (FIFO), but either way they will end up popping from the front of this queue.
260 self.inner
.push(job_ref
);
265 impl Job
for JobFifo
{
266 unsafe fn execute(this
: *const ()) {
267 // We "execute" a queue by executing its first job, FIFO.
268 let this
= &*(this
as *const Self);
270 match this
.inner
.steal() {
271 Steal
::Success(job_ref
) => break job_ref
.execute(),
272 Steal
::Empty
=> panic
!("FIFO is empty"),