]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/oneshot.rs
New upstream version 1.13.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / oneshot.rs
CommitLineData
1a4d82fc
JJ
1// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
/// Oneshot channels/ports
///
/// This is the initial flavor of channels/ports used for comm module. This is
/// an optimization for the one-use case of a channel. The major optimization of
/// this type is to have one and exactly one allocation when the chan/port pair
/// is created.
///
/// Another possible optimization would be to not use an Arc box because
/// in theory we know when the shared packet can be deallocated (no real need
/// for the atomic reference counting), but I was having trouble figuring out
/// how to destroy the data early in a drop of a Port.
///
/// # Implementation
///
/// Oneshots are implemented around one atomic usize variable. This variable
/// indicates both the state of the port/chan but also contains any threads
/// blocked on the port. All atomic operations happen on this one word.
///
/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
/// on behalf of the channel side of things (it can be mentally thought of as
/// consuming the port). This upgrade is then also stored in the shared packet.
/// The one caveat to consider is that when a port sees a disconnected channel
/// it must check for data because there is no "data plus upgrade" state.
34
35pub use self::Failure::*;
36pub use self::UpgradeResult::*;
37pub use self::SelectionResult::*;
38use self::MyUpgrade::*;
39
1a4d82fc
JJ
40use sync::mpsc::Receiver;
41use sync::mpsc::blocking::{self, SignalToken};
42use core::mem;
85aaf69f 43use sync::atomic::{AtomicUsize, Ordering};
3157f602 44use time::Instant;
1a4d82fc
JJ
45
// Various states you can find a port in. These are the values stored in
// `Packet::state`.
const EMPTY: usize = 0; // initial state: no data, no blocked receiver
const DATA: usize = 1; // data ready for receiver to take
const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
// Any other value represents a pointer to a SignalToken value. The
// protocol ensures that when the state moves *to* a pointer,
// ownership of the token is given to the packet, and when the state
// moves *from* a pointer, ownership of the token is transferred to
// whoever changed the state.
55
/// Shared state of a oneshot channel: one atomic state word, one data slot
/// and one slot for a pending upgrade.
pub struct Packet<T> {
    /// Internal state of the chan/port pair (stores the blocked thread as
    /// well). Holds `EMPTY`, `DATA`, `DISCONNECTED`, or a `SignalToken`
    /// converted to a `usize`.
    state: AtomicUsize,
    /// One-shot data slot location. Filled by `send` and emptied by the
    /// receiving side.
    data: Option<T>,
    /// When used for the second time, a oneshot channel must be upgraded, and
    /// this contains the slot for the upgrade.
    upgrade: MyUpgrade<T>,
}
65
/// Reasons a receive operation on the packet did not yield a value.
pub enum Failure<T> {
    /// The state was `EMPTY`: nothing has been sent yet.
    Empty,
    /// The state was `DISCONNECTED`, with no data and no upgrade stored.
    Disconnected,
    /// The state was `DISCONNECTED` and an upgrade was stored; the carried
    /// receiver is the one that was handed to `Packet::upgrade`.
    Upgraded(Receiver<T>),
}
71
/// Outcome of `Packet::upgrade`.
pub enum UpgradeResult {
    /// The previous state was `EMPTY` or `DATA`; the upgrade was stored.
    UpSuccess,
    /// The previous state was already `DISCONNECTED`; the upgrade was
    /// discarded.
    UpDisconnected,
    /// A receiver was blocked; its token is handed to the caller so that it
    /// can be woken up.
    UpWoke(SignalToken),
}
77
/// Outcome of `Packet::start_selection`.
pub enum SelectionResult<T> {
    /// Selection was not started because there is already something to
    /// observe (data, or a disconnect without an upgrade).
    SelCanceled,
    /// An upgrade was pending; the caller gets its token back together with
    /// the upgraded receiver.
    SelUpgraded(SignalToken, Receiver<T>),
    /// The token was installed into the packet's state.
    SelSuccess,
}
83
/// Contents of the packet's upgrade slot.
enum MyUpgrade<T> {
    /// Initial value: nothing has been sent and no upgrade was stored.
    NothingSent,
    /// Set by `send`; also left behind once a stored upgrade is taken out.
    SendUsed,
    /// An upgrade stored by `Packet::upgrade`, waiting for the receiver.
    GoUp(Receiver<T>),
}
89
c34b1796 90impl<T> Packet<T> {
1a4d82fc
JJ
91 pub fn new() -> Packet<T> {
92 Packet {
93 data: None,
94 upgrade: NothingSent,
85aaf69f 95 state: AtomicUsize::new(EMPTY),
1a4d82fc
JJ
96 }
97 }
98
99 pub fn send(&mut self, t: T) -> Result<(), T> {
100 // Sanity check
101 match self.upgrade {
102 NothingSent => {}
103 _ => panic!("sending on a oneshot that's already sent on "),
104 }
105 assert!(self.data.is_none());
106 self.data = Some(t);
107 self.upgrade = SendUsed;
108
109 match self.state.swap(DATA, Ordering::SeqCst) {
110 // Sent the data, no one was waiting
111 EMPTY => Ok(()),
112
113 // Couldn't send the data, the port hung up first. Return the data
114 // back up the stack.
115 DISCONNECTED => {
116 Err(self.data.take().unwrap())
117 }
118
119 // Not possible, these are one-use channels
120 DATA => unreachable!(),
121
122 // There is a thread waiting on the other end. We leave the 'DATA'
123 // state inside so it'll pick it up on the other end.
124 ptr => unsafe {
c34b1796 125 SignalToken::cast_from_usize(ptr).signal();
1a4d82fc
JJ
126 Ok(())
127 }
128 }
129 }
130
131 // Just tests whether this channel has been sent on or not, this is only
132 // safe to use from the sender.
133 pub fn sent(&self) -> bool {
134 match self.upgrade {
135 NothingSent => false,
136 _ => true,
137 }
138 }
139
3157f602 140 pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
bd371182 141 // Attempt to not block the thread (it's a little expensive). If it looks
1a4d82fc
JJ
142 // like we're not empty, then immediately go through to `try_recv`.
143 if self.state.load(Ordering::SeqCst) == EMPTY {
144 let (wait_token, signal_token) = blocking::tokens();
c34b1796 145 let ptr = unsafe { signal_token.cast_to_usize() };
1a4d82fc
JJ
146
147 // race with senders to enter the blocking state
148 if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
3157f602
XL
149 if let Some(deadline) = deadline {
150 let timed_out = !wait_token.wait_max_until(deadline);
151 // Try to reset the state
152 if timed_out {
9e0c209e 153 self.abort_selection().map_err(Upgraded)?;
3157f602
XL
154 }
155 } else {
156 wait_token.wait();
157 debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
158 }
1a4d82fc
JJ
159 } else {
160 // drop the signal token, since we never blocked
c34b1796 161 drop(unsafe { SignalToken::cast_from_usize(ptr) });
1a4d82fc
JJ
162 }
163 }
164
165 self.try_recv()
166 }
167
168 pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
169 match self.state.load(Ordering::SeqCst) {
170 EMPTY => Err(Empty),
171
172 // We saw some data on the channel, but the channel can be used
173 // again to send us an upgrade. As a result, we need to re-insert
174 // into the channel that there's no data available (otherwise we'll
175 // just see DATA next time). This is done as a cmpxchg because if
176 // the state changes under our feet we'd rather just see that state
177 // change.
178 DATA => {
179 self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
180 match self.data.take() {
181 Some(data) => Ok(data),
182 None => unreachable!(),
183 }
184 }
185
186 // There's no guarantee that we receive before an upgrade happens,
187 // and an upgrade flags the channel as disconnected, so when we see
188 // this we first need to check if there's data available and *then*
189 // we go through and process the upgrade.
190 DISCONNECTED => {
191 match self.data.take() {
192 Some(data) => Ok(data),
193 None => {
194 match mem::replace(&mut self.upgrade, SendUsed) {
195 SendUsed | NothingSent => Err(Disconnected),
196 GoUp(upgrade) => Err(Upgraded(upgrade))
197 }
198 }
199 }
200 }
201
202 // We are the sole receiver; there cannot be a blocking
203 // receiver already.
204 _ => unreachable!()
205 }
206 }
207
208 // Returns whether the upgrade was completed. If the upgrade wasn't
209 // completed, then the port couldn't get sent to the other half (it will
210 // never receive it).
211 pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
212 let prev = match self.upgrade {
213 NothingSent => NothingSent,
214 SendUsed => SendUsed,
215 _ => panic!("upgrading again"),
216 };
217 self.upgrade = GoUp(up);
218
219 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
220 // If the channel is empty or has data on it, then we're good to go.
221 // Senders will check the data before the upgrade (in case we
222 // plastered over the DATA state).
223 DATA | EMPTY => UpSuccess,
224
225 // If the other end is already disconnected, then we failed the
226 // upgrade. Be sure to trash the port we were given.
227 DISCONNECTED => { self.upgrade = prev; UpDisconnected }
228
229 // If someone's waiting, we gotta wake them up
c34b1796 230 ptr => UpWoke(unsafe { SignalToken::cast_from_usize(ptr) })
1a4d82fc
JJ
231 }
232 }
233
234 pub fn drop_chan(&mut self) {
235 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
236 DATA | DISCONNECTED | EMPTY => {}
237
238 // If someone's waiting, we gotta wake them up
239 ptr => unsafe {
c34b1796 240 SignalToken::cast_from_usize(ptr).signal();
1a4d82fc
JJ
241 }
242 }
243 }
244
245 pub fn drop_port(&mut self) {
246 match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
247 // An empty channel has nothing to do, and a remotely disconnected
248 // channel also has nothing to do b/c we're about to run the drop
249 // glue
250 DISCONNECTED | EMPTY => {}
251
252 // There's data on the channel, so make sure we destroy it promptly.
253 // This is why not using an arc is a little difficult (need the box
254 // to stay valid while we take the data).
255 DATA => { self.data.take().unwrap(); }
256
257 // We're the only ones that can block on this port
258 _ => unreachable!()
259 }
260 }
261
262 ////////////////////////////////////////////////////////////////////////////
263 // select implementation
264 ////////////////////////////////////////////////////////////////////////////
265
266 // If Ok, the value is whether this port has data, if Err, then the upgraded
267 // port needs to be checked instead of this one.
268 pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
269 match self.state.load(Ordering::SeqCst) {
270 EMPTY => Ok(false), // Welp, we tried
271 DATA => Ok(true), // we have some un-acquired data
272 DISCONNECTED if self.data.is_some() => Ok(true), // we have data
273 DISCONNECTED => {
274 match mem::replace(&mut self.upgrade, SendUsed) {
275 // The other end sent us an upgrade, so we need to
276 // propagate upwards whether the upgrade can receive
277 // data
278 GoUp(upgrade) => Err(upgrade),
279
280 // If the other end disconnected without sending an
281 // upgrade, then we have data to receive (the channel is
282 // disconnected).
283 up => { self.upgrade = up; Ok(true) }
284 }
285 }
286 _ => unreachable!(), // we're the "one blocker"
287 }
288 }
289
290 // Attempts to start selection on this port. This can either succeed, fail
291 // because there is data, or fail because there is an upgrade pending.
292 pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
c34b1796 293 let ptr = unsafe { token.cast_to_usize() };
1a4d82fc
JJ
294 match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
295 EMPTY => SelSuccess,
296 DATA => {
c34b1796 297 drop(unsafe { SignalToken::cast_from_usize(ptr) });
1a4d82fc
JJ
298 SelCanceled
299 }
300 DISCONNECTED if self.data.is_some() => {
c34b1796 301 drop(unsafe { SignalToken::cast_from_usize(ptr) });
1a4d82fc
JJ
302 SelCanceled
303 }
304 DISCONNECTED => {
305 match mem::replace(&mut self.upgrade, SendUsed) {
306 // The other end sent us an upgrade, so we need to
307 // propagate upwards whether the upgrade can receive
308 // data
309 GoUp(upgrade) => {
c34b1796 310 SelUpgraded(unsafe { SignalToken::cast_from_usize(ptr) }, upgrade)
1a4d82fc
JJ
311 }
312
313 // If the other end disconnected without sending an
314 // upgrade, then we have data to receive (the channel is
315 // disconnected).
316 up => {
317 self.upgrade = up;
c34b1796 318 drop(unsafe { SignalToken::cast_from_usize(ptr) });
1a4d82fc
JJ
319 SelCanceled
320 }
321 }
322 }
323 _ => unreachable!(), // we're the "one blocker"
324 }
325 }
326
bd371182
AL
327 // Remove a previous selecting thread from this port. This ensures that the
328 // blocked thread will no longer be visible to any other threads.
1a4d82fc
JJ
329 //
330 // The return value indicates whether there's data on this port.
331 pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
332 let state = match self.state.load(Ordering::SeqCst) {
333 // Each of these states means that no further activity will happen
334 // with regard to abortion selection
335 s @ EMPTY |
336 s @ DATA |
337 s @ DISCONNECTED => s,
338
bd371182 339 // If we've got a blocked thread, then use an atomic to gain ownership
1a4d82fc
JJ
340 // of it (may fail)
341 ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst)
342 };
343
344 // Now that we've got ownership of our state, figure out what to do
345 // about it.
346 match state {
347 EMPTY => unreachable!(),
bd371182 348 // our thread used for select was stolen
1a4d82fc
JJ
349 DATA => Ok(true),
350
351 // If the other end has hung up, then we have complete ownership
352 // of the port. First, check if there was data waiting for us. This
353 // is possible if the other end sent something and then hung up.
354 //
355 // We then need to check to see if there was an upgrade requested,
356 // and if so, the upgraded port needs to have its selection aborted.
357 DISCONNECTED => {
358 if self.data.is_some() {
359 Ok(true)
360 } else {
361 match mem::replace(&mut self.upgrade, SendUsed) {
362 GoUp(port) => Err(port),
363 _ => Ok(true),
364 }
365 }
366 }
367
368 // We woke ourselves up from select.
369 ptr => unsafe {
c34b1796 370 drop(SignalToken::cast_from_usize(ptr));
1a4d82fc
JJ
371 Ok(false)
372 }
373 }
374 }
375}
376
c34b1796 377impl<T> Drop for Packet<T> {
1a4d82fc
JJ
378 fn drop(&mut self) {
379 assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
380 }
381}