]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | /// Oneshot channels/ports |
2 | /// | |
3 | /// This is the initial flavor of channels/ports used for the comm module. This is | |
4 | /// an optimization for the one-use case of a channel. The major optimization of | |
5 | /// this type is to have one and exactly one allocation when the chan/port pair | |
6 | /// is created. | |
7 | /// | |
8 | /// Another possible optimization would be to not use an Arc box because | |
9 | /// in theory we know when the shared packet can be deallocated (no real need | |
10 | /// for the atomic reference counting), but I was having trouble working out how | |
11 | /// to destroy the data early in a drop of a Port. | |
12 | /// | |
13 | /// # Implementation | |
14 | /// | |
c34b1796 | 15 | /// Oneshots are implemented around one atomic usize variable. This variable |
bd371182 | 16 | /// indicates both the state of the port/chan but also contains any threads |
1a4d82fc JJ |
17 | /// blocked on the port. All atomic operations happen on this one word. |
18 | /// | |
19 | /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect | |
20 | /// on behalf of the channel side of things (it can be mentally thought of as | |
21 | /// consuming the port). This upgrade is then also stored in the shared packet. | |
22 | /// The one caveat to consider is that when a port sees a disconnected channel | |
23 | /// it must check for data because there is no "data plus upgrade" state. | |
24 | ||
25 | pub use self::Failure::*; | |
26 | pub use self::UpgradeResult::*; | |
27 | pub use self::SelectionResult::*; | |
28 | use self::MyUpgrade::*; | |
29 | ||
1a4d82fc JJ |
30 | use sync::mpsc::Receiver; |
31 | use sync::mpsc::blocking::{self, SignalToken}; | |
476ff2be SL |
32 | use cell::UnsafeCell; |
33 | use ptr; | |
85aaf69f | 34 | use sync::atomic::{AtomicUsize, Ordering}; |
3157f602 | 35 | use time::Instant; |
1a4d82fc JJ |
36 | |
// Various states you can find a port in.
//
// Any value other than the three sentinels below represents a pointer to a
// SignalToken value. The protocol ensures that when the state moves *to* a
// pointer, ownership of the token is given to the packet, and when the state
// moves *from* a pointer, ownership of the token is transferred to whoever
// changed the state.

