]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/oneshot.rs
New upstream version 1.33.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / oneshot.rs
CommitLineData
/// Oneshot channels/ports
///
/// This is the initial flavor of channels/ports used for the comm module. This
/// is an optimization for the one-use case of a channel. The major optimization
/// of this type is to have one and exactly one allocation when the chan/port
/// pair is created.
///
/// Another possible optimization would be to not use an Arc box because
/// in theory we know when the shared packet can be deallocated (no real need
/// for the atomic reference counting), but I was having trouble working out how
/// to destroy the data early in a drop of a Port.
///
/// # Implementation
///
/// Oneshots are implemented around one atomic usize variable. This variable
/// both indicates the state of the port/chan and contains any thread
/// blocked on the port. All atomic operations happen on this one word.
///
/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
/// on behalf of the channel side of things (it can be mentally thought of as
/// consuming the port). This upgrade is then also stored in the shared packet.
/// The one caveat to consider is that when a port sees a disconnected channel
/// it must check for data because there is no "data plus upgrade" state.
24
25pub use self::Failure::*;
26pub use self::UpgradeResult::*;
27pub use self::SelectionResult::*;
28use self::MyUpgrade::*;
29
1a4d82fc
JJ
30use sync::mpsc::Receiver;
31use sync::mpsc::blocking::{self, SignalToken};
476ff2be
SL
32use cell::UnsafeCell;
33use ptr;
85aaf69f 34use sync::atomic::{AtomicUsize, Ordering};
3157f602 35use time::Instant;
1a4d82fc
JJ
36
// Various states you can find a port in.

