]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | // Copyright 2014 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
11 | /// Oneshot channels/ports | |
12 | /// | |
13 | /// This is the initial flavor of channels/ports used for comm module. This is | |
14 | /// an optimization for the one-use case of a channel. The major optimization of | |
15 | /// this type is to have one and exactly one allocation when the chan/port pair | |
16 | /// is created. | |
17 | /// | |
18 | /// Another possible optimization would be to not use an Arc box because | |
19 | /// in theory we know when the shared packet can be deallocated (no real need | |
20 | /// for the atomic reference counting), but I was having trouble working out how | |
21 | /// to destroy the data early in a drop of a Port. | |
22 | /// | |
23 | /// # Implementation | |
24 | /// | |
c34b1796 | 25 | /// Oneshots are implemented around one atomic usize variable. This variable |
bd371182 | 26 | /// indicates the state of the port/chan and also contains any threads
1a4d82fc JJ |
27 | /// blocked on the port. All atomic operations happen on this one word. |
28 | /// | |
29 | /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect | |
30 | /// on behalf of the channel side of things (it can be mentally thought of as | |
31 | /// consuming the port). This upgrade is then also stored in the shared packet. | |
32 | /// The one caveat to consider is that when a port sees a disconnected channel | |
33 | /// it must check for data because there is no "data plus upgrade" state. | |
34 | ||
35 | pub use self::Failure::*; | |
36 | pub use self::UpgradeResult::*; | |
37 | pub use self::SelectionResult::*; | |
38 | use self::MyUpgrade::*; | |
39 | ||
1a4d82fc JJ |
40 | use sync::mpsc::Receiver; |
41 | use sync::mpsc::blocking::{self, SignalToken}; | |
42 | use core::mem; | |
85aaf69f | 43 | use sync::atomic::{AtomicUsize, Ordering}; |
3157f602 | 44 | use time::Instant; |
1a4d82fc JJ |
45 | |
46 | // Various states you can find a port in. | |
c34b1796 AL |
/// Initial state: no data has been sent and no receiver is blocked.
const EMPTY: usize = 0;
/// Data is ready for the receiver to take.
const DATA: usize = 1;
/// The channel is disconnected OR has been upgraded.
const DISCONNECTED: usize = 2;
1a4d82fc JJ |
50 | // Any other value represents a pointer to a SignalToken value. The |
51 | // protocol ensures that when the state moves *to* a pointer, | |
52 | // ownership of the token is given to the packet, and when the state | |
53 | // moves *from* a pointer, ownership of the token is transferred to | |
54 | // whoever changed the state. | |
55 | ||
56 | pub struct Packet<T> { | |
bd371182 | 57 | // Internal state of the chan/port pair (stores the blocked thread as well) |
85aaf69f | 58 | state: AtomicUsize, |
1a4d82fc JJ |
59 | // One-shot data slot location |
60 | data: Option<T>, | |
61 | // when used for the second time, a oneshot channel must be upgraded, and | |
62 | // this contains the slot for the upgrade | |
63 | upgrade: MyUpgrade<T>, | |
64 | } | |
65 | ||
66 | pub enum Failure<T> { | |
67 | Empty, | |
68 | Disconnected, | |
69 | Upgraded(Receiver<T>), | |
70 | } | |
71 | ||
72 | pub enum UpgradeResult { | |
73 | UpSuccess, | |
74 | UpDisconnected, | |
75 | UpWoke(SignalToken), | |
76 | } | |
77 | ||
78 | pub enum SelectionResult<T> { | |
79 | SelCanceled, | |
80 | SelUpgraded(SignalToken, Receiver<T>), | |
81 | SelSuccess, | |
82 | } | |
83 | ||
84 | enum MyUpgrade<T> { | |
85 | NothingSent, | |
86 | SendUsed, | |
87 | GoUp(Receiver<T>), | |
88 | } | |
89 | ||
c34b1796 | 90 | impl<T> Packet<T> { |
1a4d82fc JJ |
91 | pub fn new() -> Packet<T> { |
92 | Packet { | |
93 | data: None, | |
94 | upgrade: NothingSent, | |
85aaf69f | 95 | state: AtomicUsize::new(EMPTY), |
1a4d82fc JJ |
96 | } |
97 | } | |
98 | ||
99 | pub fn send(&mut self, t: T) -> Result<(), T> { | |
100 | // Sanity check | |
101 | match self.upgrade { | |
102 | NothingSent => {} | |
103 | _ => panic!("sending on a oneshot that's already sent on "), | |
104 | } | |
105 | assert!(self.data.is_none()); | |
106 | self.data = Some(t); | |
107 | self.upgrade = SendUsed; | |
108 | ||
109 | match self.state.swap(DATA, Ordering::SeqCst) { | |
110 | // Sent the data, no one was waiting | |
111 | EMPTY => Ok(()), | |
112 | ||
113 | // Couldn't send the data, the port hung up first. Return the data | |
114 | // back up the stack. | |
115 | DISCONNECTED => { | |
116 | Err(self.data.take().unwrap()) | |
117 | } | |
118 | ||
119 | // Not possible, these are one-use channels | |
120 | DATA => unreachable!(), | |
121 | ||
122 | // There is a thread waiting on the other end. We leave the 'DATA' | |
123 | // state inside so it'll pick it up on the other end. | |
124 | ptr => unsafe { | |
c34b1796 | 125 | SignalToken::cast_from_usize(ptr).signal(); |
1a4d82fc JJ |
126 | Ok(()) |
127 | } | |
128 | } | |
129 | } | |
130 | ||
131 | // Just tests whether this channel has been sent on or not, this is only | |
132 | // safe to use from the sender. | |
133 | pub fn sent(&self) -> bool { | |
134 | match self.upgrade { | |
135 | NothingSent => false, | |
136 | _ => true, | |
137 | } | |
138 | } | |
139 | ||
3157f602 | 140 | pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure<T>> { |
bd371182 | 141 | // Attempt to not block the thread (it's a little expensive). If it looks |
1a4d82fc JJ |
142 | // like we're not empty, then immediately go through to `try_recv`. |
143 | if self.state.load(Ordering::SeqCst) == EMPTY { | |
144 | let (wait_token, signal_token) = blocking::tokens(); | |
c34b1796 | 145 | let ptr = unsafe { signal_token.cast_to_usize() }; |
1a4d82fc JJ |
146 | |
147 | // race with senders to enter the blocking state | |
148 | if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY { | |
3157f602 XL |
149 | if let Some(deadline) = deadline { |
150 | let timed_out = !wait_token.wait_max_until(deadline); | |
151 | // Try to reset the state | |
152 | if timed_out { | |
153 | try!(self.abort_selection().map_err(Upgraded)); | |
154 | } | |
155 | } else { | |
156 | wait_token.wait(); | |
157 | debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY); | |
158 | } | |
1a4d82fc JJ |
159 | } else { |
160 | // drop the signal token, since we never blocked | |
c34b1796 | 161 | drop(unsafe { SignalToken::cast_from_usize(ptr) }); |
1a4d82fc JJ |
162 | } |
163 | } | |
164 | ||
165 | self.try_recv() | |
166 | } | |
167 | ||
168 | pub fn try_recv(&mut self) -> Result<T, Failure<T>> { | |
169 | match self.state.load(Ordering::SeqCst) { | |
170 | EMPTY => Err(Empty), | |
171 | ||
172 | // We saw some data on the channel, but the channel can be used | |
173 | // again to send us an upgrade. As a result, we need to re-insert | |
174 | // into the channel that there's no data available (otherwise we'll | |
175 | // just see DATA next time). This is done as a cmpxchg because if | |
176 | // the state changes under our feet we'd rather just see that state | |
177 | // change. | |
178 | DATA => { | |
179 | self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst); | |
180 | match self.data.take() { | |
181 | Some(data) => Ok(data), | |
182 | None => unreachable!(), | |
183 | } | |
184 | } | |
185 | ||
186 | // There's no guarantee that we receive before an upgrade happens, | |
187 | // and an upgrade flags the channel as disconnected, so when we see | |
188 | // this we first need to check if there's data available and *then* | |
189 | // we go through and process the upgrade. | |
190 | DISCONNECTED => { | |
191 | match self.data.take() { | |
192 | Some(data) => Ok(data), | |
193 | None => { | |
194 | match mem::replace(&mut self.upgrade, SendUsed) { | |
195 | SendUsed | NothingSent => Err(Disconnected), | |
196 | GoUp(upgrade) => Err(Upgraded(upgrade)) | |
197 | } | |
198 | } | |
199 | } | |
200 | } | |
201 | ||
202 | // We are the sole receiver; there cannot be a blocking | |
203 | // receiver already. | |
204 | _ => unreachable!() | |
205 | } | |
206 | } | |
207 | ||
208 | // Returns whether the upgrade was completed. If the upgrade wasn't | |
209 | // completed, then the port couldn't get sent to the other half (it will | |
210 | // never receive it). | |
211 | pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { | |
212 | let prev = match self.upgrade { | |
213 | NothingSent => NothingSent, | |
214 | SendUsed => SendUsed, | |
215 | _ => panic!("upgrading again"), | |
216 | }; | |
217 | self.upgrade = GoUp(up); | |
218 | ||
219 | match self.state.swap(DISCONNECTED, Ordering::SeqCst) { | |
220 | // If the channel is empty or has data on it, then we're good to go. | |
221 | // Senders will check the data before the upgrade (in case we | |
222 | // plastered over the DATA state). | |
223 | DATA | EMPTY => UpSuccess, | |
224 | ||
225 | // If the other end is already disconnected, then we failed the | |
226 | // upgrade. Be sure to trash the port we were given. | |
227 | DISCONNECTED => { self.upgrade = prev; UpDisconnected } | |
228 | ||
229 | // If someone's waiting, we gotta wake them up | |
c34b1796 | 230 | ptr => UpWoke(unsafe { SignalToken::cast_from_usize(ptr) }) |
1a4d82fc JJ |
231 | } |
232 | } | |
233 | ||
234 | pub fn drop_chan(&mut self) { | |
235 | match self.state.swap(DISCONNECTED, Ordering::SeqCst) { | |
236 | DATA | DISCONNECTED | EMPTY => {} | |
237 | ||
238 | // If someone's waiting, we gotta wake them up | |
239 | ptr => unsafe { | |
c34b1796 | 240 | SignalToken::cast_from_usize(ptr).signal(); |
1a4d82fc JJ |
241 | } |
242 | } | |
243 | } | |
244 | ||
245 | pub fn drop_port(&mut self) { | |
246 | match self.state.swap(DISCONNECTED, Ordering::SeqCst) { | |
247 | // An empty channel has nothing to do, and a remotely disconnected | |
248 | // channel also has nothing to do b/c we're about to run the drop | |
249 | // glue | |
250 | DISCONNECTED | EMPTY => {} | |
251 | ||
252 | // There's data on the channel, so make sure we destroy it promptly. | |
253 | // This is why not using an arc is a little difficult (need the box | |
254 | // to stay valid while we take the data). | |
255 | DATA => { self.data.take().unwrap(); } | |
256 | ||
257 | // We're the only ones that can block on this port | |
258 | _ => unreachable!() | |
259 | } | |
260 | } | |
261 | ||
262 | //////////////////////////////////////////////////////////////////////////// | |
263 | // select implementation | |
264 | //////////////////////////////////////////////////////////////////////////// | |
265 | ||
266 | // If Ok, the value is whether this port has data, if Err, then the upgraded | |
267 | // port needs to be checked instead of this one. | |
268 | pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { | |
269 | match self.state.load(Ordering::SeqCst) { | |
270 | EMPTY => Ok(false), // Welp, we tried | |
271 | DATA => Ok(true), // we have some un-acquired data | |
272 | DISCONNECTED if self.data.is_some() => Ok(true), // we have data | |
273 | DISCONNECTED => { | |
274 | match mem::replace(&mut self.upgrade, SendUsed) { | |
275 | // The other end sent us an upgrade, so we need to | |
276 | // propagate upwards whether the upgrade can receive | |
277 | // data | |
278 | GoUp(upgrade) => Err(upgrade), | |
279 | ||
280 | // If the other end disconnected without sending an | |
281 | // upgrade, then we have data to receive (the channel is | |
282 | // disconnected). | |
283 | up => { self.upgrade = up; Ok(true) } | |
284 | } | |
285 | } | |
286 | _ => unreachable!(), // we're the "one blocker" | |
287 | } | |
288 | } | |
289 | ||
290 | // Attempts to start selection on this port. This can either succeed, fail | |
291 | // because there is data, or fail because there is an upgrade pending. | |
292 | pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> { | |
c34b1796 | 293 | let ptr = unsafe { token.cast_to_usize() }; |
1a4d82fc JJ |
294 | match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) { |
295 | EMPTY => SelSuccess, | |
296 | DATA => { | |
c34b1796 | 297 | drop(unsafe { SignalToken::cast_from_usize(ptr) }); |
1a4d82fc JJ |
298 | SelCanceled |
299 | } | |
300 | DISCONNECTED if self.data.is_some() => { | |
c34b1796 | 301 | drop(unsafe { SignalToken::cast_from_usize(ptr) }); |
1a4d82fc JJ |
302 | SelCanceled |
303 | } | |
304 | DISCONNECTED => { | |
305 | match mem::replace(&mut self.upgrade, SendUsed) { | |
306 | // The other end sent us an upgrade, so we need to | |
307 | // propagate upwards whether the upgrade can receive | |
308 | // data | |
309 | GoUp(upgrade) => { | |
c34b1796 | 310 | SelUpgraded(unsafe { SignalToken::cast_from_usize(ptr) }, upgrade) |
1a4d82fc JJ |
311 | } |
312 | ||
313 | // If the other end disconnected without sending an | |
314 | // upgrade, then we have data to receive (the channel is | |
315 | // disconnected). | |
316 | up => { | |
317 | self.upgrade = up; | |
c34b1796 | 318 | drop(unsafe { SignalToken::cast_from_usize(ptr) }); |
1a4d82fc JJ |
319 | SelCanceled |
320 | } | |
321 | } | |
322 | } | |
323 | _ => unreachable!(), // we're the "one blocker" | |
324 | } | |
325 | } | |
326 | ||
bd371182 AL |
327 | // Remove a previous selecting thread from this port. This ensures that the |
328 | // blocked thread will no longer be visible to any other threads. | |
1a4d82fc JJ |
329 | // |
330 | // The return value indicates whether there's data on this port. | |
331 | pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> { | |
332 | let state = match self.state.load(Ordering::SeqCst) { | |
333 | // Each of these states means that no further activity will happen | |
334 | // with regard to abortion selection | |
335 | s @ EMPTY | | |
336 | s @ DATA | | |
337 | s @ DISCONNECTED => s, | |
338 | ||
bd371182 | 339 | // If we've got a blocked thread, then use an atomic to gain ownership |
1a4d82fc JJ |
340 | // of it (may fail) |
341 | ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst) | |
342 | }; | |
343 | ||
344 | // Now that we've got ownership of our state, figure out what to do | |
345 | // about it. | |
346 | match state { | |
347 | EMPTY => unreachable!(), | |
bd371182 | 348 | // our thread used for select was stolen |
1a4d82fc JJ |
349 | DATA => Ok(true), |
350 | ||
351 | // If the other end has hung up, then we have complete ownership | |
352 | // of the port. First, check if there was data waiting for us. This | |
353 | // is possible if the other end sent something and then hung up. | |
354 | // | |
355 | // We then need to check to see if there was an upgrade requested, | |
356 | // and if so, the upgraded port needs to have its selection aborted. | |
357 | DISCONNECTED => { | |
358 | if self.data.is_some() { | |
359 | Ok(true) | |
360 | } else { | |
361 | match mem::replace(&mut self.upgrade, SendUsed) { | |
362 | GoUp(port) => Err(port), | |
363 | _ => Ok(true), | |
364 | } | |
365 | } | |
366 | } | |
367 | ||
368 | // We woke ourselves up from select. | |
369 | ptr => unsafe { | |
c34b1796 | 370 | drop(SignalToken::cast_from_usize(ptr)); |
1a4d82fc JJ |
371 | Ok(false) |
372 | } | |
373 | } | |
374 | } | |
375 | } | |
376 | ||
c34b1796 | 377 | impl<T> Drop for Packet<T> { |
1a4d82fc JJ |
378 | fn drop(&mut self) { |
379 | assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED); | |
380 | } | |
381 | } |