/// Initial state: no data, no blocked receiver.
const EMPTY: usize = 0;
/// Data is ready for the receiver to take.
const DATA: usize = 1;
/// The channel is disconnected OR upgraded.
const DISCONNECTED: usize = 2;
46 | ||
47 | pub struct Packet<T> { | |
bd371182 | 48 | // Internal state of the chan/port pair (stores the blocked thread as well) |
85aaf69f | 49 | state: AtomicUsize, |
1a4d82fc | 50 | // One-shot data slot location |
476ff2be | 51 | data: UnsafeCell<Option<T>>, |
1a4d82fc JJ |
52 | // when used for the second time, a oneshot channel must be upgraded, and |
53 | // this contains the slot for the upgrade | |
476ff2be | 54 | upgrade: UnsafeCell<MyUpgrade<T>>, |
1a4d82fc JJ |
55 | } |
56 | ||
57 | pub enum Failure<T> { | |
58 | Empty, | |
59 | Disconnected, | |
60 | Upgraded(Receiver<T>), | |
61 | } | |
62 | ||
63 | pub enum UpgradeResult { | |
64 | UpSuccess, | |
65 | UpDisconnected, | |
66 | UpWoke(SignalToken), | |
67 | } | |
68 | ||
69 | pub enum SelectionResult<T> { | |
70 | SelCanceled, | |
71 | SelUpgraded(SignalToken, Receiver<T>), | |
72 | SelSuccess, | |
73 | } | |
74 | ||
75 | enum MyUpgrade<T> { | |
76 | NothingSent, | |
77 | SendUsed, | |
78 | GoUp(Receiver<T>), | |
79 | } | |
80 | ||
c34b1796 | 81 | impl<T> Packet<T> { |
1a4d82fc JJ |
82 | pub fn new() -> Packet<T> { |
83 | Packet { | |
476ff2be SL |
84 | data: UnsafeCell::new(None), |
85 | upgrade: UnsafeCell::new(NothingSent), | |
85aaf69f | 86 | state: AtomicUsize::new(EMPTY), |
1a4d82fc JJ |
87 | } |
88 | } | |
89 | ||
476ff2be SL |
90 | pub fn send(&self, t: T) -> Result<(), T> { |
91 | unsafe { | |
92 | // Sanity check | |
93 | match *self.upgrade.get() { | |
94 | NothingSent => {} | |
95 | _ => panic!("sending on a oneshot that's already sent on "), | |
1a4d82fc | 96 | } |
476ff2be SL |
97 | assert!((*self.data.get()).is_none()); |
98 | ptr::write(self.data.get(), Some(t)); | |
99 | ptr::write(self.upgrade.get(), SendUsed); | |
100 | ||
101 | match self.state.swap(DATA, Ordering::SeqCst) { | |
102 | // Sent the data, no one was waiting | |
103 | EMPTY => Ok(()), | |
104 | ||
105 | // Couldn't send the data, the port hung up first. Return the data | |
106 | // back up the stack. | |
107 | DISCONNECTED => { | |
108 | self.state.swap(DISCONNECTED, Ordering::SeqCst); | |
109 | ptr::write(self.upgrade.get(), NothingSent); | |
110 | Err((&mut *self.data.get()).take().unwrap()) | |
111 | } | |
1a4d82fc | 112 | |
476ff2be SL |
113 | // Not possible, these are one-use channels |
114 | DATA => unreachable!(), | |
1a4d82fc | 115 | |
476ff2be SL |
116 | // There is a thread waiting on the other end. We leave the 'DATA' |
117 | // state inside so it'll pick it up on the other end. | |
118 | ptr => { | |
119 | SignalToken::cast_from_usize(ptr).signal(); | |
120 | Ok(()) | |
121 | } | |
1a4d82fc JJ |
122 | } |
123 | } | |
124 | } | |
125 | ||
126 | // Just tests whether this channel has been sent on or not, this is only | |
127 | // safe to use from the sender. | |
128 | pub fn sent(&self) -> bool { | |
476ff2be SL |
129 | unsafe { |
130 | match *self.upgrade.get() { | |
131 | NothingSent => false, | |
132 | _ => true, | |
133 | } | |
1a4d82fc JJ |
134 | } |
135 | } | |
136 | ||
476ff2be | 137 | pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> { |
bd371182 | 138 | // Attempt to not block the thread (it's a little expensive). If it looks |
1a4d82fc JJ |
139 | // like we're not empty, then immediately go through to `try_recv`. |
140 | if self.state.load(Ordering::SeqCst) == EMPTY { | |
141 | let (wait_token, signal_token) = blocking::tokens(); | |
c34b1796 | 142 | let ptr = unsafe { signal_token.cast_to_usize() }; |
1a4d82fc JJ |
143 | |
144 | // race with senders to enter the blocking state | |
145 | if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY { | |
3157f602 XL |
146 | if let Some(deadline) = deadline { |
147 | let timed_out = !wait_token.wait_max_until(deadline); | |
148 | // Try to reset the state | |
149 | if timed_out { | |
9e0c209e | 150 | self.abort_selection().map_err(Upgraded)?; |
3157f602 XL |
151 | } |
152 | } else { | |
153 | wait_token.wait(); | |
154 | debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY); | |
155 | } | |
1a4d82fc JJ |
156 | } else { |
157 | // drop the signal token, since we never blocked | |
c34b1796 | 158 | drop(unsafe { SignalToken::cast_from_usize(ptr) }); |
1a4d82fc JJ |
159 | } |
160 | } | |
161 | ||
162 | self.try_recv() | |
163 | } | |
164 | ||
476ff2be SL |
165 | pub fn try_recv(&self) -> Result<T, Failure<T>> { |
166 | unsafe { | |
167 | match self.state.load(Ordering::SeqCst) { | |
168 | EMPTY => Err(Empty), | |
169 | ||
170 | // We saw some data on the channel, but the channel can be used | |
171 | // again to send us an upgrade. As a result, we need to re-insert | |
172 | // into the channel that there's no data available (otherwise we'll | |
173 | // just see DATA next time). This is done as a cmpxchg because if | |
174 | // the state changes under our feet we'd rather just see that state | |
175 | // change. | |
176 | DATA => { | |
177 | self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst); | |
178 | match (&mut *self.data.get()).take() { | |
179 | Some(data) => Ok(data), | |
180 | None => unreachable!(), | |
181 | } | |
1a4d82fc | 182 | } |
1a4d82fc | 183 | |
476ff2be SL |
184 | // There's no guarantee that we receive before an upgrade happens, |
185 | // and an upgrade flags the channel as disconnected, so when we see | |
186 | // this we first need to check if there's data available and *then* | |
187 | // we go through and process the upgrade. | |
188 | DISCONNECTED => { | |
189 | match (&mut *self.data.get()).take() { | |
190 | Some(data) => Ok(data), | |
191 | None => { | |
192 | match ptr::replace(self.upgrade.get(), SendUsed) { | |
193 | SendUsed | NothingSent => Err(Disconnected), | |
194 | GoUp(upgrade) => Err(Upgraded(upgrade)) | |
195 | } | |
1a4d82fc JJ |
196 | } |
197 | } | |
198 | } | |
1a4d82fc | 199 | |
476ff2be SL |
200 | // We are the sole receiver; there cannot be a blocking |
201 | // receiver already. | |
202 | _ => unreachable!() | |
203 | } | |
1a4d82fc JJ |
204 | } |
205 | } | |
206 | ||
207 | // Returns whether the upgrade was completed. If the upgrade wasn't | |
208 | // completed, then the port couldn't get sent to the other half (it will | |
209 | // never receive it). | |
476ff2be SL |
210 | pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult { |
211 | unsafe { | |
212 | let prev = match *self.upgrade.get() { | |
213 | NothingSent => NothingSent, | |
214 | SendUsed => SendUsed, | |
215 | _ => panic!("upgrading again"), | |
216 | }; | |
217 | ptr::write(self.upgrade.get(), GoUp(up)); | |
218 | ||
219 | match self.state.swap(DISCONNECTED, Ordering::SeqCst) { | |
220 | // If the channel is empty or has data on it, then we're good to go. | |
221 | // Senders will check the data before the upgrade (in case we | |
222 | // plastered over the DATA state). | |
223 | DATA | EMPTY => UpSuccess, | |
224 | ||
225 | // If the other end is already disconnected, then we failed the | |
226 | // upgrade. Be sure to trash the port we were given. | |
227 | DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected } | |
228 | ||
229 | // If someone's waiting, we gotta wake them up | |
230 | ptr => UpWoke(SignalToken::cast_from_usize(ptr)) | |
231 | } | |
1a4d82fc JJ |
232 | } |
233 | } | |
234 | ||
476ff2be | 235 | pub fn drop_chan(&self) { |
1a4d82fc JJ |
236 | match self.state.swap(DISCONNECTED, Ordering::SeqCst) { |
237 | DATA | DISCONNECTED | EMPTY => {} | |
238 | ||
239 | // If someone's waiting, we gotta wake them up | |
240 | ptr => unsafe { | |
c34b1796 | 241 | SignalToken::cast_from_usize(ptr).signal(); |
1a4d82fc JJ |
242 | } |
243 | } | |
244 | } | |
245 | ||
476ff2be | 246 | pub fn drop_port(&self) { |
1a4d82fc JJ |
247 | match self.state.swap(DISCONNECTED, Ordering::SeqCst) { |
248 | // An empty channel has nothing to do, and a remotely disconnected | |
249 | // channel also has nothing to do b/c we're about to run the drop | |
250 | // glue | |
251 | DISCONNECTED | EMPTY => {} | |
252 | ||
253 | // There's data on the channel, so make sure we destroy it promptly. | |
254 | // This is why not using an arc is a little difficult (need the box | |
255 | // to stay valid while we take the data). | |
476ff2be | 256 | DATA => unsafe { (&mut *self.data.get()).take().unwrap(); }, |
1a4d82fc JJ |
257 | |
258 | // We're the only ones that can block on this port | |
259 | _ => unreachable!() | |
260 | } | |
261 | } | |
262 | ||
263 | //////////////////////////////////////////////////////////////////////////// | |
264 | // select implementation | |
265 | //////////////////////////////////////////////////////////////////////////// | |
266 | ||
267 | // If Ok, the value is whether this port has data, if Err, then the upgraded | |
268 | // port needs to be checked instead of this one. | |
476ff2be SL |
269 | pub fn can_recv(&self) -> Result<bool, Receiver<T>> { |
270 | unsafe { | |
271 | match self.state.load(Ordering::SeqCst) { | |
272 | EMPTY => Ok(false), // Welp, we tried | |
273 | DATA => Ok(true), // we have some un-acquired data | |
274 | DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data | |
275 | DISCONNECTED => { | |
276 | match ptr::replace(self.upgrade.get(), SendUsed) { | |
277 | // The other end sent us an upgrade, so we need to | |
278 | // propagate upwards whether the upgrade can receive | |
279 | // data | |
280 | GoUp(upgrade) => Err(upgrade), | |
281 | ||
282 | // If the other end disconnected without sending an | |
283 | // upgrade, then we have data to receive (the channel is | |
284 | // disconnected). | |
285 | up => { ptr::write(self.upgrade.get(), up); Ok(true) } | |
286 | } | |
1a4d82fc | 287 | } |
476ff2be | 288 | _ => unreachable!(), // we're the "one blocker" |
1a4d82fc | 289 | } |
1a4d82fc JJ |
290 | } |
291 | } | |
292 | ||
293 | // Attempts to start selection on this port. This can either succeed, fail | |
294 | // because there is data, or fail because there is an upgrade pending. | |
476ff2be SL |
295 | pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> { |
296 | unsafe { | |
297 | let ptr = token.cast_to_usize(); | |
298 | match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) { | |
299 | EMPTY => SelSuccess, | |
300 | DATA => { | |
301 | drop(SignalToken::cast_from_usize(ptr)); | |
302 | SelCanceled | |
303 | } | |
304 | DISCONNECTED if (*self.data.get()).is_some() => { | |
305 | drop(SignalToken::cast_from_usize(ptr)); | |
306 | SelCanceled | |
307 | } | |
308 | DISCONNECTED => { | |
309 | match ptr::replace(self.upgrade.get(), SendUsed) { | |
310 | // The other end sent us an upgrade, so we need to | |
311 | // propagate upwards whether the upgrade can receive | |
312 | // data | |
313 | GoUp(upgrade) => { | |
314 | SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade) | |
315 | } | |
1a4d82fc | 316 | |
476ff2be SL |
317 | // If the other end disconnected without sending an |
318 | // upgrade, then we have data to receive (the channel is | |
319 | // disconnected). | |
320 | up => { | |
321 | ptr::write(self.upgrade.get(), up); | |
322 | drop(SignalToken::cast_from_usize(ptr)); | |
323 | SelCanceled | |
324 | } | |
1a4d82fc JJ |
325 | } |
326 | } | |
476ff2be | 327 | _ => unreachable!(), // we're the "one blocker" |
1a4d82fc | 328 | } |
1a4d82fc JJ |
329 | } |
330 | } | |
331 | ||
bd371182 AL |
332 | // Remove a previous selecting thread from this port. This ensures that the |
333 | // blocked thread will no longer be visible to any other threads. | |
1a4d82fc JJ |
334 | // |
335 | // The return value indicates whether there's data on this port. | |
476ff2be | 336 | pub fn abort_selection(&self) -> Result<bool, Receiver<T>> { |
1a4d82fc JJ |
337 | let state = match self.state.load(Ordering::SeqCst) { |
338 | // Each of these states means that no further activity will happen | |
339 | // with regard to abortion selection | |
340 | s @ EMPTY | | |
341 | s @ DATA | | |
342 | s @ DISCONNECTED => s, | |
343 | ||
bd371182 | 344 | // If we've got a blocked thread, then use an atomic to gain ownership |
1a4d82fc JJ |
345 | // of it (may fail) |
346 | ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst) | |
347 | }; | |
348 | ||
349 | // Now that we've got ownership of our state, figure out what to do | |
350 | // about it. | |
351 | match state { | |
352 | EMPTY => unreachable!(), | |
bd371182 | 353 | // our thread used for select was stolen |
1a4d82fc JJ |
354 | DATA => Ok(true), |
355 | ||
356 | // If the other end has hung up, then we have complete ownership | |
357 | // of the port. First, check if there was data waiting for us. This | |
358 | // is possible if the other end sent something and then hung up. | |
359 | // | |
360 | // We then need to check to see if there was an upgrade requested, | |
361 | // and if so, the upgraded port needs to have its selection aborted. | |
476ff2be SL |
362 | DISCONNECTED => unsafe { |
363 | if (*self.data.get()).is_some() { | |
1a4d82fc JJ |
364 | Ok(true) |
365 | } else { | |
476ff2be | 366 | match ptr::replace(self.upgrade.get(), SendUsed) { |
1a4d82fc JJ |
367 | GoUp(port) => Err(port), |
368 | _ => Ok(true), | |
369 | } | |
370 | } | |
476ff2be | 371 | }, |
1a4d82fc JJ |
372 | |
373 | // We woke ourselves up from select. | |
374 | ptr => unsafe { | |
c34b1796 | 375 | drop(SignalToken::cast_from_usize(ptr)); |
1a4d82fc JJ |
376 | Ok(false) |
377 | } | |
378 | } | |
379 | } | |
380 | } | |
381 | ||
c34b1796 | 382 | impl<T> Drop for Packet<T> { |
1a4d82fc JJ |
383 | fn drop(&mut self) { |
384 | assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED); | |
385 | } | |
386 | } |