/// Initial state: no data, no blocked receiver.
const EMPTY: usize = 0;
/// Data is ready for the receiver to take.
const DATA: usize = 1;
/// The channel is disconnected OR upgraded.
const DISCONNECTED: usize = 2;
// Any other value represents a pointer to a SignalToken value. The
// protocol ensures that when the state moves *to* a pointer,
// ownership of the token is given to the packet, and when the state
// moves *from* a pointer, ownership of the token is transferred to
// whoever changed the state.
46
47pub struct Packet<T> {
bd371182 48 // Internal state of the chan/port pair (stores the blocked thread as well)
85aaf69f 49 state: AtomicUsize,
1a4d82fc 50 // One-shot data slot location
476ff2be 51 data: UnsafeCell<Option<T>>,
1a4d82fc
JJ
52 // when used for the second time, a oneshot channel must be upgraded, and
53 // this contains the slot for the upgrade
476ff2be 54 upgrade: UnsafeCell<MyUpgrade<T>>,
1a4d82fc
JJ
55}
56
/// Reasons a receive on a oneshot packet did not yield a value.
pub enum Failure<T> {
    /// No data has been sent yet and the channel is still connected.
    Empty,
    /// The sending side is gone and no data or upgrade was left behind.
    Disconnected,
    /// The channel was upgraded; the carried receiver must be used instead.
    Upgraded(Receiver<T>),
}
62
63pub enum UpgradeResult {
64 UpSuccess,
65 UpDisconnected,
66 UpWoke(SignalToken),
67}
68
69pub enum SelectionResult<T> {
70 SelCanceled,
71 SelUpgraded(SignalToken, Receiver<T>),
72 SelSuccess,
73}
74
/// Contents of the packet's upgrade slot.
enum MyUpgrade<T> {
    /// Nothing has been sent and no upgrade has been requested.
    NothingSent,
    /// The single send has been used (also written back once an upgrade has
    /// been taken out of the slot).
    SendUsed,
    /// An upgrade is pending; holds the receiver to switch to.
    GoUp(Receiver<T>),
}
80
c34b1796 81impl<T> Packet<T> {
1a4d82fc
JJ
82 pub fn new() -> Packet<T> {
83 Packet {
476ff2be
SL
84 data: UnsafeCell::new(None),
85 upgrade: UnsafeCell::new(NothingSent),
85aaf69f 86 state: AtomicUsize::new(EMPTY),
1a4d82fc
JJ
87 }
88 }
89
476ff2be
SL
90 pub fn send(&self, t: T) -> Result<(), T> {
91 unsafe {
92 // Sanity check
93 match *self.upgrade.get() {
94 NothingSent => {}
95 _ => panic!("sending on a oneshot that's already sent on "),
1a4d82fc 96 }
476ff2be
SL
97 assert!((*self.data.get()).is_none());
98 ptr::write(self.data.get(), Some(t));
99 ptr::write(self.upgrade.get(), SendUsed);
100
101 match self.state.swap(DATA, Ordering::SeqCst) {
102 // Sent the data, no one was waiting
103 EMPTY => Ok(()),
104
105 // Couldn't send the data, the port hung up first. Return the data
106 // back up the stack.
107 DISCONNECTED => {
108 self.state.swap(DISCONNECTED, Ordering::SeqCst);
109 ptr::write(self.upgrade.get(), NothingSent);
110 Err((&mut *self.data.get()).take().unwrap())
111 }
1a4d82fc 112
476ff2be
SL
113 // Not possible, these are one-use channels
114 DATA => unreachable!(),
1a4d82fc 115
476ff2be
SL
116 // There is a thread waiting on the other end. We leave the 'DATA'
117 // state inside so it'll pick it up on the other end.
118 ptr => {
119 SignalToken::cast_from_usize(ptr).signal();
120 Ok(())
121 }
1a4d82fc
JJ
122 }
123 }
124 }
125
126 // Just tests whether this channel has been sent on or not, this is only
127 // safe to use from the sender.
128 pub fn sent(&self) -> bool {
476ff2be
SL
129 unsafe {
130 match *self.upgrade.get() {
131 NothingSent => false,
132 _ => true,
133 }
1a4d82fc
JJ
134 }
135 }
136
476ff2be 137 pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
bd371182 138 // Attempt to not block the thread (it's a little expensive). If it looks
1a4d82fc
JJ
139 // like we're not empty, then immediately go through to `try_recv`.
140 if self.state.load(Ordering::SeqCst) == EMPTY {
141 let (wait_token, signal_token) = blocking::tokens();
c34b1796 142 let ptr = unsafe { signal_token.cast_to_usize() };
1a4d82fc
JJ
143
144 // race with senders to enter the blocking state
145 if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
3157f602
XL
146 if let Some(deadline) = deadline {
147 let timed_out = !wait_token.wait_max_until(deadline);
148 // Try to reset the state
149 if timed_out {
9e0c209e 150 self.abort_selection().map_err(Upgraded)?;
3157f602
XL
151 }
152 } else {
153 wait_token.wait();
154 debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
155 }
1a4d82fc
JJ
156 } else {
157 // drop the signal token, since we never blocked
c34b1796 158 drop(unsafe { SignalToken::cast_from_usize(ptr) });
1a4d82fc
JJ
159 }
160 }
161
162 self.try_recv()
163 }
164
476ff2be
SL
165 pub fn try_recv(&self) -> Result<T, Failure<T>> {
166 unsafe {
167 match self.state.load(Ordering::SeqCst) {
168 EMPTY => Err(Empty),
169
170 // We saw some data on the channel, but the channel can be used
171 // again to send us an upgrade. As a result, we need to re-insert
172 // into the channel that there's no data available (otherwise we'll
173 // just see DATA next time). This is done as a cmpxchg because if
174 // the state changes under our feet we'd rather just see that state
175 // change.
176 DATA => {
177 self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
178 match (&mut *self.data.get()).take() {
179 Some(data) => Ok(data),
180 None => unreachable!(),
181 }
1a4d82fc 182 }
1a4d82fc 183
476ff2be
SL
184 // There's no guarantee that we receive before an upgrade happens,
185 // and an upgrade flags the channel as disconnected, so when we see
186 // this we first need to check if there's data available and *then*
187 // we go through and process the upgrade.
188 DISCONNECTED => {
189 match (&mut *self.data.get()).take() {
190 Some(data) => Ok(data),
191 None => {
192 match ptr::replace(self.upgrade.get(), SendUsed) {
193 SendUsed | NothingSent => Err(Disconnected),
194 GoUp(upgrade) => Err(Upgraded(upgrade))
195 }
1a4d82fc
JJ
196 }
197 }
198 }
1a4d82fc 199
476ff2be
SL
200 // We are the sole receiver; there cannot be a blocking
201 // receiver already.
202 _ => unreachable!()
203 }
1a4d82fc
JJ
204 }
205 }
206
207 // Returns whether the upgrade was completed. If the upgrade wasn't
208 // completed, then the port couldn't get sent to the other half (it will
209 // never receive it).
476ff2be
SL
210 pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
211 unsafe {
212 let prev = match *self.upgrade.get() {
213 NothingSent => NothingSent,
214 SendUsed => SendUsed,
215 _ => panic!("upgrading again"),
216 };
217 ptr::write(self.upgrade.get(), GoUp(up));
218
219 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
220 // If the channel is empty or has data on it, then we're good to go.
221 // Senders will check the data before the upgrade (in case we
222 // plastered over the DATA state).
223 DATA | EMPTY => UpSuccess,
224
225 // If the other end is already disconnected, then we failed the
226 // upgrade. Be sure to trash the port we were given.
227 DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
228
229 // If someone's waiting, we gotta wake them up
230 ptr => UpWoke(SignalToken::cast_from_usize(ptr))
231 }
1a4d82fc
JJ
232 }
233 }
234
476ff2be 235 pub fn drop_chan(&self) {
1a4d82fc
JJ
236 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
237 DATA | DISCONNECTED | EMPTY => {}
238
239 // If someone's waiting, we gotta wake them up
240 ptr => unsafe {
c34b1796 241 SignalToken::cast_from_usize(ptr).signal();
1a4d82fc
JJ
242 }
243 }
244 }
245
476ff2be 246 pub fn drop_port(&self) {
1a4d82fc
JJ
247 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
248 // An empty channel has nothing to do, and a remotely disconnected
249 // channel also has nothing to do b/c we're about to run the drop
250 // glue
251 DISCONNECTED | EMPTY => {}
252
253 // There's data on the channel, so make sure we destroy it promptly.
254 // This is why not using an arc is a little difficult (need the box
255 // to stay valid while we take the data).
476ff2be 256 DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
1a4d82fc
JJ
257
258 // We're the only ones that can block on this port
259 _ => unreachable!()
260 }
261 }
262
263 ////////////////////////////////////////////////////////////////////////////
264 // select implementation
265 ////////////////////////////////////////////////////////////////////////////
266
267 // If Ok, the value is whether this port has data, if Err, then the upgraded
268 // port needs to be checked instead of this one.
476ff2be
SL
269 pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
270 unsafe {
271 match self.state.load(Ordering::SeqCst) {
272 EMPTY => Ok(false), // Welp, we tried
273 DATA => Ok(true), // we have some un-acquired data
274 DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
275 DISCONNECTED => {
276 match ptr::replace(self.upgrade.get(), SendUsed) {
277 // The other end sent us an upgrade, so we need to
278 // propagate upwards whether the upgrade can receive
279 // data
280 GoUp(upgrade) => Err(upgrade),
281
282 // If the other end disconnected without sending an
283 // upgrade, then we have data to receive (the channel is
284 // disconnected).
285 up => { ptr::write(self.upgrade.get(), up); Ok(true) }
286 }
1a4d82fc 287 }
476ff2be 288 _ => unreachable!(), // we're the "one blocker"
1a4d82fc 289 }
1a4d82fc
JJ
290 }
291 }
292
293 // Attempts to start selection on this port. This can either succeed, fail
294 // because there is data, or fail because there is an upgrade pending.
476ff2be
SL
295 pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
296 unsafe {
297 let ptr = token.cast_to_usize();
298 match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
299 EMPTY => SelSuccess,
300 DATA => {
301 drop(SignalToken::cast_from_usize(ptr));
302 SelCanceled
303 }
304 DISCONNECTED if (*self.data.get()).is_some() => {
305 drop(SignalToken::cast_from_usize(ptr));
306 SelCanceled
307 }
308 DISCONNECTED => {
309 match ptr::replace(self.upgrade.get(), SendUsed) {
310 // The other end sent us an upgrade, so we need to
311 // propagate upwards whether the upgrade can receive
312 // data
313 GoUp(upgrade) => {
314 SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
315 }
1a4d82fc 316
476ff2be
SL
317 // If the other end disconnected without sending an
318 // upgrade, then we have data to receive (the channel is
319 // disconnected).
320 up => {
321 ptr::write(self.upgrade.get(), up);
322 drop(SignalToken::cast_from_usize(ptr));
323 SelCanceled
324 }
1a4d82fc
JJ
325 }
326 }
476ff2be 327 _ => unreachable!(), // we're the "one blocker"
1a4d82fc 328 }
1a4d82fc
JJ
329 }
330 }
331
bd371182
AL
332 // Remove a previous selecting thread from this port. This ensures that the
333 // blocked thread will no longer be visible to any other threads.
1a4d82fc
JJ
334 //
335 // The return value indicates whether there's data on this port.
476ff2be 336 pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
1a4d82fc
JJ
337 let state = match self.state.load(Ordering::SeqCst) {
338 // Each of these states means that no further activity will happen
339 // with regard to abortion selection
340 s @ EMPTY |
341 s @ DATA |
342 s @ DISCONNECTED => s,
343
bd371182 344 // If we've got a blocked thread, then use an atomic to gain ownership
1a4d82fc
JJ
345 // of it (may fail)
346 ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst)
347 };
348
349 // Now that we've got ownership of our state, figure out what to do
350 // about it.
351 match state {
352 EMPTY => unreachable!(),
bd371182 353 // our thread used for select was stolen
1a4d82fc
JJ
354 DATA => Ok(true),
355
356 // If the other end has hung up, then we have complete ownership
357 // of the port. First, check if there was data waiting for us. This
358 // is possible if the other end sent something and then hung up.
359 //
360 // We then need to check to see if there was an upgrade requested,
361 // and if so, the upgraded port needs to have its selection aborted.
476ff2be
SL
362 DISCONNECTED => unsafe {
363 if (*self.data.get()).is_some() {
1a4d82fc
JJ
364 Ok(true)
365 } else {
476ff2be 366 match ptr::replace(self.upgrade.get(), SendUsed) {
1a4d82fc
JJ
367 GoUp(port) => Err(port),
368 _ => Ok(true),
369 }
370 }
476ff2be 371 },
1a4d82fc
JJ
372
373 // We woke ourselves up from select.
374 ptr => unsafe {
c34b1796 375 drop(SignalToken::cast_from_usize(ptr));
1a4d82fc
JJ
376 Ok(false)
377 }
378 }
379 }
380}
381
c34b1796 382impl<T> Drop for Packet<T> {
1a4d82fc
JJ
383 fn drop(&mut self) {
384 assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
385 }
386}