1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 /// Oneshot channels/ports
13 /// This is the initial flavor of channels/ports used for the comm module. This is
14 /// an optimization for the one-use case of a channel. The major optimization of
15 /// this type is to have one and exactly one allocation when the chan/port pair is created.
18 /// Another possible optimization would be to not use an Arc box because
19 /// in theory we know when the shared packet can be deallocated (no real need
20 /// for the atomic reference counting), but I was having trouble figuring out how
21 /// to destroy the data early in a drop of a Port.
25 /// Oneshots are implemented around one atomic usize variable. This variable
26 /// both indicates the state of the port/chan and holds any thread
27 /// blocked on the port. All atomic operations happen on this one word.
29 /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
30 /// on behalf of the channel side of things (it can be mentally thought of as
31 /// consuming the port). This upgrade is then also stored in the shared packet.
32 /// The one caveat to consider is that when a port sees a disconnected channel
33 /// it must check for data because there is no "data plus upgrade" state.
35 pub use self::Failure
::*;
36 pub use self::UpgradeResult
::*;
37 pub use self::SelectionResult
::*;
38 use self::MyUpgrade
::*;
42 use sync
::mpsc
::Receiver
;
43 use sync
::mpsc
::blocking
::{self, SignalToken}
;
45 use sync
::atomic
::{AtomicUsize, Ordering}
;
47 // Various states you can find a port in.
48 const EMPTY
: usize = 0; // initial state: no data, no blocked receiver
49 const DATA
: usize = 1; // data ready for receiver to take
50 const DISCONNECTED
: usize = 2; // channel is disconnected OR upgraded
51 // Any other value represents a pointer to a SignalToken value. The
52 // protocol ensures that when the state moves *to* a pointer,
53 // ownership of the token is given to the packet, and when the state
54 // moves *from* a pointer, ownership of the token is transferred to
55 // whoever changed the state.
57 pub struct Packet
<T
> {
58 // Internal state of the chan/port pair (stores the blocked thread as well)
60 // One-shot data slot location
62 // when used for the second time, a oneshot channel must be upgraded, and
63 // this contains the slot for the upgrade
64 upgrade
: MyUpgrade
<T
>,
70 Upgraded(Receiver
<T
>),
73 pub enum UpgradeResult
{
79 pub enum SelectionResult
<T
> {
81 SelUpgraded(SignalToken
, Receiver
<T
>),
92 pub fn new() -> Packet
<T
> {
96 state
: AtomicUsize
::new(EMPTY
),
100 pub fn send(&mut self, t
: T
) -> Result
<(), T
> {
104 _
=> panic
!("sending on a oneshot that's already sent on "),
106 assert
!(self.data
.is_none());
108 self.upgrade
= SendUsed
;
110 match self.state
.swap(DATA
, Ordering
::SeqCst
) {
111 // Sent the data, no one was waiting
114 // Couldn't send the data, the port hung up first. Return the data
115 // back up the stack.
117 Err(self.data
.take().unwrap())
120 // Not possible, these are one-use channels
121 DATA
=> unreachable
!(),
123 // There is a thread waiting on the other end. We leave the 'DATA'
124 // state inside so it'll pick it up on the other end.
126 SignalToken
::cast_from_usize(ptr
).signal();
132 // Just tests whether this channel has been sent on or not. This is only
133 // safe to use from the sender.
134 pub fn sent(&self) -> bool
{
136 NothingSent
=> false,
141 pub fn recv(&mut self) -> Result
<T
, Failure
<T
>> {
142 // Attempt to not block the thread (it's a little expensive). If it looks
143 // like we're not empty, then immediately go through to `try_recv`.
144 if self.state
.load(Ordering
::SeqCst
) == EMPTY
{
145 let (wait_token
, signal_token
) = blocking
::tokens();
146 let ptr
= unsafe { signal_token.cast_to_usize() }
;
148 // race with senders to enter the blocking state
149 if self.state
.compare_and_swap(EMPTY
, ptr
, Ordering
::SeqCst
) == EMPTY
{
151 debug_assert
!(self.state
.load(Ordering
::SeqCst
) != EMPTY
);
153 // drop the signal token, since we never blocked
154 drop(unsafe { SignalToken::cast_from_usize(ptr) }
);
161 pub fn try_recv(&mut self) -> Result
<T
, Failure
<T
>> {
162 match self.state
.load(Ordering
::SeqCst
) {
165 // We saw some data on the channel, but the channel can be used
166 // again to send us an upgrade. As a result, we need to re-insert
167 // into the channel that there's no data available (otherwise we'll
168 // just see DATA next time). This is done as a cmpxchg because if
169 // the state changes under our feet we'd rather just see that state
172 self.state
.compare_and_swap(DATA
, EMPTY
, Ordering
::SeqCst
);
173 match self.data
.take() {
174 Some(data
) => Ok(data
),
175 None
=> unreachable
!(),
179 // There's no guarantee that we receive before an upgrade happens,
180 // and an upgrade flags the channel as disconnected, so when we see
181 // this we first need to check if there's data available and *then*
182 // we go through and process the upgrade.
184 match self.data
.take() {
185 Some(data
) => Ok(data
),
187 match mem
::replace(&mut self.upgrade
, SendUsed
) {
188 SendUsed
| NothingSent
=> Err(Disconnected
),
189 GoUp(upgrade
) => Err(Upgraded(upgrade
))
195 // We are the sole receiver; there cannot be a blocking
201 // Returns whether the upgrade was completed. If the upgrade wasn't
202 // completed, then the port couldn't get sent to the other half (it will
203 // never receive it).
204 pub fn upgrade(&mut self, up
: Receiver
<T
>) -> UpgradeResult
{
205 let prev
= match self.upgrade
{
206 NothingSent
=> NothingSent
,
207 SendUsed
=> SendUsed
,
208 _
=> panic
!("upgrading again"),
210 self.upgrade
= GoUp(up
);
212 match self.state
.swap(DISCONNECTED
, Ordering
::SeqCst
) {
213 // If the channel is empty or has data on it, then we're good to go.
214 // The receiver will check the data before the upgrade (in case we
215 // plastered over the DATA state).
216 DATA
| EMPTY
=> UpSuccess
,
218 // If the other end is already disconnected, then we failed the
219 // upgrade. Be sure to trash the port we were given.
220 DISCONNECTED
=> { self.upgrade = prev; UpDisconnected }
222 // If someone's waiting, we gotta wake them up
223 ptr
=> UpWoke(unsafe { SignalToken::cast_from_usize(ptr) }
)
227 pub fn drop_chan(&mut self) {
228 match self.state
.swap(DISCONNECTED
, Ordering
::SeqCst
) {
229 DATA
| DISCONNECTED
| EMPTY
=> {}
231 // If someone's waiting, we gotta wake them up
233 SignalToken
::cast_from_usize(ptr
).signal();
238 pub fn drop_port(&mut self) {
239 match self.state
.swap(DISCONNECTED
, Ordering
::SeqCst
) {
240 // An empty channel has nothing to do, and a remotely disconnected
241 // channel also has nothing to do b/c we're about to run the drop
243 DISCONNECTED
| EMPTY
=> {}
245 // There's data on the channel, so make sure we destroy it promptly.
246 // This is why not using an arc is a little difficult (need the box
247 // to stay valid while we take the data).
248 DATA
=> { self.data.take().unwrap(); }
250 // We're the only ones that can block on this port
255 ////////////////////////////////////////////////////////////////////////////
256 // select implementation
257 ////////////////////////////////////////////////////////////////////////////
259 // If Ok, the value is whether this port has data, if Err, then the upgraded
260 // port needs to be checked instead of this one.
261 pub fn can_recv(&mut self) -> Result
<bool
, Receiver
<T
>> {
262 match self.state
.load(Ordering
::SeqCst
) {
263 EMPTY
=> Ok(false), // Welp, we tried
264 DATA
=> Ok(true), // we have some un-acquired data
265 DISCONNECTED
if self.data
.is_some() => Ok(true), // we have data
267 match mem
::replace(&mut self.upgrade
, SendUsed
) {
268 // The other end sent us an upgrade, so we need to
269 // propagate upwards whether the upgrade can receive
271 GoUp(upgrade
) => Err(upgrade
),
273 // If the other end disconnected without sending an
274 // upgrade, then we have data to receive (the channel is
276 up
=> { self.upgrade = up; Ok(true) }
279 _
=> unreachable
!(), // we're the "one blocker"
283 // Attempts to start selection on this port. This can either succeed, fail
284 // because there is data, or fail because there is an upgrade pending.
285 pub fn start_selection(&mut self, token
: SignalToken
) -> SelectionResult
<T
> {
286 let ptr
= unsafe { token.cast_to_usize() }
;
287 match self.state
.compare_and_swap(EMPTY
, ptr
, Ordering
::SeqCst
) {
290 drop(unsafe { SignalToken::cast_from_usize(ptr) }
);
293 DISCONNECTED
if self.data
.is_some() => {
294 drop(unsafe { SignalToken::cast_from_usize(ptr) }
);
298 match mem
::replace(&mut self.upgrade
, SendUsed
) {
299 // The other end sent us an upgrade, so we need to
300 // propagate upwards whether the upgrade can receive
303 SelUpgraded(unsafe { SignalToken::cast_from_usize(ptr) }
, upgrade
)
306 // If the other end disconnected without sending an
307 // upgrade, then we have data to receive (the channel is
311 drop(unsafe { SignalToken::cast_from_usize(ptr) }
);
316 _
=> unreachable
!(), // we're the "one blocker"
320 // Remove a previous selecting thread from this port. This ensures that the
321 // blocked thread will no longer be visible to any other threads.
323 // The return value indicates whether there's data on this port.
324 pub fn abort_selection(&mut self) -> Result
<bool
, Receiver
<T
>> {
325 let state
= match self.state
.load(Ordering
::SeqCst
) {
326 // Each of these states means that no further activity will happen
327 // with regard to aborting the selection
330 s @ DISCONNECTED
=> s
,
332 // If we've got a blocked thread, then use an atomic to gain ownership
334 ptr
=> self.state
.compare_and_swap(ptr
, EMPTY
, Ordering
::SeqCst
)
337 // Now that we've got ownership of our state, figure out what to do
340 EMPTY
=> unreachable
!(),
341 // our thread used for select was stolen
344 // If the other end has hung up, then we have complete ownership
345 // of the port. First, check if there was data waiting for us. This
346 // is possible if the other end sent something and then hung up.
348 // We then need to check to see if there was an upgrade requested,
349 // and if so, the upgraded port needs to have its selection aborted.
351 if self.data
.is_some() {
354 match mem
::replace(&mut self.upgrade
, SendUsed
) {
355 GoUp(port
) => Err(port
),
361 // We woke ourselves up from select.
363 drop(SignalToken
::cast_from_usize(ptr
));
370 impl<T
> Drop
for Packet
<T
> {
372 assert_eq
!(self.state
.load(Ordering
::SeqCst
), DISCONNECTED
);