// Page residue from the git.proxmox.com blame view of rustc.git
// ("New upstream version 1.51.0+dfsg1"):
//   vendor/crossbeam-utils-0.6.6/src/sync/parker.rs
// Blame columns (Commit / Line / Data): 416331ca, XL
use std::fmt;
use std::marker::PhantomData;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
7
8/// A thread parking primitive.
9///
10/// Conceptually, each `Parker` has an associated token which is initially not present:
11///
12/// * The [`park`] method blocks the current thread unless or until the token is available, at
13/// which point it automatically consumes the token. It may also return *spuriously*, without
14/// consuming the token.
15///
16/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
17/// time.
18///
19/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
20/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
21/// returning immediately.
22///
23/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
24/// [`park`] and [`unpark`].
25///
26/// # Examples
27///
28/// ```
29/// use std::thread;
30/// use std::time::Duration;
31/// use crossbeam_utils::sync::Parker;
32///
33/// let mut p = Parker::new();
34/// let u = p.unparker().clone();
35///
36/// // Make the token available.
37/// u.unpark();
38/// // Wakes up immediately and consumes the token.
39/// p.park();
40///
41/// thread::spawn(move || {
42/// thread::sleep(Duration::from_millis(500));
43/// u.unpark();
44/// });
45///
46/// // Wakes up when `u.unpark()` provides the token, but may also wake up
47/// // spuriously before that without consuming the token.
48/// p.park();
49/// ```
50///
51/// [`park`]: struct.Parker.html#method.park
52/// [`park_timeout`]: struct.Parker.html#method.park_timeout
53/// [`unpark`]: struct.Unparker.html#method.unpark
54pub struct Parker {
55 unparker: Unparker,
56 _marker: PhantomData<*const ()>,
57}
58
59unsafe impl Send for Parker {}
60
61impl Parker {
62 /// Creates a new `Parker`.
63 ///
64 /// # Examples
65 ///
66 /// ```
67 /// use crossbeam_utils::sync::Parker;
68 ///
69 /// let p = Parker::new();
70 /// ```
71 ///
72 pub fn new() -> Parker {
73 Parker {
74 unparker: Unparker {
75 inner: Arc::new(Inner {
76 state: AtomicUsize::new(EMPTY),
77 lock: Mutex::new(()),
78 cvar: Condvar::new(),
79 }),
80 },
81 _marker: PhantomData,
82 }
83 }
84
85 /// Blocks the current thread until the token is made available.
86 ///
87 /// A call to `park` may wake up spuriously without consuming the token, and callers should be
88 /// prepared for this possibility.
89 ///
90 /// # Examples
91 ///
92 /// ```
93 /// use crossbeam_utils::sync::Parker;
94 ///
95 /// let mut p = Parker::new();
96 /// let u = p.unparker().clone();
97 ///
98 /// // Make the token available.
99 /// u.unpark();
100 ///
101 /// // Wakes up immediately and consumes the token.
102 /// p.park();
103 /// ```
104 pub fn park(&self) {
105 self.unparker.inner.park(None);
106 }
107
108 /// Blocks the current thread until the token is made available, but only for a limited time.
109 ///
110 /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
111 /// should be prepared for this possibility.
112 ///
113 /// # Examples
114 ///
115 /// ```
116 /// use std::time::Duration;
117 /// use crossbeam_utils::sync::Parker;
118 ///
119 /// let mut p = Parker::new();
120 ///
121 /// // Waits for the token to become available, but will not wait longer than 500 ms.
122 /// p.park_timeout(Duration::from_millis(500));
123 /// ```
124 pub fn park_timeout(&self, timeout: Duration) {
125 self.unparker.inner.park(Some(timeout));
126 }
127
128 /// Returns a reference to an associated [`Unparker`].
129 ///
130 /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
131 ///
132 /// # Examples
133 ///
134 /// ```
135 /// use crossbeam_utils::sync::Parker;
136 ///
137 /// let mut p = Parker::new();
138 /// let u = p.unparker().clone();
139 ///
140 /// // Make the token available.
141 /// u.unpark();
142 /// // Wakes up immediately and consumes the token.
143 /// p.park();
144 /// ```
145 ///
146 /// [`park`]: struct.Parker.html#method.park
147 /// [`park_timeout`]: struct.Parker.html#method.park_timeout
148 ///
149 /// [`Unparker`]: struct.Unparker.html
150 pub fn unparker(&self) -> &Unparker {
151 &self.unparker
152 }
153}
154
155impl fmt::Debug for Parker {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 f.pad("Parker { .. }")
158 }
159}
160
161/// Unparks a thread parked by the associated [`Parker`].
162///
163/// [`Parker`]: struct.Parker.html
164pub struct Unparker {
165 inner: Arc<Inner>,
166}
167
168unsafe impl Send for Unparker {}
169unsafe impl Sync for Unparker {}
170
171impl Unparker {
172 /// Atomically makes the token available if it is not already.
173 ///
174 /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
175 /// any.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use std::thread;
181 /// use std::time::Duration;
182 /// use crossbeam_utils::sync::Parker;
183 ///
184 /// let mut p = Parker::new();
185 /// let u = p.unparker().clone();
186 ///
187 /// thread::spawn(move || {
188 /// thread::sleep(Duration::from_millis(500));
189 /// u.unpark();
190 /// });
191 ///
192 /// // Wakes up when `u.unpark()` provides the token, but may also wake up
193 /// // spuriously before that without consuming the token.
194 /// p.park();
195 /// ```
196 ///
197 /// [`park`]: struct.Parker.html#method.park
198 /// [`park_timeout`]: struct.Parker.html#method.park_timeout
199 pub fn unpark(&self) {
200 self.inner.unpark()
201 }
202}
203
204impl fmt::Debug for Unparker {
205 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
206 f.pad("Unparker { .. }")
207 }
208}
209
210impl Clone for Unparker {
211 fn clone(&self) -> Unparker {
212 Unparker {
213 inner: self.inner.clone(),
214 }
215 }
216}
217
/// No token is available and no thread has announced that it is parking.
const EMPTY: usize = 0;
/// The parking thread has announced (while holding `Inner::lock`) that it is going to sleep.
const PARKED: usize = 1;
/// The token is available.
const NOTIFIED: usize = 2;

