1 //! Code that decides when workers should go to sleep. See README.md
5 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
6 use std
::sync
::{Condvar, Mutex}
;
/// State value with no flag set; `new()` initializes `state` to this, and
/// `tickle()` treats any other value as a reason to call `tickle_cold()`.
16 const AWAKE
: usize = 0;
/// Flag kept in the lowest bit of the state word (`with_sleepy_worker`
/// stores the worker identity in the bits above it).
17 const SLEEPING
: usize = 1;
/// `yields` threshold compared against in `work_found` and `no_work_found`;
/// on reaching it exactly, `no_work_found` calls `get_sleepy`.
19 const ROUNDS_UNTIL_SLEEPY
: usize = 32;
/// `yields` value at which `no_work_found` calls `sleep` (it debug-asserts
/// that `yields` equals this constant just before doing so).
20 const ROUNDS_UNTIL_ASLEEP
: usize = 64;
23 pub fn new() -> Sleep
{
25 state
: AtomicUsize
::new(AWAKE
),
27 tickle
: Condvar
::new(),
31 fn anyone_sleeping(&self, state
: usize) -> bool
{
35 fn any_worker_is_sleepy(&self, state
: usize) -> bool
{
/// Reports whether `worker_index` is the worker encoded in `state`.
///
/// The bits of `state` above the lowest one are compared against
/// `worker_index + 1`, which is the value `with_sleepy_worker` places there.
39 fn worker_is_sleepy(&self, state
: usize, worker_index
: usize) -> bool
{
// Shift off the low bit so only the encoded worker identity is compared.
40 (state
>> 1) == (worker_index
+ 1)
/// Builds the state value that encodes `worker_index` on top of `state`.
///
/// `state` must be either `AWAKE` or `SLEEPING` (checked in debug builds
/// only). It is kept in the lowest bit, while `worker_index + 1` is stored
/// in the bits above it.
43 fn with_sleepy_worker(&self, state
: usize, worker_index
: usize) -> usize {
// Any other input would already carry an encoded worker in its upper bits.
44 debug_assert
!(state
== AWAKE
|| state
== SLEEPING
);
// Storing index + 1 keeps the upper bits non-zero even for worker 0.
45 ((worker_index
+ 1) << 1) + state
49 pub fn work_found(&self, worker_index
: usize, yields
: usize) -> usize {
54 if yields
> ROUNDS_UNTIL_SLEEPY
{
55 // FIXME tickling here is a bit extreme; mostly we want to "release the lock"
56 // from us being sleepy, we don't necessarily need to wake others
58 self.tickle(worker_index
);
64 pub fn no_work_found(&self, worker_index
: usize, yields
: usize) -> usize {
69 if yields
< ROUNDS_UNTIL_SLEEPY
{
72 } else if yields
== ROUNDS_UNTIL_SLEEPY
{
74 if self.get_sleepy(worker_index
) {
79 } else if yields
< ROUNDS_UNTIL_ASLEEP
{
81 if self.still_sleepy(worker_index
) {
84 log
!(GotInterrupted { worker: worker_index }
);
88 debug_assert_eq
!(yields
, ROUNDS_UNTIL_ASLEEP
);
89 self.sleep(worker_index
);
/// Loads the shared state and, if it is anything other than `AWAKE`,
/// forwards `worker_index` to `tickle_cold()`.
///
/// When the state is `AWAKE` the only operation performed is the single
/// `SeqCst` load below.
94 pub fn tickle(&self, worker_index
: usize) {
95 // As described in README.md, this load must be SeqCst so as to ensure that:
96 // - if anyone is sleepy or asleep, we *definitely* see that now (and not eventually);
97 // - if anyone after us becomes sleepy or asleep, they see memory events that
98 // precede the call to `tickle()`, even though we did not do a write.
99 let old_state
= self.state
.load(Ordering
::SeqCst
);
// Any non-AWAKE value is handed to the separate `tickle_cold` path.
100 if old_state
!= AWAKE
{
101 self.tickle_cold(worker_index
);
106 fn tickle_cold(&self, worker_index
: usize) {
107 // The `Release` ordering here suffices. The reasoning is that
108 // the atomic's own natural ordering ensures that any attempt
109 // to become sleepy/asleep either will come before/after this
110 // swap. If it comes *after*, then Release is good because we
111 // want it to see the action that generated this tickle. If it
112 // comes *before*, then we will see it here (but not other
113 // memory writes from that thread). If the other worker was
114 // becoming sleepy, the other writes don't matter. If they
115 // were going to sleep, we will acquire the lock and hence
116 // acquire their reads.
117 let old_state
= self.state
.swap(AWAKE
, Ordering
::Release
);
119 worker
: worker_index
,
120 old_state
: old_state
,
122 if self.anyone_sleeping(old_state
) {
123 let _data
= self.data
.lock().unwrap();
124 self.tickle
.notify_all();
128 fn get_sleepy(&self, worker_index
: usize) -> bool
{
130 // Acquire ordering suffices here. If some other worker
131 // was sleepy but no longer is, we will eventually see
132 // that, and until then it doesn't hurt to spin.
133 // Otherwise, we will do a compare-exchange which will
134 // assert a stronger order and acquire any reads etc that
136 let state
= self.state
.load(Ordering
::Acquire
);
138 worker
: worker_index
,
141 if self.any_worker_is_sleepy(state
) {
142 // somebody else is already sleepy, so we'll just wait our turn
143 debug_assert
!(!self.worker_is_sleepy(state
, worker_index
),
144 "worker {} called `is_sleepy()`, \
145 but they are already sleepy (state={})",
150 // make ourselves the sleepy one
151 let new_state
= self.with_sleepy_worker(state
, worker_index
);
153 // This must be SeqCst on success because we want to
156 // - That we observe any writes that preceded
157 // some prior tickle, and that tickle may have only
158 // done a SeqCst load on `self.state`.
159 // - That any subsequent tickle *definitely* sees this store.
161 // See the section on "Ensuring Sequentially
162 // Consistency" in README.md for more details.
164 // The failure ordering doesn't matter since we are
165 // about to spin around and do a fresh load.
167 .compare_exchange(state
, new_state
, Ordering
::SeqCst
, Ordering
::Relaxed
)
170 worker
: worker_index
,
172 new_state
: new_state
,
/// Re-reads the shared state and reports whether `worker_index` is still
/// the worker encoded in it, as judged by `worker_is_sleepy`.
180 fn still_sleepy(&self, worker_index
: usize) -> bool
{
// A fresh SeqCst load; the result is not cached anywhere.
181 let state
= self.state
.load(Ordering
::SeqCst
);
182 self.worker_is_sleepy(state
, worker_index
)
185 fn sleep(&self, worker_index
: usize) {
187 // Acquire here suffices. If we observe that the current worker is still
188 // sleepy, then in fact we know that no writes have occurred, and anyhow
189 // we are going to do a CAS which will synchronize.
191 // If we observe that the state has changed, it must be
192 // due to a tickle, and then the Acquire means we also see
193 // any events that occurred before that.
194 let state
= self.state
.load(Ordering
::Acquire
);
195 if self.worker_is_sleepy(state
, worker_index
) {
196 // It is important that we hold the lock when we do
197 // the CAS. Otherwise, if we were to CAS first, then
198 // the following sequence of events could occur:
200 // - Thread A (us) sets state to SLEEPING.
201 // - Thread B sets state to AWAKE.
202 // - Thread C sets state to SLEEPY(C).
203 // - Thread C sets state to SLEEPING.
204 // - Thread A reawakens, acquires lock, and goes to sleep.
206 // Now we missed the wake-up from thread B! But since
207 // we have the lock when we set the state to sleeping,
208 // that cannot happen. Note that the swap `tickle()`
209 // is not part of the lock, though, so let's play that
214 // - A loads state and sees SLEEPY(A)
215 // - B swaps to AWAKE.
216 // - A locks, fails CAS
220 // - A loads state and sees SLEEPY(A)
221 // - A locks, performs CAS
222 // - B swaps to AWAKE.
223 // - A waits (releasing lock)
224 // - B locks, notifies
226 // In general, acquiring the lock inside the loop
227 // seems like it could lead to bad performance, but
228 // actually it should be ok. This is because the only
229 // reason for the `compare_exchange` to fail is if an
230 // awaken comes, in which case the next cycle around
231 // the loop will just return.
232 let data
= self.data
.lock().unwrap();
234 // This must be SeqCst on success because we want to
237 // - That we observe any writes that preceded
238 // some prior tickle, and that tickle may have only
239 // done a SeqCst load on `self.state`.
240 // - That any subsequent tickle *definitely* sees this store.
242 // See the section on "Ensuring Sequentially
243 // Consistency" in README.md for more details.
245 // The failure ordering doesn't matter since we are
246 // about to spin around and do a fresh load.
248 .compare_exchange(state
, SLEEPING
, Ordering
::SeqCst
, Ordering
::Relaxed
)
250 // Don't do this in a loop. If we do it in a loop, we need
251 // some way to distinguish the ABA scenario where the pool
252 // was awoken but before we could process it somebody went
253 // to sleep. Note that if we get a false wakeup it's not a
254 // problem for us, we'll just loop around and maybe get
256 log
!(FellAsleep { worker: worker_index }
);
257 let _
= self.tickle
.wait(data
).unwrap();
258 log
!(GotAwoken { worker: worker_index }
);
262 log
!(GotInterrupted { worker: worker_index }
);