3 /// This is the flavor of channels which are optimized for one sender and one
4 /// receiver. The sender will be upgraded to a shared channel if the channel is
7 /// High level implementation details can be found in the comment of the parent
9 pub use self::Failure
::*;
11 pub use self::UpgradeResult
::*;
16 use crate::cell
::UnsafeCell
;
19 use crate::time
::Instant
;
21 use crate::sync
::atomic
::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}
;
22 use crate::sync
::mpsc
::blocking
::{self, SignalToken}
;
23 use crate::sync
::mpsc
::spsc_queue
as spsc
;
24 use crate::sync
::mpsc
::Receiver
;
26 const DISCONNECTED
: isize = isize::MIN
;
28 const MAX_STEALS
: isize = 5;
30 const MAX_STEALS
: isize = 1 << 20;
32 pub struct Packet
<T
> {
33 // internal queue for all messages
34 queue
: spsc
::Queue
<Message
<T
>, ProducerAddition
, ConsumerAddition
>,
37 struct ProducerAddition
{
38 cnt
: AtomicIsize
, // How many items are on this channel
39 to_wake
: AtomicUsize
, // SignalToken for the blocked thread to wake up
41 port_dropped
: AtomicBool
, // flag if the channel has been destroyed.
44 struct ConsumerAddition
{
45 steals
: UnsafeCell
<isize>, // How many times has a port received without blocking?
51 Upgraded(Receiver
<T
>),
54 pub enum UpgradeResult
{
60 // Any message could contain an "upgrade request" to a new shared port, so the
61 // internal queue it's a queue of T, but rather Message<T>
68 pub fn new() -> Packet
<T
> {
71 spsc
::Queue
::with_additions(
74 cnt
: AtomicIsize
::new(0),
75 to_wake
: AtomicUsize
::new(0),
77 port_dropped
: AtomicBool
::new(false),
79 ConsumerAddition { steals: UnsafeCell::new(0) }
,
85 pub fn send(&self, t
: T
) -> Result
<(), T
> {
86 // If the other port has deterministically gone away, then definitely
87 // must return the data back up the stack. Otherwise, the data is
88 // considered as being sent.
89 if self.queue
.producer_addition().port_dropped
.load(Ordering
::SeqCst
) {
93 match self.do_send(Data(t
)) {
94 UpSuccess
| UpDisconnected
=> {}
102 pub fn upgrade(&self, up
: Receiver
<T
>) -> UpgradeResult
{
103 // If the port has gone away, then there's no need to proceed any
105 if self.queue
.producer_addition().port_dropped
.load(Ordering
::SeqCst
) {
106 return UpDisconnected
;
109 self.do_send(GoUp(up
))
112 fn do_send(&self, t
: Message
<T
>) -> UpgradeResult
{
114 match self.queue
.producer_addition().cnt
.fetch_add(1, Ordering
::SeqCst
) {
115 // As described in the mod's doc comment, -1 == wakeup
116 -1 => UpWoke(self.take_to_wake()),
117 // As as described before, SPSC queues must be >= -2
120 // Be sure to preserve the disconnected state, and the return value
121 // in this case is going to be whether our data was received or not.
122 // This manifests itself on whether we have an empty queue or not.
124 // Primarily, are required to drain the queue here because the port
125 // will never remove this data. We can only have at most one item to
126 // drain (the port drains the rest).
128 self.queue
.producer_addition().cnt
.store(DISCONNECTED
, Ordering
::SeqCst
);
129 let first
= self.queue
.pop();
130 let second
= self.queue
.pop();
131 assert
!(second
.is_none());
134 Some(..) => UpSuccess
, // we failed to send the data
135 None
=> UpDisconnected
, // we successfully sent data
139 // Otherwise we just sent some data on a non-waiting queue, so just
140 // make sure the world is sane and carry on!
148 // Consumes ownership of the 'to_wake' field.
149 fn take_to_wake(&self) -> SignalToken
{
150 let ptr
= self.queue
.producer_addition().to_wake
.load(Ordering
::SeqCst
);
151 self.queue
.producer_addition().to_wake
.store(0, Ordering
::SeqCst
);
153 unsafe { SignalToken::cast_from_usize(ptr) }
156 // Decrements the count on the channel for a sleeper, returning the sleeper
157 // back if it shouldn't sleep. Note that this is the location where we take
158 // steals into account.
159 fn decrement(&self, token
: SignalToken
) -> Result
<(), SignalToken
> {
160 assert_eq
!(self.queue
.producer_addition().to_wake
.load(Ordering
::SeqCst
), 0);
161 let ptr
= unsafe { token.cast_to_usize() }
;
162 self.queue
.producer_addition().to_wake
.store(ptr
, Ordering
::SeqCst
);
164 let steals
= unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) }
;
166 match self.queue
.producer_addition().cnt
.fetch_sub(1 + steals
, Ordering
::SeqCst
) {
168 self.queue
.producer_addition().cnt
.store(DISCONNECTED
, Ordering
::SeqCst
);
170 // If we factor in our steals and notice that the channel has no
171 // data, we successfully sleep
180 self.queue
.producer_addition().to_wake
.store(0, Ordering
::SeqCst
);
181 Err(unsafe { SignalToken::cast_from_usize(ptr) }
)
184 pub fn recv(&self, deadline
: Option
<Instant
>) -> Result
<T
, Failure
<T
>> {
185 // Optimistic preflight check (scheduling is expensive).
186 match self.try_recv() {
191 // Welp, our channel has no data. Deschedule the current thread and
192 // initiate the blocking protocol.
193 let (wait_token
, signal_token
) = blocking
::tokens();
194 if self.decrement(signal_token
).is_ok() {
195 if let Some(deadline
) = deadline
{
196 let timed_out
= !wait_token
.wait_max_until(deadline
);
198 self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded
)?
;
205 match self.try_recv() {
206 // Messages which actually popped from the queue shouldn't count as
207 // a steal, so offset the decrement here (we already have our
208 // "steal" factored into the channel count above).
209 data @
Ok(..) | data @
Err(Upgraded(..)) => unsafe {
210 *self.queue
.consumer_addition().steals
.get() -= 1;
218 pub fn try_recv(&self) -> Result
<T
, Failure
<T
>> {
219 match self.queue
.pop() {
220 // If we stole some data, record to that effect (this will be
221 // factored into cnt later on).
223 // Note that we don't allow steals to grow without bound in order to
224 // prevent eventual overflow of either steals or cnt as an overflow
225 // would have catastrophic results. Sometimes, steals > cnt, but
226 // other times cnt > steals, so we don't know the relation between
227 // steals and cnt. This code path is executed only rarely, so we do
228 // a pretty slow operation, of swapping 0 into cnt, taking steals
229 // down as much as possible (without going negative), and then
230 // adding back in whatever we couldn't factor into steals.
231 Some(data
) => unsafe {
232 if *self.queue
.consumer_addition().steals
.get() > MAX_STEALS
{
233 match self.queue
.producer_addition().cnt
.swap(0, Ordering
::SeqCst
) {
238 .store(DISCONNECTED
, Ordering
::SeqCst
);
241 let m
= cmp
::min(n
, *self.queue
.consumer_addition().steals
.get());
242 *self.queue
.consumer_addition().steals
.get() -= m
;
246 assert
!(*self.queue
.consumer_addition().steals
.get() >= 0);
248 *self.queue
.consumer_addition().steals
.get() += 1;
251 GoUp(up
) => Err(Upgraded(up
)),
256 match self.queue
.producer_addition().cnt
.load(Ordering
::SeqCst
) {
257 n
if n
!= DISCONNECTED
=> Err(Empty
),
259 // This is a little bit of a tricky case. We failed to pop
260 // data above, and then we have viewed that the channel is
261 // disconnected. In this window more data could have been
262 // sent on the channel. It doesn't really make sense to
263 // return that the channel is disconnected when there's
264 // actually data on it, so be extra sure there's no data by
265 // popping one more time.
267 // We can ignore steals because the other end is
268 // disconnected and we'll never need to really factor in our
270 _
=> match self.queue
.pop() {
271 Some(Data(t
)) => Ok(t
),
272 Some(GoUp(up
)) => Err(Upgraded(up
)),
273 None
=> Err(Disconnected
),
280 pub fn drop_chan(&self) {
281 // Dropping a channel is pretty simple, we just flag it as disconnected
282 // and then wakeup a blocker if there is one.
283 match self.queue
.producer_addition().cnt
.swap(DISCONNECTED
, Ordering
::SeqCst
) {
285 self.take_to_wake().signal();
294 pub fn drop_port(&self) {
295 // Dropping a port seems like a fairly trivial thing. In theory all we
296 // need to do is flag that we're disconnected and then everything else
297 // can take over (we don't have anyone to wake up).
299 // The catch for Ports is that we want to drop the entire contents of
300 // the queue. There are multiple reasons for having this property, the
301 // largest of which is that if another chan is waiting in this channel
302 // (but not received yet), then waiting on that port will cause a
305 // So if we accept that we must now destroy the entire contents of the
306 // queue, this code may make a bit more sense. The tricky part is that
307 // we can't let any in-flight sends go un-dropped, we have to make sure
308 // *everything* is dropped and nothing new will come onto the channel.
310 // The first thing we do is set a flag saying that we're done for. All
311 // sends are gated on this flag, so we're immediately guaranteed that
312 // there are a bounded number of active sends that we'll have to deal
314 self.queue
.producer_addition().port_dropped
.store(true, Ordering
::SeqCst
);
316 // Now that we're guaranteed to deal with a bounded number of senders,
317 // we need to drain the queue. This draining process happens atomically
318 // with respect to the "count" of the channel. If the count is nonzero
319 // (with steals taken into account), then there must be data on the
320 // channel. In this case we drain everything and then try again. We will
321 // continue to fail while active senders send data while we're dropping
322 // data, but eventually we're guaranteed to break out of this loop
323 // (because there is a bounded number of senders).
324 let mut steals
= unsafe { *self.queue.consumer_addition().steals.get() }
;
326 let cnt
= self.queue
.producer_addition().cnt
.compare_and_swap(
331 cnt
!= DISCONNECTED
&& cnt
!= steals
333 while let Some(_
) = self.queue
.pop() {
338 // At this point in time, we have gated all future senders from sending,
339 // and we have flagged the channel as being disconnected. The senders
340 // still have some responsibility, however, because some sends may not
341 // complete until after we flag the disconnection. There are more
342 // details in the sending methods that see DISCONNECTED
345 ////////////////////////////////////////////////////////////////////////////
346 // select implementation
347 ////////////////////////////////////////////////////////////////////////////
349 // increment the count on the channel (used for selection)
350 fn bump(&self, amt
: isize) -> isize {
351 match self.queue
.producer_addition().cnt
.fetch_add(amt
, Ordering
::SeqCst
) {
353 self.queue
.producer_addition().cnt
.store(DISCONNECTED
, Ordering
::SeqCst
);
360 // Removes a previous thread from being blocked in this port
361 pub fn abort_selection(&self, was_upgrade
: bool
) -> Result
<bool
, Receiver
<T
>> {
362 // If we're aborting selection after upgrading from a oneshot, then
363 // we're guarantee that no one is waiting. The only way that we could
364 // have seen the upgrade is if data was actually sent on the channel
365 // half again. For us, this means that there is guaranteed to be data on
366 // this channel. Furthermore, we're guaranteed that there was no
367 // start_selection previously, so there's no need to modify `self.cnt`
370 // Hence, because of these invariants, we immediately return `Ok(true)`.
371 // Note that the data may not actually be sent on the channel just yet.
372 // The other end could have flagged the upgrade but not sent data to
373 // this end. This is fine because we know it's a small bounded windows
374 // of time until the data is actually sent.
376 assert_eq
!(unsafe { *self.queue.consumer_addition().steals.get() }
, 0);
377 assert_eq
!(self.queue
.producer_addition().to_wake
.load(Ordering
::SeqCst
), 0);
381 // We want to make sure that the count on the channel goes non-negative,
382 // and in the stream case we can have at most one steal, so just assume
383 // that we had one steal.
385 let prev
= self.bump(steals
+ 1);
387 // If we were previously disconnected, then we know for sure that there
388 // is no thread in to_wake, so just keep going
389 let has_data
= if prev
== DISCONNECTED
{
390 assert_eq
!(self.queue
.producer_addition().to_wake
.load(Ordering
::SeqCst
), 0);
391 true // there is data, that data is that we're disconnected
393 let cur
= prev
+ steals
+ 1;
396 // If the previous count was negative, then we just made things go
397 // positive, hence we passed the -1 boundary and we're responsible
398 // for removing the to_wake() field and trashing it.
400 // If the previous count was positive then we're in a tougher
401 // situation. A possible race is that a sender just incremented
402 // through -1 (meaning it's going to try to wake a thread up), but it
403 // hasn't yet read the to_wake. In order to prevent a future recv()
404 // from waking up too early (this sender picking up the plastered
405 // over to_wake), we spin loop here waiting for to_wake to be 0.
406 // Note that this entire select() implementation needs an overhaul,
407 // and this is *not* the worst part of it, so this is not done as a
408 // final solution but rather out of necessity for now to get
409 // something working.
411 drop(self.take_to_wake());
413 while self.queue
.producer_addition().to_wake
.load(Ordering
::SeqCst
) != 0 {
418 assert_eq
!(*self.queue
.consumer_addition().steals
.get(), 0);
419 *self.queue
.consumer_addition().steals
.get() = steals
;
422 // if we were previously positive, then there's surely data to
427 // Now that we've determined that this queue "has data", we peek at the
428 // queue to see if the data is an upgrade or not. If it's an upgrade,
429 // then we need to destroy this port and abort selection on the
432 match self.queue
.peek() {
433 Some(&mut GoUp(..)) => match self.queue
.pop() {
434 Some(GoUp(port
)) => Err(port
),
445 impl<T
> Drop
for Packet
<T
> {
447 // Note that this load is not only an assert for correctness about
448 // disconnection, but also a proper fence before the read of
449 // `to_wake`, so this assert cannot be removed with also removing
450 // the `to_wake` assert.
451 assert_eq
!(self.queue
.producer_addition().cnt
.load(Ordering
::SeqCst
), DISCONNECTED
);
452 assert_eq
!(self.queue
.producer_addition().to_wake
.load(Ordering
::SeqCst
), 0);