]>
Commit | Line | Data |
---|---|---|
416331ca XL |
1 | use std::fmt; |
2 | use std::marker::PhantomData; | |
3 | use std::sync::{Arc, Condvar, Mutex}; | |
4 | use std::sync::atomic::AtomicUsize; | |
5 | use std::sync::atomic::Ordering::SeqCst; | |
6 | use std::time::Duration; | |
7 | ||
8 | /// A thread parking primitive. | |
9 | /// | |
10 | /// Conceptually, each `Parker` has an associated token which is initially not present: | |
11 | /// | |
12 | /// * The [`park`] method blocks the current thread unless or until the token is available, at | |
13 | /// which point it automatically consumes the token. It may also return *spuriously*, without | |
14 | /// consuming the token. | |
15 | /// | |
16 | /// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum | |
17 | /// time. | |
18 | /// | |
19 | /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the | |
20 | /// token is initially absent, [`unpark`] followed by [`park`] will result in the second call | |
21 | /// returning immediately. | |
22 | /// | |
23 | /// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using | |
24 | /// [`park`] and [`unpark`]. | |
25 | /// | |
26 | /// # Examples | |
27 | /// | |
28 | /// ``` | |
29 | /// use std::thread; | |
30 | /// use std::time::Duration; | |
31 | /// use crossbeam_utils::sync::Parker; | |
32 | /// | |
33 | /// let mut p = Parker::new(); | |
34 | /// let u = p.unparker().clone(); | |
35 | /// | |
36 | /// // Make the token available. | |
37 | /// u.unpark(); | |
38 | /// // Wakes up immediately and consumes the token. | |
39 | /// p.park(); | |
40 | /// | |
41 | /// thread::spawn(move || { | |
42 | /// thread::sleep(Duration::from_millis(500)); | |
43 | /// u.unpark(); | |
44 | /// }); | |
45 | /// | |
46 | /// // Wakes up when `u.unpark()` provides the token, but may also wake up | |
47 | /// // spuriously before that without consuming the token. | |
48 | /// p.park(); | |
49 | /// ``` | |
50 | /// | |
51 | /// [`park`]: struct.Parker.html#method.park | |
52 | /// [`park_timeout`]: struct.Parker.html#method.park_timeout | |
53 | /// [`unpark`]: struct.Unparker.html#method.unpark | |
54 | pub struct Parker { | |
55 | unparker: Unparker, | |
56 | _marker: PhantomData<*const ()>, | |
57 | } | |
58 | ||
59 | unsafe impl Send for Parker {} | |
60 | ||
61 | impl Parker { | |
62 | /// Creates a new `Parker`. | |
63 | /// | |
64 | /// # Examples | |
65 | /// | |
66 | /// ``` | |
67 | /// use crossbeam_utils::sync::Parker; | |
68 | /// | |
69 | /// let p = Parker::new(); | |
70 | /// ``` | |
71 | /// | |
72 | pub fn new() -> Parker { | |
73 | Parker { | |
74 | unparker: Unparker { | |
75 | inner: Arc::new(Inner { | |
76 | state: AtomicUsize::new(EMPTY), | |
77 | lock: Mutex::new(()), | |
78 | cvar: Condvar::new(), | |
79 | }), | |
80 | }, | |
81 | _marker: PhantomData, | |
82 | } | |
83 | } | |
84 | ||
85 | /// Blocks the current thread until the token is made available. | |
86 | /// | |
87 | /// A call to `park` may wake up spuriously without consuming the token, and callers should be | |
88 | /// prepared for this possibility. | |
89 | /// | |
90 | /// # Examples | |
91 | /// | |
92 | /// ``` | |
93 | /// use crossbeam_utils::sync::Parker; | |
94 | /// | |
95 | /// let mut p = Parker::new(); | |
96 | /// let u = p.unparker().clone(); | |
97 | /// | |
98 | /// // Make the token available. | |
99 | /// u.unpark(); | |
100 | /// | |
101 | /// // Wakes up immediately and consumes the token. | |
102 | /// p.park(); | |
103 | /// ``` | |
104 | pub fn park(&self) { | |
105 | self.unparker.inner.park(None); | |
106 | } | |
107 | ||
108 | /// Blocks the current thread until the token is made available, but only for a limited time. | |
109 | /// | |
110 | /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers | |
111 | /// should be prepared for this possibility. | |
112 | /// | |
113 | /// # Examples | |
114 | /// | |
115 | /// ``` | |
116 | /// use std::time::Duration; | |
117 | /// use crossbeam_utils::sync::Parker; | |
118 | /// | |
119 | /// let mut p = Parker::new(); | |
120 | /// | |
121 | /// // Waits for the token to become available, but will not wait longer than 500 ms. | |
122 | /// p.park_timeout(Duration::from_millis(500)); | |
123 | /// ``` | |
124 | pub fn park_timeout(&self, timeout: Duration) { | |
125 | self.unparker.inner.park(Some(timeout)); | |
126 | } | |
127 | ||
128 | /// Returns a reference to an associated [`Unparker`]. | |
129 | /// | |
130 | /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned. | |
131 | /// | |
132 | /// # Examples | |
133 | /// | |
134 | /// ``` | |
135 | /// use crossbeam_utils::sync::Parker; | |
136 | /// | |
137 | /// let mut p = Parker::new(); | |
138 | /// let u = p.unparker().clone(); | |
139 | /// | |
140 | /// // Make the token available. | |
141 | /// u.unpark(); | |
142 | /// // Wakes up immediately and consumes the token. | |
143 | /// p.park(); | |
144 | /// ``` | |
145 | /// | |
146 | /// [`park`]: struct.Parker.html#method.park | |
147 | /// [`park_timeout`]: struct.Parker.html#method.park_timeout | |
148 | /// | |
149 | /// [`Unparker`]: struct.Unparker.html | |
150 | pub fn unparker(&self) -> &Unparker { | |
151 | &self.unparker | |
152 | } | |
153 | } | |
154 | ||
155 | impl fmt::Debug for Parker { | |
156 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
157 | f.pad("Parker { .. }") | |
158 | } | |
159 | } | |
160 | ||
161 | /// Unparks a thread parked by the associated [`Parker`]. | |
162 | /// | |
163 | /// [`Parker`]: struct.Parker.html | |
164 | pub struct Unparker { | |
165 | inner: Arc<Inner>, | |
166 | } | |
167 | ||
168 | unsafe impl Send for Unparker {} | |
169 | unsafe impl Sync for Unparker {} | |
170 | ||
171 | impl Unparker { | |
172 | /// Atomically makes the token available if it is not already. | |
173 | /// | |
174 | /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is | |
175 | /// any. | |
176 | /// | |
177 | /// # Examples | |
178 | /// | |
179 | /// ``` | |
180 | /// use std::thread; | |
181 | /// use std::time::Duration; | |
182 | /// use crossbeam_utils::sync::Parker; | |
183 | /// | |
184 | /// let mut p = Parker::new(); | |
185 | /// let u = p.unparker().clone(); | |
186 | /// | |
187 | /// thread::spawn(move || { | |
188 | /// thread::sleep(Duration::from_millis(500)); | |
189 | /// u.unpark(); | |
190 | /// }); | |
191 | /// | |
192 | /// // Wakes up when `u.unpark()` provides the token, but may also wake up | |
193 | /// // spuriously before that without consuming the token. | |
194 | /// p.park(); | |
195 | /// ``` | |
196 | /// | |
197 | /// [`park`]: struct.Parker.html#method.park | |
198 | /// [`park_timeout`]: struct.Parker.html#method.park_timeout | |
199 | pub fn unpark(&self) { | |
200 | self.inner.unpark() | |
201 | } | |
202 | } | |
203 | ||
204 | impl fmt::Debug for Unparker { | |
205 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
206 | f.pad("Unparker { .. }") | |
207 | } | |
208 | } | |
209 | ||
210 | impl Clone for Unparker { | |
211 | fn clone(&self) -> Unparker { | |
212 | Unparker { | |
213 | inner: self.inner.clone(), | |
214 | } | |
215 | } | |
216 | } | |
217 | ||
/// No token is available and no thread is parked.
const EMPTY: usize = 0;
/// A thread has announced that it is going to sleep (or is already sleeping) in `park`.
const PARKED: usize = 1;
/// The token is available and has not been consumed yet.
const NOTIFIED: usize = 2;