/// Parking state shared between a `Parker` and its `Unparker`s.
struct Inner {
    /// One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Held by the parking thread from before it publishes `PARKED` until it waits on `cvar`;
    /// `unpark` acquires it briefly so that its notification cannot be lost.
    lock: Mutex<()>,
    /// Condition variable the parked thread sleeps on.
    cvar: Condvar,
}

impl Inner {
    /// Blocks until the token is available, or until `timeout` elapses when one is given.
    ///
    /// With `None` the call only returns after consuming the token. With `Some(_)` it may return
    /// without the token (timeout or spurious wakeup); either way `state` is left `EMPTY`.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value or if `lock` is poisoned.
    fn park(&self, timeout: Option<Duration>) {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // If the timeout is zero, then there is no need to actually block.
        if timeout == Some(Duration::from_millis(0)) {
            return;
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        match timeout {
            None => loop {
                // Block the current thread on the conditional variable.
                m = self.cvar.wait(m).unwrap();

                // Only a real notification ends the wait; after a spurious wakeup `state` is
                // still `PARKED`, the exchange fails and we go back to sleep.
                if self
                    .state
                    .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                    .is_ok()
                {
                    return;
                }
            },
            Some(timeout) => {
                // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
                // notification we just want to unconditionally set `state` back to `EMPTY`, either
                // consuming a notification or un-flagging ourselves as parked.
                let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();

                match self.state.swap(EMPTY, SeqCst) {
                    NOTIFIED => {} // got a notification
                    PARKED => {}   // no notification
                    n => panic!("inconsistent park_timeout state: {}", n),
                }
            }
        }
    }

    /// Makes the token available and wakes the parked thread, if there is one.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value or if `lock` is poisoned.
    pub fn unpark(&self) {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return,    // no one was waiting
            NOTIFIED => return, // already unparked
            PARKED => {}        // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
    }
}