]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-utils/src/sync/parker.rs
New upstream version 1.44.1+dfsg1
[rustc.git] / vendor / crossbeam-utils / src / sync / parker.rs
1 use std::fmt;
2 use std::marker::PhantomData;
3 use std::sync::atomic::AtomicUsize;
4 use std::sync::atomic::Ordering::SeqCst;
5 use std::sync::{Arc, Condvar, Mutex};
6 use std::time::Duration;
7
8 /// A thread parking primitive.
9 ///
10 /// Conceptually, each `Parker` has an associated token which is initially not present:
11 ///
12 /// * The [`park`] method blocks the current thread unless or until the token is available, at
13 /// which point it automatically consumes the token. It may also return *spuriously*, without
14 /// consuming the token.
15 ///
16 /// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
17 /// time.
18 ///
19 /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
20 /// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
21 /// returning immediately.
22 ///
23 /// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
24 /// [`park`] and [`unpark`].
25 ///
26 /// # Examples
27 ///
28 /// ```
29 /// use std::thread;
30 /// use std::time::Duration;
31 /// use crossbeam_utils::sync::Parker;
32 ///
33 /// let mut p = Parker::new();
34 /// let u = p.unparker().clone();
35 ///
36 /// // Make the token available.
37 /// u.unpark();
38 /// // Wakes up immediately and consumes the token.
39 /// p.park();
40 ///
41 /// thread::spawn(move || {
42 /// thread::sleep(Duration::from_millis(500));
43 /// u.unpark();
44 /// });
45 ///
46 /// // Wakes up when `u.unpark()` provides the token, but may also wake up
47 /// // spuriously before that without consuming the token.
48 /// p.park();
49 /// ```
50 ///
51 /// [`park`]: struct.Parker.html#method.park
52 /// [`park_timeout`]: struct.Parker.html#method.park_timeout
53 /// [`unpark`]: struct.Unparker.html#method.unpark
54 pub struct Parker {
55 unparker: Unparker,
56 _marker: PhantomData<*const ()>,
57 }
58
59 unsafe impl Send for Parker {}
60
61 impl Parker {
62 /// Creates a new `Parker`.
63 ///
64 /// # Examples
65 ///
66 /// ```
67 /// use crossbeam_utils::sync::Parker;
68 ///
69 /// let p = Parker::new();
70 /// ```
71 ///
72 pub fn new() -> Parker {
73 Parker {
74 unparker: Unparker {
75 inner: Arc::new(Inner {
76 state: AtomicUsize::new(EMPTY),
77 lock: Mutex::new(()),
78 cvar: Condvar::new(),
79 }),
80 },
81 _marker: PhantomData,
82 }
83 }
84
85 /// Blocks the current thread until the token is made available.
86 ///
87 /// A call to `park` may wake up spuriously without consuming the token, and callers should be
88 /// prepared for this possibility.
89 ///
90 /// # Examples
91 ///
92 /// ```
93 /// use crossbeam_utils::sync::Parker;
94 ///
95 /// let mut p = Parker::new();
96 /// let u = p.unparker().clone();
97 ///
98 /// // Make the token available.
99 /// u.unpark();
100 ///
101 /// // Wakes up immediately and consumes the token.
102 /// p.park();
103 /// ```
104 pub fn park(&self) {
105 self.unparker.inner.park(None);
106 }
107
108 /// Blocks the current thread until the token is made available, but only for a limited time.
109 ///
110 /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
111 /// should be prepared for this possibility.
112 ///
113 /// # Examples
114 ///
115 /// ```
116 /// use std::time::Duration;
117 /// use crossbeam_utils::sync::Parker;
118 ///
119 /// let mut p = Parker::new();
120 ///
121 /// // Waits for the token to become available, but will not wait longer than 500 ms.
122 /// p.park_timeout(Duration::from_millis(500));
123 /// ```
124 pub fn park_timeout(&self, timeout: Duration) {
125 self.unparker.inner.park(Some(timeout));
126 }
127
128 /// Returns a reference to an associated [`Unparker`].
129 ///
130 /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
131 ///
132 /// # Examples
133 ///
134 /// ```
135 /// use crossbeam_utils::sync::Parker;
136 ///
137 /// let mut p = Parker::new();
138 /// let u = p.unparker().clone();
139 ///
140 /// // Make the token available.
141 /// u.unpark();
142 /// // Wakes up immediately and consumes the token.
143 /// p.park();
144 /// ```
145 ///
146 /// [`park`]: struct.Parker.html#method.park
147 /// [`park_timeout`]: struct.Parker.html#method.park_timeout
148 ///
149 /// [`Unparker`]: struct.Unparker.html
150 pub fn unparker(&self) -> &Unparker {
151 &self.unparker
152 }
153 }
154
155 impl fmt::Debug for Parker {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 f.pad("Parker { .. }")
158 }
159 }
160
161 /// Unparks a thread parked by the associated [`Parker`].
162 ///
163 /// [`Parker`]: struct.Parker.html
164 pub struct Unparker {
165 inner: Arc<Inner>,
166 }
167
168 unsafe impl Send for Unparker {}
169 unsafe impl Sync for Unparker {}
170
171 impl Unparker {
172 /// Atomically makes the token available if it is not already.
173 ///
174 /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
175 /// any.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use std::thread;
181 /// use std::time::Duration;
182 /// use crossbeam_utils::sync::Parker;
183 ///
184 /// let mut p = Parker::new();
185 /// let u = p.unparker().clone();
186 ///
187 /// thread::spawn(move || {
188 /// thread::sleep(Duration::from_millis(500));
189 /// u.unpark();
190 /// });
191 ///
192 /// // Wakes up when `u.unpark()` provides the token, but may also wake up
193 /// // spuriously before that without consuming the token.
194 /// p.park();
195 /// ```
196 ///
197 /// [`park`]: struct.Parker.html#method.park
198 /// [`park_timeout`]: struct.Parker.html#method.park_timeout
199 pub fn unpark(&self) {
200 self.inner.unpark()
201 }
202 }
203
204 impl fmt::Debug for Unparker {
205 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
206 f.pad("Unparker { .. }")
207 }
208 }
209
210 impl Clone for Unparker {
211 fn clone(&self) -> Unparker {
212 Unparker {
213 inner: self.inner.clone(),
214 }
215 }
216 }
217
218 const EMPTY: usize = 0;
219 const PARKED: usize = 1;
220 const NOTIFIED: usize = 2;
221
222 struct Inner {
223 state: AtomicUsize,
224 lock: Mutex<()>,
225 cvar: Condvar,
226 }
227
228 impl Inner {
229 fn park(&self, timeout: Option<Duration>) {
230 // If we were previously notified then we consume this notification and return quickly.
231 if self
232 .state
233 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
234 .is_ok()
235 {
236 return;
237 }
238
239 // If the timeout is zero, then there is no need to actually block.
240 if let Some(ref dur) = timeout {
241 if *dur == Duration::from_millis(0) {
242 return;
243 }
244 }
245
246 // Otherwise we need to coordinate going to sleep.
247 let mut m = self.lock.lock().unwrap();
248
249 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
250 Ok(_) => {}
251 // Consume this notification to avoid spurious wakeups in the next park.
252 Err(NOTIFIED) => {
253 // We must read `state` here, even though we know it will be `NOTIFIED`. This is
254 // because `unpark` may have been called again since we read `NOTIFIED` in the
255 // `compare_exchange` above. We must perform an acquire operation that synchronizes
256 // with that `unpark` to observe any writes it made before the call to `unpark`. To
257 // do that we must read from the write it made to `state`.
258 let old = self.state.swap(EMPTY, SeqCst);
259 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
260 return;
261 }
262 Err(n) => panic!("inconsistent park_timeout state: {}", n),
263 }
264
265 match timeout {
266 None => {
267 loop {
268 // Block the current thread on the conditional variable.
269 m = self.cvar.wait(m).unwrap();
270
271 match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
272 Ok(_) => return, // got a notification
273 Err(_) => {} // spurious wakeup, go back to sleep
274 }
275 }
276 }
277 Some(timeout) => {
278 // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
279 // notification we just want to unconditionally set `state` back to `EMPTY`, either
280 // consuming a notification or un-flagging ourselves as parked.
281 let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
282
283 match self.state.swap(EMPTY, SeqCst) {
284 NOTIFIED => {} // got a notification
285 PARKED => {} // no notification
286 n => panic!("inconsistent park_timeout state: {}", n),
287 }
288 }
289 }
290 }
291
292 pub fn unpark(&self) {
293 // To ensure the unparked thread will observe any writes we made before this call, we must
294 // perform a release operation that `park` can synchronize with. To do that we must write
295 // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
296 // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
297 match self.state.swap(NOTIFIED, SeqCst) {
298 EMPTY => return, // no one was waiting
299 NOTIFIED => return, // already unparked
300 PARKED => {} // gotta go wake someone up
301 _ => panic!("inconsistent state in unpark"),
302 }
303
304 // There is a period between when the parked thread sets `state` to `PARKED` (or last
305 // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
306 // If we were to notify during this period it would be ignored and then when the parked
307 // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
308 // stage so we can acquire `lock` to wait until it is ready to receive the notification.
309 //
310 // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
311 // it doesn't get woken only to have to wait for us to release `lock`.
312 drop(self.lock.lock().unwrap());
313 self.cvar.notify_one();
314 }
315 }