1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
24 //! These channels come in two flavors:
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
57 //! use std::sync::mpsc::channel;
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
64 //! assert_eq!(rx.recv().unwrap(), 10);
71 //! use std::sync::mpsc::channel;
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
90 //! Propagating panics:
93 //! use std::sync::mpsc::channel;
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
99 //! assert!(rx.recv().is_err());
102 //! Synchronous channels:
106 //! use std::sync::mpsc::sync_channel;
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent thread to start receiving
111 //! tx.send(53).unwrap();
113 //! rx.recv().unwrap();
116 #![stable(feature = "rust1", since = "1.0.0")]
118 // A description of how Rust's channel implementation works
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great length about channels of these flavors.
130 // ## Flavors of channels
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
137 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
138 // They contain as few atomics as possible and involve one and
139 // exactly one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 // use a different concurrent queue that is more tailored for this
142 // use case. The initial allocation of this flavor of channel is not
144 // * Shared - this is the most general form of channel that this module offers,
145 // a channel with multiple senders. This type is as optimized as it
146 // can be, but the previous two types mentioned are much faster for
149 // ## Concurrent queues
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
152 // recv() obviously blocks. This means that under the hood there must be some
153 // shared and concurrent queue holding all of the actual data.
155 // With two flavors of channels, two flavors of queues are also used. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
161 // ### SPSC optimizations
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to have very fast
177 // send() operations while not too large such that millions of channels can
180 // ### MPSC optimizations
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to earn its unboundedness, but I have not put
184 // forth much effort into having a cache of nodes similar to the SPSC queue.
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
190 // ## Overview of the Implementation
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
198 // queue.push(t) return if queue.pop()
199 // if increment() == -1 deschedule {
200 // wakeup() if decrement() > 0
201 // cancel_deschedule()
205 // As mentioned before, there are no locks in this implementation, only atomic
206 // instructions are used.
208 // ### The internal atomic counter
210 // Every channel has a shared counter with each half to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
219 // The recv() method has a beginning call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
226 // Instead, the "steal count" is kept track of separately (not atomically
227 // because it's only used by receivers), and then the decrement() call when
228 // descheduling will lump in all of the recent steals into one large decrement.
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
233 // ## Native Implementation
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which is obviously not available without the runtime).
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
247 // Being able to support selection over channels has greatly influenced this
248 // design, and not only does selection need to work inside the runtime, but also
249 // outside the runtime.
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as its woken up. The cancellation procedure
255 // involves an increment and swapping out of to_wake to acquire ownership of the
256 // thread to unblock.
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
272 use cell
::UnsafeCell
;
275 #[unstable(feature = "mpsc_select", issue = "27800")]
276 pub use self::select
::{Select, Handle}
;
277 use self::select
::StartResult
;
278 use self::select
::StartResult
::*;
279 use self::blocking
::SignalToken
;
290 /// The receiving-half of Rust's channel type. This half can only be owned by
292 #[stable(feature = "rust1", since = "1.0.0")]
293 pub struct Receiver
<T
> {
294 inner
: UnsafeCell
<Flavor
<T
>>,
297 // The receiver port can be sent from place to place, so long as it
298 // is not used to receive non-sendable things.
299 #[stable(feature = "rust1", since = "1.0.0")]
300 unsafe impl<T
: Send
> Send
for Receiver
<T
> { }
302 #[stable(feature = "rust1", since = "1.0.0")]
303 impl<T
> !Sync
for Receiver
<T
> { }
305 /// An iterator over messages on a receiver, this iterator will block
306 /// whenever `next` is called, waiting for a new message, and `None` will be
307 /// returned when the corresponding channel has hung up.
308 #[stable(feature = "rust1", since = "1.0.0")]
309 pub struct Iter
<'a
, T
: 'a
> {
313 /// An owning iterator over messages on a receiver, this iterator will block
314 /// whenever `next` is called, waiting for a new message, and `None` will be
315 /// returned when the corresponding channel has hung up.
316 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
317 pub struct IntoIter
<T
> {
321 /// The sending-half of Rust's asynchronous channel type. This half can only be
322 /// owned by one thread, but it can be cloned to send to other threads.
323 #[stable(feature = "rust1", since = "1.0.0")]
324 pub struct Sender
<T
> {
325 inner
: UnsafeCell
<Flavor
<T
>>,
328 // The send port can be sent from place to place, so long as it
329 // is not used to send non-sendable things.
330 #[stable(feature = "rust1", since = "1.0.0")]
331 unsafe impl<T
: Send
> Send
for Sender
<T
> { }
333 #[stable(feature = "rust1", since = "1.0.0")]
334 impl<T
> !Sync
for Sender
<T
> { }
336 /// The sending-half of Rust's synchronous channel type. This half can only be
337 /// owned by one thread, but it can be cloned to send to other threads.
338 #[stable(feature = "rust1", since = "1.0.0")]
339 pub struct SyncSender
<T
> {
340 inner
: Arc
<UnsafeCell
<sync
::Packet
<T
>>>,
343 #[stable(feature = "rust1", since = "1.0.0")]
344 unsafe impl<T
: Send
> Send
for SyncSender
<T
> {}
346 #[stable(feature = "rust1", since = "1.0.0")]
347 impl<T
> !Sync
for SyncSender
<T
> {}
349 /// An error returned from the `send` function on channels.
351 /// A `send` operation can only fail if the receiving end of a channel is
352 /// disconnected, implying that the data could never be received. The error
353 /// contains the data being sent as a payload so it can be recovered.
354 #[stable(feature = "rust1", since = "1.0.0")]
355 #[derive(PartialEq, Eq, Clone, Copy)]
356 pub struct SendError
<T
>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
358 /// An error returned from the `recv` function on a `Receiver`.
360 /// The `recv` operation can only fail if the sending half of a channel is
361 /// disconnected, implying that no further messages will ever be received.
362 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
363 #[stable(feature = "rust1", since = "1.0.0")]
364 pub struct RecvError
;
366 /// This enumeration is the list of the possible reasons that `try_recv` could
367 /// not return data when called.
368 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
369 #[stable(feature = "rust1", since = "1.0.0")]
370 pub enum TryRecvError
{
371 /// This channel is currently empty, but the sender(s) have not yet
372 /// disconnected, so data may yet become available.
373 #[stable(feature = "rust1", since = "1.0.0")]
376 /// This channel's sending half has become disconnected, and there will
377 /// never be any more data received on this channel
378 #[stable(feature = "rust1", since = "1.0.0")]
382 /// This enumeration is the list of the possible error outcomes for the
383 /// `SyncSender::try_send` method.
384 #[stable(feature = "rust1", since = "1.0.0")]
385 #[derive(PartialEq, Eq, Clone, Copy)]
386 pub enum TrySendError
<T
> {
387 /// The data could not be sent on the channel because it would require that
388 /// the callee block to send the data.
390 /// If this is a buffered channel, then the buffer is full at this time. If
391 /// this is not a buffered channel, then there is no receiver available to
392 /// acquire the data.
393 #[stable(feature = "rust1", since = "1.0.0")]
394 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
396 /// This channel's receiving half has disconnected, so the data could not be
397 /// sent. The data is returned back to the callee in this case.
398 #[stable(feature = "rust1", since = "1.0.0")]
399 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
403 Oneshot(Arc
<UnsafeCell
<oneshot
::Packet
<T
>>>),
404 Stream(Arc
<UnsafeCell
<stream
::Packet
<T
>>>),
405 Shared(Arc
<UnsafeCell
<shared
::Packet
<T
>>>),
406 Sync(Arc
<UnsafeCell
<sync
::Packet
<T
>>>),
410 trait UnsafeFlavor
<T
> {
411 fn inner_unsafe(&self) -> &UnsafeCell
<Flavor
<T
>>;
412 unsafe fn inner_mut(&self) -> &mut Flavor
<T
> {
413 &mut *self.inner_unsafe().get()
415 unsafe fn inner(&self) -> &Flavor
<T
> {
416 &*self.inner_unsafe().get()
419 impl<T
> UnsafeFlavor
<T
> for Sender
<T
> {
420 fn inner_unsafe(&self) -> &UnsafeCell
<Flavor
<T
>> {
424 impl<T
> UnsafeFlavor
<T
> for Receiver
<T
> {
425 fn inner_unsafe(&self) -> &UnsafeCell
<Flavor
<T
>> {
430 /// Creates a new asynchronous channel, returning the sender/receiver halves.
432 /// All data sent on the sender will become available on the receiver, and no
433 /// send will block the calling thread (this channel has an "infinite buffer").
438 /// use std::sync::mpsc::channel;
441 /// // tx is the sending half (tx for transmission), and rx is the receiving
442 /// // half (rx for receiving).
443 /// let (tx, rx) = channel();
445 /// // Spawn off an expensive computation
446 /// thread::spawn(move|| {
447 /// # fn expensive_computation() {}
448 /// tx.send(expensive_computation()).unwrap();
451 /// // Do some useful work for awhile
453 /// // Let's see what that answer was
454 /// println!("{:?}", rx.recv().unwrap());
456 #[stable(feature = "rust1", since = "1.0.0")]
457 pub fn channel
<T
>() -> (Sender
<T
>, Receiver
<T
>) {
458 let a
= Arc
::new(UnsafeCell
::new(oneshot
::Packet
::new()));
459 (Sender
::new(Flavor
::Oneshot(a
.clone())), Receiver
::new(Flavor
::Oneshot(a
)))
462 /// Creates a new synchronous, bounded channel.
464 /// Like asynchronous channels, the `Receiver` will block until a message
465 /// becomes available. These channels differ greatly in the semantics of the
466 /// sender from asynchronous channels, however.
468 /// This channel has an internal buffer on which messages will be queued. When
469 /// the internal buffer becomes full, future sends will *block* waiting for the
470 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
471 /// becomes "rendezvous channel" where each send will not return until a recv
472 /// is paired with it.
474 /// As with asynchronous channels, all senders will panic in `send` if the
475 /// `Receiver` has been destroyed.
480 /// use std::sync::mpsc::sync_channel;
483 /// let (tx, rx) = sync_channel(1);
485 /// // this returns immediately
486 /// tx.send(1).unwrap();
488 /// thread::spawn(move|| {
489 /// // this will block until the previous message has been received
490 /// tx.send(2).unwrap();
493 /// assert_eq!(rx.recv().unwrap(), 1);
494 /// assert_eq!(rx.recv().unwrap(), 2);
496 #[stable(feature = "rust1", since = "1.0.0")]
497 pub fn sync_channel
<T
>(bound
: usize) -> (SyncSender
<T
>, Receiver
<T
>) {
498 let a
= Arc
::new(UnsafeCell
::new(sync
::Packet
::new(bound
)));
499 (SyncSender
::new(a
.clone()), Receiver
::new(Flavor
::Sync(a
)))
502 ////////////////////////////////////////////////////////////////////////////////
504 ////////////////////////////////////////////////////////////////////////////////
507 fn new(inner
: Flavor
<T
>) -> Sender
<T
> {
509 inner
: UnsafeCell
::new(inner
),
513 /// Attempts to send a value on this channel, returning it back if it could
516 /// A successful send occurs when it is determined that the other end of
517 /// the channel has not hung up already. An unsuccessful send would be one
518 /// where the corresponding receiver has already been deallocated. Note
519 /// that a return value of `Err` means that the data will never be
520 /// received, but a return value of `Ok` does *not* mean that the data
521 /// will be received. It is possible for the corresponding receiver to
522 /// hang up immediately after this function returns `Ok`.
524 /// This method will never block the current thread.
529 /// use std::sync::mpsc::channel;
531 /// let (tx, rx) = channel();
533 /// // This send is always successful
534 /// tx.send(1).unwrap();
536 /// // This send will fail because the receiver is gone
538 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
540 #[stable(feature = "rust1", since = "1.0.0")]
541 pub fn send(&self, t
: T
) -> Result
<(), SendError
<T
>> {
542 let (new_inner
, ret
) = match *unsafe { self.inner() }
{
543 Flavor
::Oneshot(ref p
) => {
547 return (*p
).send(t
).map_err(SendError
);
550 Arc
::new(UnsafeCell
::new(stream
::Packet
::new()));
551 let rx
= Receiver
::new(Flavor
::Stream(a
.clone()));
552 match (*p
).upgrade(rx
) {
553 oneshot
::UpSuccess
=> {
554 let ret
= (*a
.get()).send(t
);
557 oneshot
::UpDisconnected
=> (a
, Err(t
)),
558 oneshot
::UpWoke(token
) => {
559 // This send cannot panic because the thread is
560 // asleep (we're looking at it), so the receiver
562 (*a
.get()).send(t
).ok().unwrap();
570 Flavor
::Stream(ref p
) => return unsafe {
571 (*p
.get()).send(t
).map_err(SendError
)
573 Flavor
::Shared(ref p
) => return unsafe {
574 (*p
.get()).send(t
).map_err(SendError
)
576 Flavor
::Sync(..) => unreachable
!(),
580 let tmp
= Sender
::new(Flavor
::Stream(new_inner
));
581 mem
::swap(self.inner_mut(), tmp
.inner_mut());
583 ret
.map_err(SendError
)
587 #[stable(feature = "rust1", since = "1.0.0")]
588 impl<T
> Clone
for Sender
<T
> {
589 fn clone(&self) -> Sender
<T
> {
590 let (packet
, sleeper
, guard
) = match *unsafe { self.inner() }
{
591 Flavor
::Oneshot(ref p
) => {
592 let a
= Arc
::new(UnsafeCell
::new(shared
::Packet
::new()));
594 let guard
= (*a
.get()).postinit_lock();
595 let rx
= Receiver
::new(Flavor
::Shared(a
.clone()));
596 match (*p
.get()).upgrade(rx
) {
598 oneshot
::UpDisconnected
=> (a
, None
, guard
),
599 oneshot
::UpWoke(task
) => (a
, Some(task
), guard
)
603 Flavor
::Stream(ref p
) => {
604 let a
= Arc
::new(UnsafeCell
::new(shared
::Packet
::new()));
606 let guard
= (*a
.get()).postinit_lock();
607 let rx
= Receiver
::new(Flavor
::Shared(a
.clone()));
608 match (*p
.get()).upgrade(rx
) {
610 stream
::UpDisconnected
=> (a
, None
, guard
),
611 stream
::UpWoke(task
) => (a
, Some(task
), guard
),
615 Flavor
::Shared(ref p
) => {
616 unsafe { (*p.get()).clone_chan(); }
617 return Sender
::new(Flavor
::Shared(p
.clone()));
619 Flavor
::Sync(..) => unreachable
!(),
623 (*packet
.get()).inherit_blocker(sleeper
, guard
);
625 let tmp
= Sender
::new(Flavor
::Shared(packet
.clone()));
626 mem
::swap(self.inner_mut(), tmp
.inner_mut());
628 Sender
::new(Flavor
::Shared(packet
))
632 #[stable(feature = "rust1", since = "1.0.0")]
633 impl<T
> Drop
for Sender
<T
> {
635 match *unsafe { self.inner_mut() }
{
636 Flavor
::Oneshot(ref mut p
) => unsafe { (*p.get()).drop_chan(); }
,
637 Flavor
::Stream(ref mut p
) => unsafe { (*p.get()).drop_chan(); }
,
638 Flavor
::Shared(ref mut p
) => unsafe { (*p.get()).drop_chan(); }
,
639 Flavor
::Sync(..) => unreachable
!(),
644 #[stable(feature = "mpsc_debug", since = "1.7.0")]
645 impl<T
> fmt
::Debug
for Sender
<T
> {
646 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
647 write
!(f
, "Sender {{ .. }}")
651 ////////////////////////////////////////////////////////////////////////////////
653 ////////////////////////////////////////////////////////////////////////////////
655 impl<T
> SyncSender
<T
> {
656 fn new(inner
: Arc
<UnsafeCell
<sync
::Packet
<T
>>>) -> SyncSender
<T
> {
657 SyncSender { inner: inner }
660 /// Sends a value on this synchronous channel.
662 /// This function will *block* until space in the internal buffer becomes
663 /// available or a receiver is available to hand off the message to.
665 /// Note that a successful send does *not* guarantee that the receiver will
666 /// ever see the data if there is a buffer on this channel. Items may be
667 /// enqueued in the internal buffer for the receiver to receive at a later
668 /// time. If the buffer size is 0, however, it can be guaranteed that the
669 /// receiver has indeed received the data if this function returns success.
671 /// This function will never panic, but it may return `Err` if the
672 /// `Receiver` has disconnected and is no longer able to receive
674 #[stable(feature = "rust1", since = "1.0.0")]
675 pub fn send(&self, t
: T
) -> Result
<(), SendError
<T
>> {
676 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
679 /// Attempts to send a value on this channel without blocking.
681 /// This method differs from `send` by returning immediately if the
682 /// channel's buffer is full or no receiver is waiting to acquire some
683 /// data. Compared with `send`, this function has two failure cases
684 /// instead of one (one for disconnection, one for a full buffer).
686 /// See `SyncSender::send` for notes about guarantees of whether the
687 /// receiver has received the data or not if this function is successful.
688 #[stable(feature = "rust1", since = "1.0.0")]
689 pub fn try_send(&self, t
: T
) -> Result
<(), TrySendError
<T
>> {
690 unsafe { (*self.inner.get()).try_send(t) }
694 #[stable(feature = "rust1", since = "1.0.0")]
695 impl<T
> Clone
for SyncSender
<T
> {
696 fn clone(&self) -> SyncSender
<T
> {
697 unsafe { (*self.inner.get()).clone_chan(); }
698 SyncSender
::new(self.inner
.clone())
702 #[stable(feature = "rust1", since = "1.0.0")]
703 impl<T
> Drop
for SyncSender
<T
> {
705 unsafe { (*self.inner.get()).drop_chan(); }
709 #[stable(feature = "mpsc_debug", since = "1.7.0")]
710 impl<T
> fmt
::Debug
for SyncSender
<T
> {
711 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
712 write
!(f
, "SyncSender {{ .. }}")
716 ////////////////////////////////////////////////////////////////////////////////
718 ////////////////////////////////////////////////////////////////////////////////
720 impl<T
> Receiver
<T
> {
721 fn new(inner
: Flavor
<T
>) -> Receiver
<T
> {
722 Receiver { inner: UnsafeCell::new(inner) }
725 /// Attempts to return a pending value on this receiver without blocking
727 /// This method will never block the caller in order to wait for data to
728 /// become available. Instead, this will always return immediately with a
729 /// possible option of pending data on the channel.
731 /// This is useful for a flavor of "optimistic check" before deciding to
732 /// block on a receiver.
733 #[stable(feature = "rust1", since = "1.0.0")]
734 pub fn try_recv(&self) -> Result
<T
, TryRecvError
> {
736 let new_port
= match *unsafe { self.inner() }
{
737 Flavor
::Oneshot(ref p
) => {
738 match unsafe { (*p.get()).try_recv() }
{
739 Ok(t
) => return Ok(t
),
740 Err(oneshot
::Empty
) => return Err(TryRecvError
::Empty
),
741 Err(oneshot
::Disconnected
) => {
742 return Err(TryRecvError
::Disconnected
)
744 Err(oneshot
::Upgraded(rx
)) => rx
,
747 Flavor
::Stream(ref p
) => {
748 match unsafe { (*p.get()).try_recv() }
{
749 Ok(t
) => return Ok(t
),
750 Err(stream
::Empty
) => return Err(TryRecvError
::Empty
),
751 Err(stream
::Disconnected
) => {
752 return Err(TryRecvError
::Disconnected
)
754 Err(stream
::Upgraded(rx
)) => rx
,
757 Flavor
::Shared(ref p
) => {
758 match unsafe { (*p.get()).try_recv() }
{
759 Ok(t
) => return Ok(t
),
760 Err(shared
::Empty
) => return Err(TryRecvError
::Empty
),
761 Err(shared
::Disconnected
) => {
762 return Err(TryRecvError
::Disconnected
)
766 Flavor
::Sync(ref p
) => {
767 match unsafe { (*p.get()).try_recv() }
{
768 Ok(t
) => return Ok(t
),
769 Err(sync
::Empty
) => return Err(TryRecvError
::Empty
),
770 Err(sync
::Disconnected
) => {
771 return Err(TryRecvError
::Disconnected
)
777 mem
::swap(self.inner_mut(),
778 new_port
.inner_mut());
783 /// Attempts to wait for a value on this receiver, returning an error if the
784 /// corresponding channel has hung up.
786 /// This function will always block the current thread if there is no data
787 /// available and it's possible for more data to be sent. Once a message is
788 /// sent to the corresponding `Sender`, then this receiver will wake up and
789 /// return that message.
791 /// If the corresponding `Sender` has disconnected, or it disconnects while
792 /// this call is blocking, this call will wake up and return `Err` to
793 /// indicate that no more messages can ever be received on this channel.
794 /// However, since channels are buffered, messages sent before the disconnect
795 /// will still be properly received.
800 /// use std::sync::mpsc;
803 /// let (send, recv) = mpsc::channel();
804 /// let handle = thread::spawn(move || {
805 /// send.send(1u8).unwrap();
808 /// handle.join().unwrap();
810 /// assert_eq!(Ok(1), recv.recv());
813 /// Buffering behavior:
816 /// use std::sync::mpsc;
818 /// use std::sync::mpsc::RecvError;
820 /// let (send, recv) = mpsc::channel();
821 /// let handle = thread::spawn(move || {
822 /// send.send(1u8).unwrap();
823 /// send.send(2).unwrap();
824 /// send.send(3).unwrap();
828 /// // wait for the thread to join so we ensure the sender is dropped
829 /// handle.join().unwrap();
831 /// assert_eq!(Ok(1), recv.recv());
832 /// assert_eq!(Ok(2), recv.recv());
833 /// assert_eq!(Ok(3), recv.recv());
834 /// assert_eq!(Err(RecvError), recv.recv());
836 #[stable(feature = "rust1", since = "1.0.0")]
837 pub fn recv(&self) -> Result
<T
, RecvError
> {
839 let new_port
= match *unsafe { self.inner() }
{
840 Flavor
::Oneshot(ref p
) => {
841 match unsafe { (*p.get()).recv() }
{
842 Ok(t
) => return Ok(t
),
843 Err(oneshot
::Empty
) => return unreachable
!(),
844 Err(oneshot
::Disconnected
) => return Err(RecvError
),
845 Err(oneshot
::Upgraded(rx
)) => rx
,
848 Flavor
::Stream(ref p
) => {
849 match unsafe { (*p.get()).recv() }
{
850 Ok(t
) => return Ok(t
),
851 Err(stream
::Empty
) => return unreachable
!(),
852 Err(stream
::Disconnected
) => return Err(RecvError
),
853 Err(stream
::Upgraded(rx
)) => rx
,
856 Flavor
::Shared(ref p
) => {
857 match unsafe { (*p.get()).recv() }
{
858 Ok(t
) => return Ok(t
),
859 Err(shared
::Empty
) => return unreachable
!(),
860 Err(shared
::Disconnected
) => return Err(RecvError
),
863 Flavor
::Sync(ref p
) => return unsafe {
864 (*p
.get()).recv().map_err(|()| RecvError
)
868 mem
::swap(self.inner_mut(), new_port
.inner_mut());
873 /// Returns an iterator that will block waiting for messages, but never
874 /// `panic!`. It will return `None` when the channel has hung up.
875 #[stable(feature = "rust1", since = "1.0.0")]
876 pub fn iter(&self) -> Iter
<T
> {
881 impl<T
> select
::Packet
for Receiver
<T
> {
882 fn can_recv(&self) -> bool
{
884 let new_port
= match *unsafe { self.inner() }
{
885 Flavor
::Oneshot(ref p
) => {
886 match unsafe { (*p.get()).can_recv() }
{
887 Ok(ret
) => return ret
,
888 Err(upgrade
) => upgrade
,
891 Flavor
::Stream(ref p
) => {
892 match unsafe { (*p.get()).can_recv() }
{
893 Ok(ret
) => return ret
,
894 Err(upgrade
) => upgrade
,
897 Flavor
::Shared(ref p
) => {
898 return unsafe { (*p.get()).can_recv() }
;
900 Flavor
::Sync(ref p
) => {
901 return unsafe { (*p.get()).can_recv() }
;
905 mem
::swap(self.inner_mut(),
906 new_port
.inner_mut());
911 fn start_selection(&self, mut token
: SignalToken
) -> StartResult
{
913 let (t
, new_port
) = match *unsafe { self.inner() }
{
914 Flavor
::Oneshot(ref p
) => {
915 match unsafe { (*p.get()).start_selection(token) }
{
916 oneshot
::SelSuccess
=> return Installed
,
917 oneshot
::SelCanceled
=> return Abort
,
918 oneshot
::SelUpgraded(t
, rx
) => (t
, rx
),
921 Flavor
::Stream(ref p
) => {
922 match unsafe { (*p.get()).start_selection(token) }
{
923 stream
::SelSuccess
=> return Installed
,
924 stream
::SelCanceled
=> return Abort
,
925 stream
::SelUpgraded(t
, rx
) => (t
, rx
),
928 Flavor
::Shared(ref p
) => {
929 return unsafe { (*p.get()).start_selection(token) }
;
931 Flavor
::Sync(ref p
) => {
932 return unsafe { (*p.get()).start_selection(token) }
;
937 mem
::swap(self.inner_mut(), new_port
.inner_mut());
942 fn abort_selection(&self) -> bool
{
943 let mut was_upgrade
= false;
945 let result
= match *unsafe { self.inner() }
{
946 Flavor
::Oneshot(ref p
) => unsafe { (*p.get()).abort_selection() }
,
947 Flavor
::Stream(ref p
) => unsafe {
948 (*p
.get()).abort_selection(was_upgrade
)
950 Flavor
::Shared(ref p
) => return unsafe {
951 (*p
.get()).abort_selection(was_upgrade
)
953 Flavor
::Sync(ref p
) => return unsafe {
954 (*p
.get()).abort_selection()
957 let new_port
= match result { Ok(b) => return b, Err(p) => p }
;
960 mem
::swap(self.inner_mut(),
961 new_port
.inner_mut());
967 #[stable(feature = "rust1", since = "1.0.0")]
968 impl<'a
, T
> Iterator
for Iter
<'a
, T
> {
971 fn next(&mut self) -> Option
<T
> { self.rx.recv().ok() }
974 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
975 impl<'a
, T
> IntoIterator
for &'a Receiver
<T
> {
977 type IntoIter
= Iter
<'a
, T
>;
979 fn into_iter(self) -> Iter
<'a
, T
> { self.iter() }
982 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
983 impl<T
> Iterator
for IntoIter
<T
> {
985 fn next(&mut self) -> Option
<T
> { self.rx.recv().ok() }
988 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
989 impl <T
> IntoIterator
for Receiver
<T
> {
991 type IntoIter
= IntoIter
<T
>;
993 fn into_iter(self) -> IntoIter
<T
> {
994 IntoIter { rx: self }
998 #[stable(feature = "rust1", since = "1.0.0")]
999 impl<T
> Drop
for Receiver
<T
> {
1000 fn drop(&mut self) {
1001 match *unsafe { self.inner_mut() }
{
1002 Flavor
::Oneshot(ref mut p
) => unsafe { (*p.get()).drop_port(); }
,
1003 Flavor
::Stream(ref mut p
) => unsafe { (*p.get()).drop_port(); }
,
1004 Flavor
::Shared(ref mut p
) => unsafe { (*p.get()).drop_port(); }
,
1005 Flavor
::Sync(ref mut p
) => unsafe { (*p.get()).drop_port(); }
,
1010 #[stable(feature = "mpsc_debug", since = "1.7.0")]
1011 impl<T
> fmt
::Debug
for Receiver
<T
> {
1012 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1013 write
!(f
, "Receiver {{ .. }}")
1017 #[stable(feature = "rust1", since = "1.0.0")]
1018 impl<T
> fmt
::Debug
for SendError
<T
> {
1019 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1020 "SendError(..)".fmt(f
)
1024 #[stable(feature = "rust1", since = "1.0.0")]
1025 impl<T
> fmt
::Display
for SendError
<T
> {
1026 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1027 "sending on a closed channel".fmt(f
)
1031 #[stable(feature = "rust1", since = "1.0.0")]
1032 impl<T
: Send
+ Reflect
> error
::Error
for SendError
<T
> {
1033 fn description(&self) -> &str {
1034 "sending on a closed channel"
1037 fn cause(&self) -> Option
<&error
::Error
> {
1042 #[stable(feature = "rust1", since = "1.0.0")]
1043 impl<T
> fmt
::Debug
for TrySendError
<T
> {
1044 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1046 TrySendError
::Full(..) => "Full(..)".fmt(f
),
1047 TrySendError
::Disconnected(..) => "Disconnected(..)".fmt(f
),
1052 #[stable(feature = "rust1", since = "1.0.0")]
1053 impl<T
> fmt
::Display
for TrySendError
<T
> {
1054 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1056 TrySendError
::Full(..) => {
1057 "sending on a full channel".fmt(f
)
1059 TrySendError
::Disconnected(..) => {
1060 "sending on a closed channel".fmt(f
)
1066 #[stable(feature = "rust1", since = "1.0.0")]
1067 impl<T
: Send
+ Reflect
> error
::Error
for TrySendError
<T
> {
1069 fn description(&self) -> &str {
1071 TrySendError
::Full(..) => {
1072 "sending on a full channel"
1074 TrySendError
::Disconnected(..) => {
1075 "sending on a closed channel"
1080 fn cause(&self) -> Option
<&error
::Error
> {
1085 #[stable(feature = "rust1", since = "1.0.0")]
1086 impl fmt
::Display
for RecvError
{
1087 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1088 "receiving on a closed channel".fmt(f
)
1092 #[stable(feature = "rust1", since = "1.0.0")]
1093 impl error
::Error
for RecvError
{
1095 fn description(&self) -> &str {
1096 "receiving on a closed channel"
1099 fn cause(&self) -> Option
<&error
::Error
> {
1104 #[stable(feature = "rust1", since = "1.0.0")]
1105 impl fmt
::Display
for TryRecvError
{
1106 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1108 TryRecvError
::Empty
=> {
1109 "receiving on an empty channel".fmt(f
)
1111 TryRecvError
::Disconnected
=> {
1112 "receiving on a closed channel".fmt(f
)
1118 #[stable(feature = "rust1", since = "1.0.0")]
1119 impl error
::Error
for TryRecvError
{
1121 fn description(&self) -> &str {
1123 TryRecvError
::Empty
=> {
1124 "receiving on an empty channel"
1126 TryRecvError
::Disconnected
=> {
1127 "receiving on a closed channel"
1132 fn cause(&self) -> Option
<&error
::Error
> {
1145 pub fn stress_factor() -> usize {
1146 match env
::var("RUST_TEST_STRESS") {
1147 Ok(val
) => val
.parse().unwrap(),
1154 let (tx
, rx
) = channel
::<i32>();
1155 tx
.send(1).unwrap();
1156 assert_eq
!(rx
.recv().unwrap(), 1);
1161 let (tx
, _rx
) = channel
::<Box
<isize>>();
1162 tx
.send(box 1).unwrap();
1166 fn drop_full_shared() {
1167 let (tx
, _rx
) = channel
::<Box
<isize>>();
1170 tx
.send(box 1).unwrap();
1175 let (tx
, rx
) = channel
::<i32>();
1176 tx
.send(1).unwrap();
1177 assert_eq
!(rx
.recv().unwrap(), 1);
1178 let tx
= tx
.clone();
1179 tx
.send(1).unwrap();
1180 assert_eq
!(rx
.recv().unwrap(), 1);
1184 fn smoke_threads() {
1185 let (tx
, rx
) = channel
::<i32>();
1186 let _t
= thread
::spawn(move|| {
1187 tx
.send(1).unwrap();
1189 assert_eq
!(rx
.recv().unwrap(), 1);
1193 fn smoke_port_gone() {
1194 let (tx
, rx
) = channel
::<i32>();
1196 assert
!(tx
.send(1).is_err());
1200 fn smoke_shared_port_gone() {
1201 let (tx
, rx
) = channel
::<i32>();
1203 assert
!(tx
.send(1).is_err())
1207 fn smoke_shared_port_gone2() {
1208 let (tx
, rx
) = channel
::<i32>();
1210 let tx2
= tx
.clone();
1212 assert
!(tx2
.send(1).is_err());
1216 fn port_gone_concurrent() {
1217 let (tx
, rx
) = channel
::<i32>();
1218 let _t
= thread
::spawn(move|| {
1221 while tx
.send(1).is_ok() {}
1225 fn port_gone_concurrent_shared() {
1226 let (tx
, rx
) = channel
::<i32>();
1227 let tx2
= tx
.clone();
1228 let _t
= thread
::spawn(move|| {
1231 while tx
.send(1).is_ok() && tx2
.send(1).is_ok() {}
1235 fn smoke_chan_gone() {
1236 let (tx
, rx
) = channel
::<i32>();
1238 assert
!(rx
.recv().is_err());
1242 fn smoke_chan_gone_shared() {
1243 let (tx
, rx
) = channel
::<()>();
1244 let tx2
= tx
.clone();
1247 assert
!(rx
.recv().is_err());
1251 fn chan_gone_concurrent() {
1252 let (tx
, rx
) = channel
::<i32>();
1253 let _t
= thread
::spawn(move|| {
1254 tx
.send(1).unwrap();
1255 tx
.send(1).unwrap();
1257 while rx
.recv().is_ok() {}
1262 let (tx
, rx
) = channel
::<i32>();
1263 let t
= thread
::spawn(move|| {
1264 for _
in 0..10000 { tx.send(1).unwrap(); }
1267 assert_eq
!(rx
.recv().unwrap(), 1);
1269 t
.join().ok().unwrap();
1273 fn stress_shared() {
1274 const AMT
: u32 = 10000;
1275 const NTHREADS
: u32 = 8;
1276 let (tx
, rx
) = channel
::<i32>();
1278 let t
= thread
::spawn(move|| {
1279 for _
in 0..AMT
* NTHREADS
{
1280 assert_eq
!(rx
.recv().unwrap(), 1);
1282 match rx
.try_recv() {
1288 for _
in 0..NTHREADS
{
1289 let tx
= tx
.clone();
1290 thread
::spawn(move|| {
1291 for _
in 0..AMT { tx.send(1).unwrap(); }
1295 t
.join().ok().unwrap();
1299 fn send_from_outside_runtime() {
1300 let (tx1
, rx1
) = channel
::<()>();
1301 let (tx2
, rx2
) = channel
::<i32>();
1302 let t1
= thread
::spawn(move|| {
1303 tx1
.send(()).unwrap();
1305 assert_eq
!(rx2
.recv().unwrap(), 1);
1308 rx1
.recv().unwrap();
1309 let t2
= thread
::spawn(move|| {
1311 tx2
.send(1).unwrap();
1314 t1
.join().ok().unwrap();
1315 t2
.join().ok().unwrap();
1319 fn recv_from_outside_runtime() {
1320 let (tx
, rx
) = channel
::<i32>();
1321 let t
= thread
::spawn(move|| {
1323 assert_eq
!(rx
.recv().unwrap(), 1);
1327 tx
.send(1).unwrap();
1329 t
.join().ok().unwrap();
1334 let (tx1
, rx1
) = channel
::<i32>();
1335 let (tx2
, rx2
) = channel
::<i32>();
1336 let t1
= thread
::spawn(move|| {
1337 assert_eq
!(rx1
.recv().unwrap(), 1);
1338 tx2
.send(2).unwrap();
1340 let t2
= thread
::spawn(move|| {
1341 tx1
.send(1).unwrap();
1342 assert_eq
!(rx2
.recv().unwrap(), 2);
1344 t1
.join().ok().unwrap();
1345 t2
.join().ok().unwrap();
1349 fn oneshot_single_thread_close_port_first() {
1350 // Simple test of closing without sending
1351 let (_tx
, rx
) = channel
::<i32>();
1356 fn oneshot_single_thread_close_chan_first() {
1357 // Simple test of closing without sending
1358 let (tx
, _rx
) = channel
::<i32>();
1363 fn oneshot_single_thread_send_port_close() {
1364 // Testing that the sender cleans up the payload if receiver is closed
1365 let (tx
, rx
) = channel
::<Box
<i32>>();
1367 assert
!(tx
.send(box 0).is_err());
1371 fn oneshot_single_thread_recv_chan_close() {
1372 // Receiving on a closed chan will panic
1373 let res
= thread
::spawn(move|| {
1374 let (tx
, rx
) = channel
::<i32>();
1379 assert
!(res
.is_err());
1383 fn oneshot_single_thread_send_then_recv() {
1384 let (tx
, rx
) = channel
::<Box
<i32>>();
1385 tx
.send(box 10).unwrap();
1386 assert
!(rx
.recv().unwrap() == box 10);
1390 fn oneshot_single_thread_try_send_open() {
1391 let (tx
, rx
) = channel
::<i32>();
1392 assert
!(tx
.send(10).is_ok());
1393 assert
!(rx
.recv().unwrap() == 10);
1397 fn oneshot_single_thread_try_send_closed() {
1398 let (tx
, rx
) = channel
::<i32>();
1400 assert
!(tx
.send(10).is_err());
1404 fn oneshot_single_thread_try_recv_open() {
1405 let (tx
, rx
) = channel
::<i32>();
1406 tx
.send(10).unwrap();
1407 assert
!(rx
.recv() == Ok(10));
1411 fn oneshot_single_thread_try_recv_closed() {
1412 let (tx
, rx
) = channel
::<i32>();
1414 assert
!(rx
.recv().is_err());
1418 fn oneshot_single_thread_peek_data() {
1419 let (tx
, rx
) = channel
::<i32>();
1420 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
1421 tx
.send(10).unwrap();
1422 assert_eq
!(rx
.try_recv(), Ok(10));
1426 fn oneshot_single_thread_peek_close() {
1427 let (tx
, rx
) = channel
::<i32>();
1429 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
1430 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
1434 fn oneshot_single_thread_peek_open() {
1435 let (_tx
, rx
) = channel
::<i32>();
1436 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
1440 fn oneshot_multi_task_recv_then_send() {
1441 let (tx
, rx
) = channel
::<Box
<i32>>();
1442 let _t
= thread
::spawn(move|| {
1443 assert
!(rx
.recv().unwrap() == box 10);
1446 tx
.send(box 10).unwrap();
1450 fn oneshot_multi_task_recv_then_close() {
1451 let (tx
, rx
) = channel
::<Box
<i32>>();
1452 let _t
= thread
::spawn(move|| {
1455 let res
= thread
::spawn(move|| {
1456 assert
!(rx
.recv().unwrap() == box 10);
1458 assert
!(res
.is_err());
1462 fn oneshot_multi_thread_close_stress() {
1463 for _
in 0..stress_factor() {
1464 let (tx
, rx
) = channel
::<i32>();
1465 let _t
= thread
::spawn(move|| {
1473 fn oneshot_multi_thread_send_close_stress() {
1474 for _
in 0..stress_factor() {
1475 let (tx
, rx
) = channel
::<i32>();
1476 let _t
= thread
::spawn(move|| {
1479 let _
= thread
::spawn(move|| {
1480 tx
.send(1).unwrap();
1486 fn oneshot_multi_thread_recv_close_stress() {
1487 for _
in 0..stress_factor() {
1488 let (tx
, rx
) = channel
::<i32>();
1489 thread
::spawn(move|| {
1490 let res
= thread
::spawn(move|| {
1493 assert
!(res
.is_err());
1495 let _t
= thread
::spawn(move|| {
1496 thread
::spawn(move|| {
1504 fn oneshot_multi_thread_send_recv_stress() {
1505 for _
in 0..stress_factor() {
1506 let (tx
, rx
) = channel
::<Box
<isize>>();
1507 let _t
= thread
::spawn(move|| {
1508 tx
.send(box 10).unwrap();
1510 assert
!(rx
.recv().unwrap() == box 10);
1515 fn stream_send_recv_stress() {
1516 for _
in 0..stress_factor() {
1517 let (tx
, rx
) = channel();
1522 fn send(tx
: Sender
<Box
<i32>>, i
: i32) {
1523 if i
== 10 { return }
1525 thread
::spawn(move|| {
1526 tx
.send(box i
).unwrap();
1531 fn recv(rx
: Receiver
<Box
<i32>>, i
: i32) {
1532 if i
== 10 { return }
1534 thread
::spawn(move|| {
1535 assert
!(rx
.recv().unwrap() == box i
);
1544 // Regression test that we don't run out of stack in scheduler context
1545 let (tx
, rx
) = channel();
1546 for _
in 0..10000 { tx.send(()).unwrap(); }
1547 for _
in 0..10000 { rx.recv().unwrap(); }
1551 fn shared_chan_stress() {
1552 let (tx
, rx
) = channel();
1553 let total
= stress_factor() + 100;
1555 let tx
= tx
.clone();
1556 thread
::spawn(move|| {
1557 tx
.send(()).unwrap();
1567 fn test_nested_recv_iter() {
1568 let (tx
, rx
) = channel
::<i32>();
1569 let (total_tx
, total_rx
) = channel
::<i32>();
1571 let _t
= thread
::spawn(move|| {
1573 for x
in rx
.iter() {
1576 total_tx
.send(acc
).unwrap();
1579 tx
.send(3).unwrap();
1580 tx
.send(1).unwrap();
1581 tx
.send(2).unwrap();
1583 assert_eq
!(total_rx
.recv().unwrap(), 6);
1587 fn test_recv_iter_break() {
1588 let (tx
, rx
) = channel
::<i32>();
1589 let (count_tx
, count_rx
) = channel();
1591 let _t
= thread
::spawn(move|| {
1593 for x
in rx
.iter() {
1600 count_tx
.send(count
).unwrap();
1603 tx
.send(2).unwrap();
1604 tx
.send(2).unwrap();
1605 tx
.send(2).unwrap();
1608 assert_eq
!(count_rx
.recv().unwrap(), 4);
1612 fn test_recv_into_iter_owned() {
1614 let (tx
, rx
) = channel
::<i32>();
1615 tx
.send(1).unwrap();
1616 tx
.send(2).unwrap();
1620 assert_eq
!(iter
.next().unwrap(), 1);
1621 assert_eq
!(iter
.next().unwrap(), 2);
1622 assert_eq
!(iter
.next().is_none(), true);
1626 fn test_recv_into_iter_borrowed() {
1627 let (tx
, rx
) = channel
::<i32>();
1628 tx
.send(1).unwrap();
1629 tx
.send(2).unwrap();
1631 let mut iter
= (&rx
).into_iter();
1632 assert_eq
!(iter
.next().unwrap(), 1);
1633 assert_eq
!(iter
.next().unwrap(), 2);
1634 assert_eq
!(iter
.next().is_none(), true);
1638 fn try_recv_states() {
1639 let (tx1
, rx1
) = channel
::<i32>();
1640 let (tx2
, rx2
) = channel
::<()>();
1641 let (tx3
, rx3
) = channel
::<()>();
1642 let _t
= thread
::spawn(move|| {
1643 rx2
.recv().unwrap();
1644 tx1
.send(1).unwrap();
1645 tx3
.send(()).unwrap();
1646 rx2
.recv().unwrap();
1648 tx3
.send(()).unwrap();
1651 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
1652 tx2
.send(()).unwrap();
1653 rx3
.recv().unwrap();
1654 assert_eq
!(rx1
.try_recv(), Ok(1));
1655 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
1656 tx2
.send(()).unwrap();
1657 rx3
.recv().unwrap();
1658 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Disconnected
));
1661 // This bug used to end up in a livelock inside of the Receiver destructor
1662 // because the internal state of the Shared packet was corrupted
1664 fn destroy_upgraded_shared_port_when_sender_still_active() {
1665 let (tx
, rx
) = channel();
1666 let (tx2
, rx2
) = channel();
1667 let _t
= thread
::spawn(move|| {
1668 rx
.recv().unwrap(); // wait on a oneshot
1669 drop(rx
); // destroy a shared
1670 tx2
.send(()).unwrap();
1672 // make sure the other thread has gone to sleep
1673 for _
in 0..5000 { thread::yield_now(); }
1675 // upgrade to a shared chan and send a message
1678 t
.send(()).unwrap();
1680 // wait for the child thread to exit before we exit
1681 rx2
.recv().unwrap();
1693 pub fn stress_factor() -> usize {
1694 match env
::var("RUST_TEST_STRESS") {
1695 Ok(val
) => val
.parse().unwrap(),
1702 let (tx
, rx
) = sync_channel
::<i32>(1);
1703 tx
.send(1).unwrap();
1704 assert_eq
!(rx
.recv().unwrap(), 1);
1709 let (tx
, _rx
) = sync_channel
::<Box
<isize>>(1);
1710 tx
.send(box 1).unwrap();
1715 let (tx
, rx
) = sync_channel
::<i32>(1);
1716 tx
.send(1).unwrap();
1717 assert_eq
!(rx
.recv().unwrap(), 1);
1718 let tx
= tx
.clone();
1719 tx
.send(1).unwrap();
1720 assert_eq
!(rx
.recv().unwrap(), 1);
1724 fn smoke_threads() {
1725 let (tx
, rx
) = sync_channel
::<i32>(0);
1726 let _t
= thread
::spawn(move|| {
1727 tx
.send(1).unwrap();
1729 assert_eq
!(rx
.recv().unwrap(), 1);
1733 fn smoke_port_gone() {
1734 let (tx
, rx
) = sync_channel
::<i32>(0);
1736 assert
!(tx
.send(1).is_err());
1740 fn smoke_shared_port_gone2() {
1741 let (tx
, rx
) = sync_channel
::<i32>(0);
1743 let tx2
= tx
.clone();
1745 assert
!(tx2
.send(1).is_err());
1749 fn port_gone_concurrent() {
1750 let (tx
, rx
) = sync_channel
::<i32>(0);
1751 let _t
= thread
::spawn(move|| {
1754 while tx
.send(1).is_ok() {}
1758 fn port_gone_concurrent_shared() {
1759 let (tx
, rx
) = sync_channel
::<i32>(0);
1760 let tx2
= tx
.clone();
1761 let _t
= thread
::spawn(move|| {
1764 while tx
.send(1).is_ok() && tx2
.send(1).is_ok() {}
1768 fn smoke_chan_gone() {
1769 let (tx
, rx
) = sync_channel
::<i32>(0);
1771 assert
!(rx
.recv().is_err());
1775 fn smoke_chan_gone_shared() {
1776 let (tx
, rx
) = sync_channel
::<()>(0);
1777 let tx2
= tx
.clone();
1780 assert
!(rx
.recv().is_err());
1784 fn chan_gone_concurrent() {
1785 let (tx
, rx
) = sync_channel
::<i32>(0);
1786 thread
::spawn(move|| {
1787 tx
.send(1).unwrap();
1788 tx
.send(1).unwrap();
1790 while rx
.recv().is_ok() {}
1795 let (tx
, rx
) = sync_channel
::<i32>(0);
1796 thread
::spawn(move|| {
1797 for _
in 0..10000 { tx.send(1).unwrap(); }
1800 assert_eq
!(rx
.recv().unwrap(), 1);
1805 fn stress_shared() {
1806 const AMT
: u32 = 1000;
1807 const NTHREADS
: u32 = 8;
1808 let (tx
, rx
) = sync_channel
::<i32>(0);
1809 let (dtx
, drx
) = sync_channel
::<()>(0);
1811 thread
::spawn(move|| {
1812 for _
in 0..AMT
* NTHREADS
{
1813 assert_eq
!(rx
.recv().unwrap(), 1);
1815 match rx
.try_recv() {
1819 dtx
.send(()).unwrap();
1822 for _
in 0..NTHREADS
{
1823 let tx
= tx
.clone();
1824 thread
::spawn(move|| {
1825 for _
in 0..AMT { tx.send(1).unwrap(); }
1829 drx
.recv().unwrap();
1833 fn oneshot_single_thread_close_port_first() {
1834 // Simple test of closing without sending
1835 let (_tx
, rx
) = sync_channel
::<i32>(0);
1840 fn oneshot_single_thread_close_chan_first() {
1841 // Simple test of closing without sending
1842 let (tx
, _rx
) = sync_channel
::<i32>(0);
1847 fn oneshot_single_thread_send_port_close() {
1848 // Testing that the sender cleans up the payload if receiver is closed
1849 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
1851 assert
!(tx
.send(box 0).is_err());
1855 fn oneshot_single_thread_recv_chan_close() {
1856 // Receiving on a closed chan will panic
1857 let res
= thread
::spawn(move|| {
1858 let (tx
, rx
) = sync_channel
::<i32>(0);
1863 assert
!(res
.is_err());
1867 fn oneshot_single_thread_send_then_recv() {
1868 let (tx
, rx
) = sync_channel
::<Box
<i32>>(1);
1869 tx
.send(box 10).unwrap();
1870 assert
!(rx
.recv().unwrap() == box 10);
1874 fn oneshot_single_thread_try_send_open() {
1875 let (tx
, rx
) = sync_channel
::<i32>(1);
1876 assert_eq
!(tx
.try_send(10), Ok(()));
1877 assert
!(rx
.recv().unwrap() == 10);
1881 fn oneshot_single_thread_try_send_closed() {
1882 let (tx
, rx
) = sync_channel
::<i32>(0);
1884 assert_eq
!(tx
.try_send(10), Err(TrySendError
::Disconnected(10)));
1888 fn oneshot_single_thread_try_send_closed2() {
1889 let (tx
, _rx
) = sync_channel
::<i32>(0);
1890 assert_eq
!(tx
.try_send(10), Err(TrySendError
::Full(10)));
1894 fn oneshot_single_thread_try_recv_open() {
1895 let (tx
, rx
) = sync_channel
::<i32>(1);
1896 tx
.send(10).unwrap();
1897 assert
!(rx
.recv() == Ok(10));
1901 fn oneshot_single_thread_try_recv_closed() {
1902 let (tx
, rx
) = sync_channel
::<i32>(0);
1904 assert
!(rx
.recv().is_err());
1908 fn oneshot_single_thread_peek_data() {
1909 let (tx
, rx
) = sync_channel
::<i32>(1);
1910 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
1911 tx
.send(10).unwrap();
1912 assert_eq
!(rx
.try_recv(), Ok(10));
1916 fn oneshot_single_thread_peek_close() {
1917 let (tx
, rx
) = sync_channel
::<i32>(0);
1919 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
1920 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
1924 fn oneshot_single_thread_peek_open() {
1925 let (_tx
, rx
) = sync_channel
::<i32>(0);
1926 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
1930 fn oneshot_multi_task_recv_then_send() {
1931 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
1932 let _t
= thread
::spawn(move|| {
1933 assert
!(rx
.recv().unwrap() == box 10);
1936 tx
.send(box 10).unwrap();
1940 fn oneshot_multi_task_recv_then_close() {
1941 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
1942 let _t
= thread
::spawn(move|| {
1945 let res
= thread
::spawn(move|| {
1946 assert
!(rx
.recv().unwrap() == box 10);
1948 assert
!(res
.is_err());
1952 fn oneshot_multi_thread_close_stress() {
1953 for _
in 0..stress_factor() {
1954 let (tx
, rx
) = sync_channel
::<i32>(0);
1955 let _t
= thread
::spawn(move|| {
1963 fn oneshot_multi_thread_send_close_stress() {
1964 for _
in 0..stress_factor() {
1965 let (tx
, rx
) = sync_channel
::<i32>(0);
1966 let _t
= thread
::spawn(move|| {
1969 let _
= thread
::spawn(move || {
1970 tx
.send(1).unwrap();
1976 fn oneshot_multi_thread_recv_close_stress() {
1977 for _
in 0..stress_factor() {
1978 let (tx
, rx
) = sync_channel
::<i32>(0);
1979 let _t
= thread
::spawn(move|| {
1980 let res
= thread
::spawn(move|| {
1983 assert
!(res
.is_err());
1985 let _t
= thread
::spawn(move|| {
1986 thread
::spawn(move|| {
1994 fn oneshot_multi_thread_send_recv_stress() {
1995 for _
in 0..stress_factor() {
1996 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
1997 let _t
= thread
::spawn(move|| {
1998 tx
.send(box 10).unwrap();
2000 assert
!(rx
.recv().unwrap() == box 10);
2005 fn stream_send_recv_stress() {
2006 for _
in 0..stress_factor() {
2007 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
2012 fn send(tx
: SyncSender
<Box
<i32>>, i
: i32) {
2013 if i
== 10 { return }
2015 thread
::spawn(move|| {
2016 tx
.send(box i
).unwrap();
2021 fn recv(rx
: Receiver
<Box
<i32>>, i
: i32) {
2022 if i
== 10 { return }
2024 thread
::spawn(move|| {
2025 assert
!(rx
.recv().unwrap() == box i
);
2034 // Regression test that we don't run out of stack in scheduler context
2035 let (tx
, rx
) = sync_channel(10000);
2036 for _
in 0..10000 { tx.send(()).unwrap(); }
2037 for _
in 0..10000 { rx.recv().unwrap(); }
2041 fn shared_chan_stress() {
2042 let (tx
, rx
) = sync_channel(0);
2043 let total
= stress_factor() + 100;
2045 let tx
= tx
.clone();
2046 thread
::spawn(move|| {
2047 tx
.send(()).unwrap();
2057 fn test_nested_recv_iter() {
2058 let (tx
, rx
) = sync_channel
::<i32>(0);
2059 let (total_tx
, total_rx
) = sync_channel
::<i32>(0);
2061 let _t
= thread
::spawn(move|| {
2063 for x
in rx
.iter() {
2066 total_tx
.send(acc
).unwrap();
2069 tx
.send(3).unwrap();
2070 tx
.send(1).unwrap();
2071 tx
.send(2).unwrap();
2073 assert_eq
!(total_rx
.recv().unwrap(), 6);
2077 fn test_recv_iter_break() {
2078 let (tx
, rx
) = sync_channel
::<i32>(0);
2079 let (count_tx
, count_rx
) = sync_channel(0);
2081 let _t
= thread
::spawn(move|| {
2083 for x
in rx
.iter() {
2090 count_tx
.send(count
).unwrap();
2093 tx
.send(2).unwrap();
2094 tx
.send(2).unwrap();
2095 tx
.send(2).unwrap();
2096 let _
= tx
.try_send(2);
2098 assert_eq
!(count_rx
.recv().unwrap(), 4);
2102 fn try_recv_states() {
2103 let (tx1
, rx1
) = sync_channel
::<i32>(1);
2104 let (tx2
, rx2
) = sync_channel
::<()>(1);
2105 let (tx3
, rx3
) = sync_channel
::<()>(1);
2106 let _t
= thread
::spawn(move|| {
2107 rx2
.recv().unwrap();
2108 tx1
.send(1).unwrap();
2109 tx3
.send(()).unwrap();
2110 rx2
.recv().unwrap();
2112 tx3
.send(()).unwrap();
2115 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
2116 tx2
.send(()).unwrap();
2117 rx3
.recv().unwrap();
2118 assert_eq
!(rx1
.try_recv(), Ok(1));
2119 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
2120 tx2
.send(()).unwrap();
2121 rx3
.recv().unwrap();
2122 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Disconnected
));
2125 // This bug used to end up in a livelock inside of the Receiver destructor
2126 // because the internal state of the Shared packet was corrupted
2128 fn destroy_upgraded_shared_port_when_sender_still_active() {
2129 let (tx
, rx
) = sync_channel
::<()>(0);
2130 let (tx2
, rx2
) = sync_channel
::<()>(0);
2131 let _t
= thread
::spawn(move|| {
2132 rx
.recv().unwrap(); // wait on a oneshot
2133 drop(rx
); // destroy a shared
2134 tx2
.send(()).unwrap();
2136 // make sure the other thread has gone to sleep
2137 for _
in 0..5000 { thread::yield_now(); }
2139 // upgrade to a shared chan and send a message
2142 t
.send(()).unwrap();
2144 // wait for the child thread to exit before we exit
2145 rx2
.recv().unwrap();
2150 let (tx
, rx
) = sync_channel
::<i32>(0);
2151 let _t
= thread
::spawn(move|| { rx.recv().unwrap(); }
);
2152 assert_eq
!(tx
.send(1), Ok(()));
2157 let (tx
, rx
) = sync_channel
::<i32>(0);
2158 let _t
= thread
::spawn(move|| { drop(rx); }
);
2159 assert
!(tx
.send(1).is_err());
2164 let (tx
, rx
) = sync_channel
::<i32>(1);
2165 assert_eq
!(tx
.send(1), Ok(()));
2166 let _t
=thread
::spawn(move|| { drop(rx); }
);
2167 assert
!(tx
.send(1).is_err());
2172 let (tx
, rx
) = sync_channel
::<i32>(0);
2173 let tx2
= tx
.clone();
2174 let (done
, donerx
) = channel();
2175 let done2
= done
.clone();
2176 let _t
= thread
::spawn(move|| {
2177 assert
!(tx
.send(1).is_err());
2178 done
.send(()).unwrap();
2180 let _t
= thread
::spawn(move|| {
2181 assert
!(tx2
.send(2).is_err());
2182 done2
.send(()).unwrap();
2185 donerx
.recv().unwrap();
2186 donerx
.recv().unwrap();
2191 let (tx
, _rx
) = sync_channel
::<i32>(0);
2192 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Full(1)));
2197 let (tx
, _rx
) = sync_channel
::<i32>(1);
2198 assert_eq
!(tx
.try_send(1), Ok(()));
2199 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Full(1)));
2204 let (tx
, rx
) = sync_channel
::<i32>(1);
2205 assert_eq
!(tx
.try_send(1), Ok(()));
2207 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Disconnected(1)));
2213 let (tx1
, rx1
) = sync_channel
::<()>(3);
2214 let (tx2
, rx2
) = sync_channel
::<()>(3);
2216 let _t
= thread
::spawn(move|| {
2217 rx1
.recv().unwrap();
2218 tx2
.try_send(()).unwrap();
2221 tx1
.try_send(()).unwrap();
2222 rx2
.recv().unwrap();
2231 fn fmt_debug_sender() {
2232 let (tx
, _
) = channel
::<i32>();
2233 assert_eq
!(format
!("{:?}", tx
), "Sender { .. }");
2237 fn fmt_debug_recv() {
2238 let (_
, rx
) = channel
::<i32>();
2239 assert_eq
!(format
!("{:?}", rx
), "Receiver { .. }");
2243 fn fmt_debug_sync_sender() {
2244 let (tx
, _
) = sync_channel
::<i32>(1);
2245 assert_eq
!(format
!("{:?}", tx
), "SyncSender { .. }");