/// State shared between a `Parker` and all of its `Unparker` handles.
struct Inner {
    /// One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by the parking thread from the moment it sets `PARKED` until it waits on `cvar`.
    lock: Mutex<()>,
    /// Signalled by `unpark` to wake the parked thread.
    cvar: Condvar,
}

impl Inner {
    /// Blocks the current thread until the token is available and consumes it.
    ///
    /// With `timeout == None` the call returns only once a notification has been consumed.
    /// With `Some(timeout)` it additionally returns once `timeout` has elapsed; a zero timeout
    /// never blocks. Spurious wakeups of the condition variable are absorbed in both modes, so
    /// a timed call does not return before the timeout unless it consumed a notification.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds a value that the protocol does not allow
    /// at that point.
    fn park(&self, timeout: Option<Duration>) {
        // If we were previously notified then we consume this notification and return quickly.
        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
            return;
        }

        // If the timeout is zero, then there is no need to actually block. Otherwise remember
        // when we started so that the remaining time can be recomputed after every wakeup. The
        // clock is only read for timed parks.
        let timed = match timeout {
            Some(limit) if limit == Duration::from_millis(0) => return,
            Some(limit) => Some((std::time::Instant::now(), limit)),
            None => None,
        };

        // Otherwise we need to coordinate going to sleep.
        let mut guard = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        loop {
            // Block the current thread on the condition variable.
            guard = match timed {
                None => self.cvar.wait(guard).unwrap(),
                Some((started, limit)) => {
                    let elapsed = started.elapsed();
                    if elapsed >= limit {
                        // Timed out: unconditionally set `state` back to `EMPTY`, either
                        // consuming a notification that raced with the timeout or un-flagging
                        // ourselves as parked.
                        match self.state.swap(EMPTY, SeqCst) {
                            NOTIFIED | PARKED => return,
                            n => panic!("inconsistent park_timeout state: {}", n),
                        }
                    }
                    // `elapsed < limit` here, so the subtraction cannot underflow. Whether the
                    // wait itself timed out is irrelevant: the check above catches it on the
                    // next iteration.
                    self.cvar.wait_timeout(guard, limit - elapsed).unwrap().0
                }
            };

            if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
                // Got a notification.
                return;
            }

            // Spurious wakeup (or a timeout, which the next iteration handles): go around again.
        }
    }

    /// Makes the token available and wakes the parked thread, if there is one.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds an unknown value.
    pub fn unpark(&self) {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return,    // no one was waiting
            NOTIFIED => return, // already unparked
            PARKED => {}        // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
    }
}