1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
20 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
24 //! These channels come in two flavors:
26 //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
31 //! 2. A synchronous, bounded channel. The [`sync_channel`] function will
32 //! return a `(SyncSender, Receiver)` tuple where the storage for pending
33 //! messages is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
36 //! channel where each sender atomically hands off a message to a receiver.
38 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
39 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
40 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
41 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
42 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
43 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
47 //! The send and receive operations on channels will all return a [`Result`]
48 //! indicating whether the operation succeeded or not. An unsuccessful operation
49 //! is normally indicative of the other half of a channel having "hung up" by
50 //! being dropped in its corresponding thread.
52 //! Once half of a channel has been deallocated, most operations can no longer
53 //! continue to make progress, so [`Err`] will be returned. Many applications
54 //! will continue to [`unwrap`] the results returned from this module,
55 //! instigating a propagation of failure among threads if one unexpectedly dies.
57 //! [`Result`]: ../../../std/result/enum.Result.html
58 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
59 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
67 //! use std::sync::mpsc::channel;
69 //! // Create a simple streaming channel
70 //! let (tx, rx) = channel();
71 //! thread::spawn(move|| {
72 //! tx.send(10).unwrap();
74 //! assert_eq!(rx.recv().unwrap(), 10);
81 //! use std::sync::mpsc::channel;
83 //! // Create a shared channel that can be sent along from many threads
84 //! // where tx is the sending half (tx for transmission), and rx is the receiving
85 //! // half (rx for receiving).
86 //! let (tx, rx) = channel();
88 //! let tx = tx.clone();
89 //! thread::spawn(move|| {
90 //! tx.send(i).unwrap();
95 //! let j = rx.recv().unwrap();
96 //! assert!(0 <= j && j < 10);
100 //! Propagating panics:
103 //! use std::sync::mpsc::channel;
105 //! // The call to recv() will return an error because the channel has already
106 //! // hung up (or been deallocated)
107 //! let (tx, rx) = channel::<i32>();
109 //! assert!(rx.recv().is_err());
112 //! Synchronous channels:
116 //! use std::sync::mpsc::sync_channel;
118 //! let (tx, rx) = sync_channel::<i32>(0);
119 //! thread::spawn(move|| {
120 //! // This will wait for the parent thread to start receiving
121 //! tx.send(53).unwrap();
123 //! rx.recv().unwrap();
126 #![stable(feature = "rust1", since = "1.0.0")]
128 // A description of how Rust's channel implementation works
130 // Channels are supposed to be the basic building block for all other
131 // concurrent primitives that are used in Rust. As a result, the channel type
132 // needs to be highly optimized, flexible, and broad enough for use everywhere.
134 // The choice of implementation of all channels is to be built on lock-free data
135 // structures. The channels themselves are then consequently also lock-free data
136 // structures. As always with lock-free code, this is a very "here be dragons"
137 // territory, especially because I'm unaware of any academic papers that have
138 // gone into great length about channels of these flavors.
140 // ## Flavors of channels
142 // From the perspective of a consumer of this library, there is only one flavor
143 // of channel. This channel can be used as a stream and cloned to allow multiple
144 // senders. Under the hood, however, there are actually three flavors of
147 // * Flavor::Oneshots - these channels are highly optimized for the one-send use
148 // case. They contain as few atomics as possible and
149 // involve one and exactly one allocation.
150 // * Streams - these channels are optimized for the non-shared use case. They
151 // use a different concurrent queue that is more tailored for this
152 // use case. The initial allocation of this flavor of channel is not
154 // * Shared - this is the most general form of channel that this module offers,
155 // a channel with multiple senders. This type is as optimized as it
156 // can be, but the previous two types mentioned are much faster for
159 // ## Concurrent queues
161 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
162 // but recv() obviously blocks. This means that under the hood there must be
163 // some shared and concurrent queue holding all of the actual data.
165 // With two flavors of channels, two flavors of queues are also used. We have
166 // chosen to use queues from a well-known author that are abbreviated as SPSC
167 // and MPSC (single producer, single consumer and multiple producer, single
168 // consumer). SPSC queues are used for streams while MPSC queues are used for
171 // ### SPSC optimizations
173 // The SPSC queue found online is essentially a linked list of nodes where one
174 // half of the nodes are the "queue of data" and the other half of nodes are a
175 // cache of unused nodes. The unused nodes are used such that an allocation is
176 // not required on every push() and a free doesn't need to happen on every
179 // As found online, however, the cache of nodes is of an infinite size. This
180 // means that if a channel at one point in its life had 50k items in the queue,
181 // then the queue will always have the capacity for 50k items. I believed that
182 // this was an unnecessary limitation of the implementation, so I have altered
183 // the queue to optionally have a bound on the cache size.
185 // By default, streams will have an unbounded SPSC queue with a small-ish cache
186 // size. The hope is that the cache is still large enough to have very fast
187 // send() operations while not too large such that millions of channels can
190 // ### MPSC optimizations
192 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
193 // a linked list under the hood to earn its unboundedness, but I have not put
194 // forth much effort into having a cache of nodes similar to the SPSC queue.
196 // For now, I believe that this is "ok" because shared channels are not the most
197 // common type, but soon we may wish to revisit this queue choice and determine
198 // another candidate for backend storage of shared channels.
200 // ## Overview of the Implementation
202 // Now that there's a little background on the concurrent queues used, it's
203 // worth going into much more detail about the channels themselves. The basic
204 // pseudocode for a send/recv are:
208 // queue.push(t) return if queue.pop()
209 // if increment() == -1 deschedule {
210 // wakeup() if decrement() > 0
211 // cancel_deschedule()
215 // As mentioned before, there are no locks in this implementation, only atomic
216 // instructions are used.
218 // ### The internal atomic counter
220 // Every channel has a shared counter with each half to keep track of the size
221 // of the queue. This counter is used to abort descheduling by the receiver and
222 // to know when to wake up on the sending side.
224 // As seen in the pseudocode, senders will increment this count and receivers
225 // will decrement the count. The theory behind this is that if a sender sees a
226 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
227 // then it doesn't need to block.
229 // The recv() method has a beginning call to pop(), and if successful, it needs
230 // to decrement the count. It is a crucial implementation detail that this
231 // decrement does *not* happen to the shared counter. If this were the case,
232 // then it would be possible for the counter to be very negative when there were
233 // no receivers waiting, in which case the senders would have to determine when
234 // it was actually appropriate to wake up a receiver.
236 // Instead, the "steal count" is kept track of separately (not atomically
237 // because it's only used by receivers), and then the decrement() call when
238 // descheduling will lump in all of the recent steals into one large decrement.
240 // The implication of this is that if a sender sees a -1 count, then there's
241 // guaranteed to be a waiter waiting!
243 // ## Native Implementation
245 // A major goal of these channels is to work seamlessly on and off the runtime.
246 // All of the previous race conditions have been worded in terms of
247 // scheduler-isms (which is obviously not available without the runtime).
249 // For now, native usage of channels (off the runtime) will fall back onto
250 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
251 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
252 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
253 // condition variable.
257 // Being able to support selection over channels has greatly influenced this
258 // design, and not only does selection need to work inside the runtime, but also
259 // outside the runtime.
261 // The implementation is fairly straightforward. The goal of select() is not to
262 // return some data, but only to return which channel can receive data without
263 // blocking. The implementation is essentially the entire blocking procedure
264 // followed by an increment as soon as its woken up. The cancellation procedure
265 // involves an increment and swapping out of to_wake to acquire ownership of the
266 // thread to unblock.
268 // Sadly this current implementation requires multiple allocations, so I have
269 // seen the throughput of select() be much worse than it should be. I do not
270 // believe that there is anything fundamental that needs to change about these
271 // channels, however, in order to support a more efficient select().
275 // And now that you've seen all the races that I found and attempted to fix,
276 // here's the code for you to find some more!
282 use cell
::UnsafeCell
;
283 use time
::{Duration, Instant}
;
285 #[unstable(feature = "mpsc_select", issue = "27800")]
286 pub use self::select
::{Select, Handle}
;
287 use self::select
::StartResult
;
288 use self::select
::StartResult
::*;
289 use self::blocking
::SignalToken
;
300 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
301 /// This half can only be owned by one thread.
303 /// Messages sent to the channel can be retrieved using [`recv`].
305 /// [`channel`]: fn.channel.html
306 /// [`sync_channel`]: fn.sync_channel.html
307 /// [`recv`]: struct.Receiver.html#method.recv
312 /// use std::sync::mpsc::channel;
314 /// use std::time::Duration;
316 /// let (send, recv) = channel();
318 /// thread::spawn(move || {
319 /// send.send("Hello world!").unwrap();
320 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
321 /// send.send("Delayed for 2 seconds").unwrap();
324 /// println!("{}", recv.recv().unwrap()); // Received immediately
325 /// println!("Waiting...");
326 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
328 #[stable(feature = "rust1", since = "1.0.0")]
329 pub struct Receiver
<T
> {
330 inner
: UnsafeCell
<Flavor
<T
>>,
333 // The receiver port can be sent from place to place, so long as it
334 // is not used to receive non-sendable things.
335 #[stable(feature = "rust1", since = "1.0.0")]
336 unsafe impl<T
: Send
> Send
for Receiver
<T
> { }
338 #[stable(feature = "rust1", since = "1.0.0")]
339 impl<T
> !Sync
for Receiver
<T
> { }
341 /// An iterator over messages on a [`Receiver`], created by [`iter`].
343 /// This iterator will block whenever [`next`] is called,
344 /// waiting for a new message, and [`None`] will be returned
345 /// when the corresponding channel has hung up.
347 /// [`iter`]: struct.Receiver.html#method.iter
348 /// [`Receiver`]: struct.Receiver.html
349 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
350 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
355 /// use std::sync::mpsc::channel;
358 /// let (send, recv) = channel();
360 /// thread::spawn(move || {
361 /// send.send(1u8).unwrap();
362 /// send.send(2u8).unwrap();
363 /// send.send(3u8).unwrap();
366 /// for x in recv.iter() {
367 /// println!("Got: {}", x);
370 #[stable(feature = "rust1", since = "1.0.0")]
372 pub struct Iter
<'a
, T
: 'a
> {
376 /// An iterator that attempts to yield all pending values for a [`Receiver`],
377 /// created by [`try_iter`].
379 /// [`None`] will be returned when there are no pending values remaining or
380 /// if the corresponding channel has hung up.
382 /// This iterator will never block the caller in order to wait for data to
383 /// become available. Instead, it will return [`None`].
385 /// [`Receiver`]: struct.Receiver.html
386 /// [`try_iter`]: struct.Receiver.html#method.try_iter
387 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
392 /// use std::sync::mpsc::channel;
394 /// use std::time::Duration;
396 /// let (sender, receiver) = channel();
398 /// // Nothing is in the buffer yet
399 /// assert!(receiver.try_iter().next().is_none());
400 /// println!("Nothing in the buffer...");
402 /// thread::spawn(move || {
403 /// sender.send(1).unwrap();
404 /// sender.send(2).unwrap();
405 /// sender.send(3).unwrap();
408 /// println!("Going to sleep...");
409 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
411 /// for x in receiver.try_iter() {
412 /// println!("Got: {}", x);
415 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
417 pub struct TryIter
<'a
, T
: 'a
> {
421 /// An owning iterator over messages on a [`Receiver`],
422 /// created by **Receiver::into_iter**.
424 /// This iterator will block whenever [`next`]
425 /// is called, waiting for a new message, and [`None`] will be
426 /// returned if the corresponding channel has hung up.
428 /// [`Receiver`]: struct.Receiver.html
429 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
430 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
435 /// use std::sync::mpsc::channel;
438 /// let (send, recv) = channel();
440 /// thread::spawn(move || {
441 /// send.send(1u8).unwrap();
442 /// send.send(2u8).unwrap();
443 /// send.send(3u8).unwrap();
446 /// for x in recv.into_iter() {
447 /// println!("Got: {}", x);
450 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
452 pub struct IntoIter
<T
> {
456 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
457 /// owned by one thread, but it can be cloned to send to other threads.
459 /// Messages can be sent through this channel with [`send`].
461 /// [`channel`]: fn.channel.html
462 /// [`send`]: struct.Sender.html#method.send
467 /// use std::sync::mpsc::channel;
470 /// let (sender, receiver) = channel();
471 /// let sender2 = sender.clone();
473 /// // First thread owns sender
474 /// thread::spawn(move || {
475 /// sender.send(1).unwrap();
478 /// // Second thread owns sender2
479 /// thread::spawn(move || {
480 /// sender2.send(2).unwrap();
483 /// let msg = receiver.recv().unwrap();
484 /// let msg2 = receiver.recv().unwrap();
486 /// assert_eq!(3, msg + msg2);
488 #[stable(feature = "rust1", since = "1.0.0")]
489 pub struct Sender
<T
> {
490 inner
: UnsafeCell
<Flavor
<T
>>,
493 // The send port can be sent from place to place, so long as it
494 // is not used to send non-sendable things.
495 #[stable(feature = "rust1", since = "1.0.0")]
496 unsafe impl<T
: Send
> Send
for Sender
<T
> { }
498 #[stable(feature = "rust1", since = "1.0.0")]
499 impl<T
> !Sync
for Sender
<T
> { }
501 /// The sending-half of Rust's synchronous [`sync_channel`] type.
502 /// This half can only be owned by one thread, but it can be cloned
503 /// to send to other threads.
505 /// Messages can be sent through this channel with [`send`] or [`try_send`].
507 /// [`send`] will block if there is no space in the internal buffer.
509 /// [`sync_channel`]: fn.sync_channel.html
510 /// [`send`]: struct.SyncSender.html#method.send
511 /// [`try_send`]: struct.SyncSender.html#method.try_send
516 /// use std::sync::mpsc::sync_channel;
519 /// // Create a sync_channel with buffer size 2
520 /// let (sync_sender, receiver) = sync_channel(2);
521 /// let sync_sender2 = sync_sender.clone();
523 /// // First thread owns sync_sender
524 /// thread::spawn(move || {
525 /// sync_sender.send(1).unwrap();
526 /// sync_sender.send(2).unwrap();
529 /// // Second thread owns sync_sender2
530 /// thread::spawn(move || {
531 /// sync_sender2.send(3).unwrap();
532 /// // thread will now block since the buffer is full
533 /// println!("Thread unblocked!");
538 /// msg = receiver.recv().unwrap();
539 /// println!("message {} received", msg);
541 /// // "Thread unblocked!" will be printed now
543 /// msg = receiver.recv().unwrap();
544 /// println!("message {} received", msg);
546 /// msg = receiver.recv().unwrap();
548 /// println!("message {} received", msg);
550 #[stable(feature = "rust1", since = "1.0.0")]
551 pub struct SyncSender
<T
> {
552 inner
: Arc
<sync
::Packet
<T
>>,
555 #[stable(feature = "rust1", since = "1.0.0")]
556 unsafe impl<T
: Send
> Send
for SyncSender
<T
> {}
558 #[stable(feature = "rust1", since = "1.0.0")]
559 impl<T
> !Sync
for SyncSender
<T
> {}
561 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
562 /// function on **channel**s.
564 /// A **send** operation can only fail if the receiving end of a channel is
565 /// disconnected, implying that the data could never be received. The error
566 /// contains the data being sent as a payload so it can be recovered.
568 /// [`Sender::send`]: struct.Sender.html#method.send
569 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
570 #[stable(feature = "rust1", since = "1.0.0")]
571 #[derive(PartialEq, Eq, Clone, Copy)]
572 pub struct SendError
<T
>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
574 /// An error returned from the [`recv`] function on a [`Receiver`].
576 /// The [`recv`] operation can only fail if the sending half of a
577 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
578 /// messages will ever be received.
580 /// [`recv`]: struct.Receiver.html#method.recv
581 /// [`Receiver`]: struct.Receiver.html
582 /// [`channel`]: fn.channel.html
583 /// [`sync_channel`]: fn.sync_channel.html
584 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
585 #[stable(feature = "rust1", since = "1.0.0")]
586 pub struct RecvError
;
588 /// This enumeration is the list of the possible reasons that [`try_recv`] could
589 /// not return data when called. This can occur with both a [`channel`] and
590 /// a [`sync_channel`].
592 /// [`try_recv`]: struct.Receiver.html#method.try_recv
593 /// [`channel`]: fn.channel.html
594 /// [`sync_channel`]: fn.sync_channel.html
595 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
596 #[stable(feature = "rust1", since = "1.0.0")]
597 pub enum TryRecvError
{
598 /// This **channel** is currently empty, but the **Sender**(s) have not yet
599 /// disconnected, so data may yet become available.
600 #[stable(feature = "rust1", since = "1.0.0")]
603 /// The **channel**'s sending half has become disconnected, and there will
604 /// never be any more data received on it.
605 #[stable(feature = "rust1", since = "1.0.0")]
609 /// This enumeration is the list of possible errors that made [`recv_timeout`]
610 /// unable to return data when called. This can occur with both a [`channel`] and
611 /// a [`sync_channel`].
613 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
614 /// [`channel`]: fn.channel.html
615 /// [`sync_channel`]: fn.sync_channel.html
616 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
617 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
618 pub enum RecvTimeoutError
{
619 /// This **channel** is currently empty, but the **Sender**(s) have not yet
620 /// disconnected, so data may yet become available.
621 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
623 /// The **channel**'s sending half has become disconnected, and there will
624 /// never be any more data received on it.
625 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
629 /// This enumeration is the list of the possible error outcomes for the
630 /// [`try_send`] method.
632 /// [`try_send`]: struct.SyncSender.html#method.try_send
633 #[stable(feature = "rust1", since = "1.0.0")]
634 #[derive(PartialEq, Eq, Clone, Copy)]
635 pub enum TrySendError
<T
> {
636 /// The data could not be sent on the [`sync_channel`] because it would require that
637 /// the callee block to send the data.
639 /// If this is a buffered channel, then the buffer is full at this time. If
640 /// this is not a buffered channel, then there is no [`Receiver`] available to
641 /// acquire the data.
643 /// [`sync_channel`]: fn.sync_channel.html
644 /// [`Receiver`]: struct.Receiver.html
645 #[stable(feature = "rust1", since = "1.0.0")]
646 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
648 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
649 /// sent. The data is returned back to the callee in this case.
651 /// [`sync_channel`]: fn.sync_channel.html
652 #[stable(feature = "rust1", since = "1.0.0")]
653 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
657 Oneshot(Arc
<oneshot
::Packet
<T
>>),
658 Stream(Arc
<stream
::Packet
<T
>>),
659 Shared(Arc
<shared
::Packet
<T
>>),
660 Sync(Arc
<sync
::Packet
<T
>>),
664 trait UnsafeFlavor
<T
> {
665 fn inner_unsafe(&self) -> &UnsafeCell
<Flavor
<T
>>;
666 unsafe fn inner_mut(&self) -> &mut Flavor
<T
> {
667 &mut *self.inner_unsafe().get()
669 unsafe fn inner(&self) -> &Flavor
<T
> {
670 &*self.inner_unsafe().get()
673 impl<T
> UnsafeFlavor
<T
> for Sender
<T
> {
674 fn inner_unsafe(&self) -> &UnsafeCell
<Flavor
<T
>> {
678 impl<T
> UnsafeFlavor
<T
> for Receiver
<T
> {
679 fn inner_unsafe(&self) -> &UnsafeCell
<Flavor
<T
>> {
684 /// Creates a new asynchronous channel, returning the sender/receiver halves.
685 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
686 /// the same order as it was sent, and no [`send`] will block the calling thread
687 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
688 /// block after its buffer limit is reached). [`recv`] will block until a message
691 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
692 /// only one [`Receiver`] is supported.
694 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
695 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If the
696 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
697 /// return a [`RecvError`].
699 /// [`send`]: struct.Sender.html#method.send
700 /// [`recv`]: struct.Receiver.html#method.recv
701 /// [`Sender`]: struct.Sender.html
702 /// [`Receiver`]: struct.Receiver.html
703 /// [`sync_channel`]: fn.sync_channel.html
704 /// [`SendError`]: struct.SendError.html
705 /// [`RecvError`]: struct.RecvError.html
710 /// use std::sync::mpsc::channel;
713 /// let (sender, receiver) = channel();
715 /// // Spawn off an expensive computation
716 /// thread::spawn(move|| {
717 /// # fn expensive_computation() {}
718 /// sender.send(expensive_computation()).unwrap();
721 /// // Do some useful work for awhile
723 /// // Let's see what that answer was
724 /// println!("{:?}", receiver.recv().unwrap());
726 #[stable(feature = "rust1", since = "1.0.0")]
727 pub fn channel
<T
>() -> (Sender
<T
>, Receiver
<T
>) {
728 let a
= Arc
::new(oneshot
::Packet
::new());
729 (Sender
::new(Flavor
::Oneshot(a
.clone())), Receiver
::new(Flavor
::Oneshot(a
)))
732 /// Creates a new synchronous, bounded channel.
733 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
734 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
735 /// [`Receiver`] will block until a message becomes available. `sync_channel`
736 /// differs greatly in the semantics of the sender, however.
738 /// This channel has an internal buffer on which messages will be queued.
739 /// `bound` specifies the buffer size. When the internal buffer becomes full,
740 /// future sends will *block* waiting for the buffer to open up. Note that a
741 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
742 /// where each [`send`] will not return until a [`recv`] is paired with it.
744 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
745 /// times, but only one [`Receiver`] is supported.
747 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
748 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
749 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
750 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
752 /// [`channel`]: fn.channel.html
753 /// [`send`]: struct.SyncSender.html#method.send
754 /// [`recv`]: struct.Receiver.html#method.recv
755 /// [`SyncSender`]: struct.SyncSender.html
756 /// [`Receiver`]: struct.Receiver.html
757 /// [`SendError`]: struct.SendError.html
758 /// [`RecvError`]: struct.RecvError.html
763 /// use std::sync::mpsc::sync_channel;
766 /// let (sender, receiver) = sync_channel(1);
768 /// // this returns immediately
769 /// sender.send(1).unwrap();
771 /// thread::spawn(move|| {
772 /// // this will block until the previous message has been received
773 /// sender.send(2).unwrap();
776 /// assert_eq!(receiver.recv().unwrap(), 1);
777 /// assert_eq!(receiver.recv().unwrap(), 2);
779 #[stable(feature = "rust1", since = "1.0.0")]
780 pub fn sync_channel
<T
>(bound
: usize) -> (SyncSender
<T
>, Receiver
<T
>) {
781 let a
= Arc
::new(sync
::Packet
::new(bound
));
782 (SyncSender
::new(a
.clone()), Receiver
::new(Flavor
::Sync(a
)))
785 ////////////////////////////////////////////////////////////////////////////////
787 ////////////////////////////////////////////////////////////////////////////////
790 fn new(inner
: Flavor
<T
>) -> Sender
<T
> {
792 inner
: UnsafeCell
::new(inner
),
796 /// Attempts to send a value on this channel, returning it back if it could
799 /// A successful send occurs when it is determined that the other end of
800 /// the channel has not hung up already. An unsuccessful send would be one
801 /// where the corresponding receiver has already been deallocated. Note
802 /// that a return value of [`Err`] means that the data will never be
803 /// received, but a return value of [`Ok`] does *not* mean that the data
804 /// will be received. It is possible for the corresponding receiver to
805 /// hang up immediately after this function returns [`Ok`].
807 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
808 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
810 /// This method will never block the current thread.
815 /// use std::sync::mpsc::channel;
817 /// let (tx, rx) = channel();
819 /// // This send is always successful
820 /// tx.send(1).unwrap();
822 /// // This send will fail because the receiver is gone
824 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
826 #[stable(feature = "rust1", since = "1.0.0")]
827 pub fn send(&self, t
: T
) -> Result
<(), SendError
<T
>> {
828 let (new_inner
, ret
) = match *unsafe { self.inner() }
{
829 Flavor
::Oneshot(ref p
) => {
831 return p
.send(t
).map_err(SendError
);
833 let a
= Arc
::new(stream
::Packet
::new());
834 let rx
= Receiver
::new(Flavor
::Stream(a
.clone()));
835 match p
.upgrade(rx
) {
836 oneshot
::UpSuccess
=> {
840 oneshot
::UpDisconnected
=> (a
, Err(t
)),
841 oneshot
::UpWoke(token
) => {
842 // This send cannot panic because the thread is
843 // asleep (we're looking at it), so the receiver
845 a
.send(t
).ok().unwrap();
852 Flavor
::Stream(ref p
) => return p
.send(t
).map_err(SendError
),
853 Flavor
::Shared(ref p
) => return p
.send(t
).map_err(SendError
),
854 Flavor
::Sync(..) => unreachable
!(),
858 let tmp
= Sender
::new(Flavor
::Stream(new_inner
));
859 mem
::swap(self.inner_mut(), tmp
.inner_mut());
861 ret
.map_err(SendError
)
865 #[stable(feature = "rust1", since = "1.0.0")]
866 impl<T
> Clone
for Sender
<T
> {
867 fn clone(&self) -> Sender
<T
> {
868 let packet
= match *unsafe { self.inner() }
{
869 Flavor
::Oneshot(ref p
) => {
870 let a
= Arc
::new(shared
::Packet
::new());
872 let guard
= a
.postinit_lock();
873 let rx
= Receiver
::new(Flavor
::Shared(a
.clone()));
874 let sleeper
= match p
.upgrade(rx
) {
876 oneshot
::UpDisconnected
=> None
,
877 oneshot
::UpWoke(task
) => Some(task
),
879 a
.inherit_blocker(sleeper
, guard
);
883 Flavor
::Stream(ref p
) => {
884 let a
= Arc
::new(shared
::Packet
::new());
886 let guard
= a
.postinit_lock();
887 let rx
= Receiver
::new(Flavor
::Shared(a
.clone()));
888 let sleeper
= match p
.upgrade(rx
) {
890 stream
::UpDisconnected
=> None
,
891 stream
::UpWoke(task
) => Some(task
),
893 a
.inherit_blocker(sleeper
, guard
);
897 Flavor
::Shared(ref p
) => {
899 return Sender
::new(Flavor
::Shared(p
.clone()));
901 Flavor
::Sync(..) => unreachable
!(),
905 let tmp
= Sender
::new(Flavor
::Shared(packet
.clone()));
906 mem
::swap(self.inner_mut(), tmp
.inner_mut());
908 Sender
::new(Flavor
::Shared(packet
))
912 #[stable(feature = "rust1", since = "1.0.0")]
913 impl<T
> Drop
for Sender
<T
> {
915 match *unsafe { self.inner() }
{
916 Flavor
::Oneshot(ref p
) => p
.drop_chan(),
917 Flavor
::Stream(ref p
) => p
.drop_chan(),
918 Flavor
::Shared(ref p
) => p
.drop_chan(),
919 Flavor
::Sync(..) => unreachable
!(),
924 #[stable(feature = "mpsc_debug", since = "1.8.0")]
925 impl<T
> fmt
::Debug
for Sender
<T
> {
926 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
927 write
!(f
, "Sender {{ .. }}")
931 ////////////////////////////////////////////////////////////////////////////////
933 ////////////////////////////////////////////////////////////////////////////////
935 impl<T
> SyncSender
<T
> {
936 fn new(inner
: Arc
<sync
::Packet
<T
>>) -> SyncSender
<T
> {
937 SyncSender { inner: inner }
940 /// Sends a value on this synchronous channel.
942 /// This function will *block* until space in the internal buffer becomes
943 /// available or a receiver is available to hand off the message to.
945 /// Note that a successful send does *not* guarantee that the receiver will
946 /// ever see the data if there is a buffer on this channel. Items may be
947 /// enqueued in the internal buffer for the receiver to receive at a later
948 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
949 /// channel and it guarantees that the receiver has indeed received
950 /// the data if this function returns success.
952 /// This function will never panic, but it may return [`Err`] if the
953 /// [`Receiver`] has disconnected and is no longer able to receive
956 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
957 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
962 /// use std::sync::mpsc::sync_channel;
965 /// // Create a rendezvous sync_channel with buffer size 0
966 /// let (sync_sender, receiver) = sync_channel(0);
968 /// thread::spawn(move || {
969 /// println!("sending message...");
970 /// sync_sender.send(1).unwrap();
971 /// // Thread is now blocked until the message is received
973 /// println!("...message received!");
976 /// let msg = receiver.recv().unwrap();
977 /// assert_eq!(1, msg);
979 #[stable(feature = "rust1", since = "1.0.0")]
980 pub fn send(&self, t
: T
) -> Result
<(), SendError
<T
>> {
981 self.inner
.send(t
).map_err(SendError
)
984 /// Attempts to send a value on this channel without blocking.
986 /// This method differs from [`send`] by returning immediately if the
987 /// channel's buffer is full or no receiver is waiting to acquire some
988 /// data. Compared with [`send`], this function has two failure cases
989 /// instead of one (one for disconnection, one for a full buffer).
991 /// See [`send`] for notes about guarantees of whether the
992 /// receiver has received the data or not if this function is successful.
994 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
999 /// use std::sync::mpsc::sync_channel;
1000 /// use std::thread;
1002 /// // Create a sync_channel with buffer size 1
1003 /// let (sync_sender, receiver) = sync_channel(1);
1004 /// let sync_sender2 = sync_sender.clone();
1006 /// // First thread owns sync_sender
1007 /// thread::spawn(move || {
1008 /// sync_sender.send(1).unwrap();
1009 /// sync_sender.send(2).unwrap();
1010 /// // Thread blocked
1013 /// // Second thread owns sync_sender2
1014 /// thread::spawn(move || {
1015 /// // This will return an error and send
1016 /// // no message if the buffer is full
1017 /// sync_sender2.try_send(3).is_err();
1021 /// msg = receiver.recv().unwrap();
1022 /// println!("message {} received", msg);
1024 /// msg = receiver.recv().unwrap();
1025 /// println!("message {} received", msg);
1027 /// // Third message may have never been sent
1028 /// match receiver.try_recv() {
1029 /// Ok(msg) => println!("message {} received", msg),
1030 /// Err(_) => println!("the third message was never sent"),
1033 #[stable(feature = "rust1", since = "1.0.0")]
1034 pub fn try_send(&self, t
: T
) -> Result
<(), TrySendError
<T
>> {
1035 self.inner
.try_send(t
)
1039 #[stable(feature = "rust1", since = "1.0.0")]
1040 impl<T
> Clone
for SyncSender
<T
> {
1041 fn clone(&self) -> SyncSender
<T
> {
1042 self.inner
.clone_chan();
1043 SyncSender
::new(self.inner
.clone())
1047 #[stable(feature = "rust1", since = "1.0.0")]
1048 impl<T
> Drop
for SyncSender
<T
> {
1049 fn drop(&mut self) {
1050 self.inner
.drop_chan();
1054 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1055 impl<T
> fmt
::Debug
for SyncSender
<T
> {
1056 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1057 write
!(f
, "SyncSender {{ .. }}")
1061 ////////////////////////////////////////////////////////////////////////////////
1063 ////////////////////////////////////////////////////////////////////////////////
1065 impl<T
> Receiver
<T
> {
1066 fn new(inner
: Flavor
<T
>) -> Receiver
<T
> {
1067 Receiver { inner: UnsafeCell::new(inner) }
1070 /// Attempts to return a pending value on this receiver without blocking.
1072 /// This method will never block the caller in order to wait for data to
1073 /// become available. Instead, this will always return immediately with a
1074 /// possible option of pending data on the channel.
1076 /// This is useful for a flavor of "optimistic check" before deciding to
1077 /// block on a receiver.
1079 /// Compared with [`recv`], this function has two failure cases instead of one
1080 /// (one for disconnection, one for an empty buffer).
1082 /// [`recv`]: struct.Receiver.html#method.recv
1087 /// use std::sync::mpsc::{Receiver, channel};
1089 /// let (_, receiver): (_, Receiver<i32>) = channel();
1091 /// assert!(receiver.try_recv().is_err());
1093 #[stable(feature = "rust1", since = "1.0.0")]
1094 pub fn try_recv(&self) -> Result
<T
, TryRecvError
> {
1096 let new_port
= match *unsafe { self.inner() }
{
1097 Flavor
::Oneshot(ref p
) => {
1098 match p
.try_recv() {
1099 Ok(t
) => return Ok(t
),
1100 Err(oneshot
::Empty
) => return Err(TryRecvError
::Empty
),
1101 Err(oneshot
::Disconnected
) => {
1102 return Err(TryRecvError
::Disconnected
)
1104 Err(oneshot
::Upgraded(rx
)) => rx
,
1107 Flavor
::Stream(ref p
) => {
1108 match p
.try_recv() {
1109 Ok(t
) => return Ok(t
),
1110 Err(stream
::Empty
) => return Err(TryRecvError
::Empty
),
1111 Err(stream
::Disconnected
) => {
1112 return Err(TryRecvError
::Disconnected
)
1114 Err(stream
::Upgraded(rx
)) => rx
,
1117 Flavor
::Shared(ref p
) => {
1118 match p
.try_recv() {
1119 Ok(t
) => return Ok(t
),
1120 Err(shared
::Empty
) => return Err(TryRecvError
::Empty
),
1121 Err(shared
::Disconnected
) => {
1122 return Err(TryRecvError
::Disconnected
)
1126 Flavor
::Sync(ref p
) => {
1127 match p
.try_recv() {
1128 Ok(t
) => return Ok(t
),
1129 Err(sync
::Empty
) => return Err(TryRecvError
::Empty
),
1130 Err(sync
::Disconnected
) => {
1131 return Err(TryRecvError
::Disconnected
)
1137 mem
::swap(self.inner_mut(),
1138 new_port
.inner_mut());
1143 /// Attempts to wait for a value on this receiver, returning an error if the
1144 /// corresponding channel has hung up.
1146 /// This function will always block the current thread if there is no data
1147 /// available and it's possible for more data to be sent. Once a message is
1148 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1149 /// receiver will wake up and return that message.
1151 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1152 /// this call is blocking, this call will wake up and return [`Err`] to
1153 /// indicate that no more messages can ever be received on this channel.
1154 /// However, since channels are buffered, messages sent before the disconnect
1155 /// will still be properly received.
1157 /// [`Sender`]: struct.Sender.html
1158 /// [`SyncSender`]: struct.SyncSender.html
1159 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1164 /// use std::sync::mpsc;
1165 /// use std::thread;
1167 /// let (send, recv) = mpsc::channel();
1168 /// let handle = thread::spawn(move || {
1169 /// send.send(1u8).unwrap();
1172 /// handle.join().unwrap();
1174 /// assert_eq!(Ok(1), recv.recv());
1177 /// Buffering behavior:
1180 /// use std::sync::mpsc;
1181 /// use std::thread;
1182 /// use std::sync::mpsc::RecvError;
1184 /// let (send, recv) = mpsc::channel();
1185 /// let handle = thread::spawn(move || {
1186 /// send.send(1u8).unwrap();
1187 /// send.send(2).unwrap();
1188 /// send.send(3).unwrap();
1192 /// // wait for the thread to join so we ensure the sender is dropped
1193 /// handle.join().unwrap();
1195 /// assert_eq!(Ok(1), recv.recv());
1196 /// assert_eq!(Ok(2), recv.recv());
1197 /// assert_eq!(Ok(3), recv.recv());
1198 /// assert_eq!(Err(RecvError), recv.recv());
1200 #[stable(feature = "rust1", since = "1.0.0")]
1201 pub fn recv(&self) -> Result
<T
, RecvError
> {
1203 let new_port
= match *unsafe { self.inner() }
{
1204 Flavor
::Oneshot(ref p
) => {
1205 match p
.recv(None
) {
1206 Ok(t
) => return Ok(t
),
1207 Err(oneshot
::Disconnected
) => return Err(RecvError
),
1208 Err(oneshot
::Upgraded(rx
)) => rx
,
1209 Err(oneshot
::Empty
) => unreachable
!(),
1212 Flavor
::Stream(ref p
) => {
1213 match p
.recv(None
) {
1214 Ok(t
) => return Ok(t
),
1215 Err(stream
::Disconnected
) => return Err(RecvError
),
1216 Err(stream
::Upgraded(rx
)) => rx
,
1217 Err(stream
::Empty
) => unreachable
!(),
1220 Flavor
::Shared(ref p
) => {
1221 match p
.recv(None
) {
1222 Ok(t
) => return Ok(t
),
1223 Err(shared
::Disconnected
) => return Err(RecvError
),
1224 Err(shared
::Empty
) => unreachable
!(),
1227 Flavor
::Sync(ref p
) => return p
.recv(None
).map_err(|_
| RecvError
),
1230 mem
::swap(self.inner_mut(), new_port
.inner_mut());
1235 /// Attempts to wait for a value on this receiver, returning an error if the
1236 /// corresponding channel has hung up, or if it waits more than `timeout`.
1238 /// This function will always block the current thread if there is no data
1239 /// available and it's possible for more data to be sent. Once a message is
1240 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1241 /// receiver will wake up and return that message.
1243 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1244 /// this call is blocking, this call will wake up and return [`Err`] to
1245 /// indicate that no more messages can ever be received on this channel.
1246 /// However, since channels are buffered, messages sent before the disconnect
1247 /// will still be properly received.
1249 /// [`Sender`]: struct.Sender.html
1250 /// [`SyncSender`]: struct.SyncSender.html
1251 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1255 /// Successfully receiving value before encountering timeout:
1258 /// use std::thread;
1259 /// use std::time::Duration;
1260 /// use std::sync::mpsc;
1262 /// let (send, recv) = mpsc::channel();
1264 /// thread::spawn(move || {
1265 /// send.send('a').unwrap();
1269 /// recv.recv_timeout(Duration::from_millis(400)),
1274 /// Receiving an error upon reaching timeout:
1277 /// use std::thread;
1278 /// use std::time::Duration;
1279 /// use std::sync::mpsc;
1281 /// let (send, recv) = mpsc::channel();
1283 /// thread::spawn(move || {
1284 /// thread::sleep(Duration::from_millis(800));
1285 /// send.send('a').unwrap();
1289 /// recv.recv_timeout(Duration::from_millis(400)),
1290 /// Err(mpsc::RecvTimeoutError::Timeout)
1293 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1294 pub fn recv_timeout(&self, timeout
: Duration
) -> Result
<T
, RecvTimeoutError
> {
1295 // Do an optimistic try_recv to avoid the performance impact of
1296 // Instant::now() in the full-channel case.
1297 match self.try_recv() {
1300 Err(TryRecvError
::Disconnected
)
1301 => Err(RecvTimeoutError
::Disconnected
),
1302 Err(TryRecvError
::Empty
)
1303 => self.recv_max_until(Instant
::now() + timeout
)
1307 fn recv_max_until(&self, deadline
: Instant
) -> Result
<T
, RecvTimeoutError
> {
1308 use self::RecvTimeoutError
::*;
1311 let port_or_empty
= match *unsafe { self.inner() }
{
1312 Flavor
::Oneshot(ref p
) => {
1313 match p
.recv(Some(deadline
)) {
1314 Ok(t
) => return Ok(t
),
1315 Err(oneshot
::Disconnected
) => return Err(Disconnected
),
1316 Err(oneshot
::Upgraded(rx
)) => Some(rx
),
1317 Err(oneshot
::Empty
) => None
,
1320 Flavor
::Stream(ref p
) => {
1321 match p
.recv(Some(deadline
)) {
1322 Ok(t
) => return Ok(t
),
1323 Err(stream
::Disconnected
) => return Err(Disconnected
),
1324 Err(stream
::Upgraded(rx
)) => Some(rx
),
1325 Err(stream
::Empty
) => None
,
1328 Flavor
::Shared(ref p
) => {
1329 match p
.recv(Some(deadline
)) {
1330 Ok(t
) => return Ok(t
),
1331 Err(shared
::Disconnected
) => return Err(Disconnected
),
1332 Err(shared
::Empty
) => None
,
1335 Flavor
::Sync(ref p
) => {
1336 match p
.recv(Some(deadline
)) {
1337 Ok(t
) => return Ok(t
),
1338 Err(sync
::Disconnected
) => return Err(Disconnected
),
1339 Err(sync
::Empty
) => None
,
1344 if let Some(new_port
) = port_or_empty
{
1346 mem
::swap(self.inner_mut(), new_port
.inner_mut());
1350 // If we're already passed the deadline, and we're here without
1351 // data, return a timeout, else try again.
1352 if Instant
::now() >= deadline
{
1353 return Err(Timeout
);
1358 /// Returns an iterator that will block waiting for messages, but never
1359 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1361 /// [`panic!`]: ../../../std/macro.panic.html
1362 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1367 /// use std::sync::mpsc::channel;
1368 /// use std::thread;
1370 /// let (send, recv) = channel();
1372 /// thread::spawn(move || {
1373 /// send.send(1).unwrap();
1374 /// send.send(2).unwrap();
1375 /// send.send(3).unwrap();
1378 /// let mut iter = recv.iter();
1379 /// assert_eq!(iter.next(), Some(1));
1380 /// assert_eq!(iter.next(), Some(2));
1381 /// assert_eq!(iter.next(), Some(3));
1382 /// assert_eq!(iter.next(), None);
1384 #[stable(feature = "rust1", since = "1.0.0")]
1385 pub fn iter(&self) -> Iter
<T
> {
1389 /// Returns an iterator that will attempt to yield all pending values.
1390 /// It will return `None` if there are no more pending values or if the
1391 /// channel has hung up. The iterator will never [`panic!`] or block the
1392 /// user by waiting for values.
1394 /// [`panic!`]: ../../../std/macro.panic.html
1399 /// use std::sync::mpsc::channel;
1400 /// use std::thread;
1401 /// use std::time::Duration;
1403 /// let (sender, receiver) = channel();
1405 /// // nothing is in the buffer yet
1406 /// assert!(receiver.try_iter().next().is_none());
1408 /// thread::spawn(move || {
1409 /// thread::sleep(Duration::from_secs(1));
1410 /// sender.send(1).unwrap();
1411 /// sender.send(2).unwrap();
1412 /// sender.send(3).unwrap();
1415 /// // nothing is in the buffer yet
1416 /// assert!(receiver.try_iter().next().is_none());
1418 /// // block for two seconds
1419 /// thread::sleep(Duration::from_secs(2));
1421 /// let mut iter = receiver.try_iter();
1422 /// assert_eq!(iter.next(), Some(1));
1423 /// assert_eq!(iter.next(), Some(2));
1424 /// assert_eq!(iter.next(), Some(3));
1425 /// assert_eq!(iter.next(), None);
1427 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1428 pub fn try_iter(&self) -> TryIter
<T
> {
1429 TryIter { rx: self }
1434 impl<T
> select
::Packet
for Receiver
<T
> {
1435 fn can_recv(&self) -> bool
{
1437 let new_port
= match *unsafe { self.inner() }
{
1438 Flavor
::Oneshot(ref p
) => {
1439 match p
.can_recv() {
1440 Ok(ret
) => return ret
,
1441 Err(upgrade
) => upgrade
,
1444 Flavor
::Stream(ref p
) => {
1445 match p
.can_recv() {
1446 Ok(ret
) => return ret
,
1447 Err(upgrade
) => upgrade
,
1450 Flavor
::Shared(ref p
) => return p
.can_recv(),
1451 Flavor
::Sync(ref p
) => return p
.can_recv(),
1454 mem
::swap(self.inner_mut(),
1455 new_port
.inner_mut());
1460 fn start_selection(&self, mut token
: SignalToken
) -> StartResult
{
1462 let (t
, new_port
) = match *unsafe { self.inner() }
{
1463 Flavor
::Oneshot(ref p
) => {
1464 match p
.start_selection(token
) {
1465 oneshot
::SelSuccess
=> return Installed
,
1466 oneshot
::SelCanceled
=> return Abort
,
1467 oneshot
::SelUpgraded(t
, rx
) => (t
, rx
),
1470 Flavor
::Stream(ref p
) => {
1471 match p
.start_selection(token
) {
1472 stream
::SelSuccess
=> return Installed
,
1473 stream
::SelCanceled
=> return Abort
,
1474 stream
::SelUpgraded(t
, rx
) => (t
, rx
),
1477 Flavor
::Shared(ref p
) => return p
.start_selection(token
),
1478 Flavor
::Sync(ref p
) => return p
.start_selection(token
),
1482 mem
::swap(self.inner_mut(), new_port
.inner_mut());
1487 fn abort_selection(&self) -> bool
{
1488 let mut was_upgrade
= false;
1490 let result
= match *unsafe { self.inner() }
{
1491 Flavor
::Oneshot(ref p
) => p
.abort_selection(),
1492 Flavor
::Stream(ref p
) => p
.abort_selection(was_upgrade
),
1493 Flavor
::Shared(ref p
) => return p
.abort_selection(was_upgrade
),
1494 Flavor
::Sync(ref p
) => return p
.abort_selection(),
1496 let new_port
= match result { Ok(b) => return b, Err(p) => p }
;
1499 mem
::swap(self.inner_mut(),
1500 new_port
.inner_mut());
1506 #[stable(feature = "rust1", since = "1.0.0")]
1507 impl<'a
, T
> Iterator
for Iter
<'a
, T
> {
1510 fn next(&mut self) -> Option
<T
> { self.rx.recv().ok() }
1513 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1514 impl<'a
, T
> Iterator
for TryIter
<'a
, T
> {
1517 fn next(&mut self) -> Option
<T
> { self.rx.try_recv().ok() }
1520 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1521 impl<'a
, T
> IntoIterator
for &'a Receiver
<T
> {
1523 type IntoIter
= Iter
<'a
, T
>;
1525 fn into_iter(self) -> Iter
<'a
, T
> { self.iter() }
1528 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1529 impl<T
> Iterator
for IntoIter
<T
> {
1531 fn next(&mut self) -> Option
<T
> { self.rx.recv().ok() }
1534 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1535 impl <T
> IntoIterator
for Receiver
<T
> {
1537 type IntoIter
= IntoIter
<T
>;
1539 fn into_iter(self) -> IntoIter
<T
> {
1540 IntoIter { rx: self }
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl<T
> Drop
for Receiver
<T
> {
1546 fn drop(&mut self) {
1547 match *unsafe { self.inner() }
{
1548 Flavor
::Oneshot(ref p
) => p
.drop_port(),
1549 Flavor
::Stream(ref p
) => p
.drop_port(),
1550 Flavor
::Shared(ref p
) => p
.drop_port(),
1551 Flavor
::Sync(ref p
) => p
.drop_port(),
1556 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1557 impl<T
> fmt
::Debug
for Receiver
<T
> {
1558 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1559 write
!(f
, "Receiver {{ .. }}")
1563 #[stable(feature = "rust1", since = "1.0.0")]
1564 impl<T
> fmt
::Debug
for SendError
<T
> {
1565 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1566 "SendError(..)".fmt(f
)
1570 #[stable(feature = "rust1", since = "1.0.0")]
1571 impl<T
> fmt
::Display
for SendError
<T
> {
1572 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1573 "sending on a closed channel".fmt(f
)
1577 #[stable(feature = "rust1", since = "1.0.0")]
1578 impl<T
: Send
> error
::Error
for SendError
<T
> {
1579 fn description(&self) -> &str {
1580 "sending on a closed channel"
1583 fn cause(&self) -> Option
<&error
::Error
> {
1588 #[stable(feature = "rust1", since = "1.0.0")]
1589 impl<T
> fmt
::Debug
for TrySendError
<T
> {
1590 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1592 TrySendError
::Full(..) => "Full(..)".fmt(f
),
1593 TrySendError
::Disconnected(..) => "Disconnected(..)".fmt(f
),
1598 #[stable(feature = "rust1", since = "1.0.0")]
1599 impl<T
> fmt
::Display
for TrySendError
<T
> {
1600 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1602 TrySendError
::Full(..) => {
1603 "sending on a full channel".fmt(f
)
1605 TrySendError
::Disconnected(..) => {
1606 "sending on a closed channel".fmt(f
)
1612 #[stable(feature = "rust1", since = "1.0.0")]
1613 impl<T
: Send
> error
::Error
for TrySendError
<T
> {
1615 fn description(&self) -> &str {
1617 TrySendError
::Full(..) => {
1618 "sending on a full channel"
1620 TrySendError
::Disconnected(..) => {
1621 "sending on a closed channel"
1626 fn cause(&self) -> Option
<&error
::Error
> {
1631 #[stable(feature = "rust1", since = "1.0.0")]
1632 impl fmt
::Display
for RecvError
{
1633 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1634 "receiving on a closed channel".fmt(f
)
1638 #[stable(feature = "rust1", since = "1.0.0")]
1639 impl error
::Error
for RecvError
{
1641 fn description(&self) -> &str {
1642 "receiving on a closed channel"
1645 fn cause(&self) -> Option
<&error
::Error
> {
1650 #[stable(feature = "rust1", since = "1.0.0")]
1651 impl fmt
::Display
for TryRecvError
{
1652 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1654 TryRecvError
::Empty
=> {
1655 "receiving on an empty channel".fmt(f
)
1657 TryRecvError
::Disconnected
=> {
1658 "receiving on a closed channel".fmt(f
)
1664 #[stable(feature = "rust1", since = "1.0.0")]
1665 impl error
::Error
for TryRecvError
{
1667 fn description(&self) -> &str {
1669 TryRecvError
::Empty
=> {
1670 "receiving on an empty channel"
1672 TryRecvError
::Disconnected
=> {
1673 "receiving on a closed channel"
1678 fn cause(&self) -> Option
<&error
::Error
> {
1683 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1684 impl fmt
::Display
for RecvTimeoutError
{
1685 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1687 RecvTimeoutError
::Timeout
=> {
1688 "timed out waiting on channel".fmt(f
)
1690 RecvTimeoutError
::Disconnected
=> {
1691 "channel is empty and sending half is closed".fmt(f
)
1697 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1698 impl error
::Error
for RecvTimeoutError
{
1699 fn description(&self) -> &str {
1701 RecvTimeoutError
::Timeout
=> {
1702 "timed out waiting on channel"
1704 RecvTimeoutError
::Disconnected
=> {
1705 "channel is empty and sending half is closed"
1710 fn cause(&self) -> Option
<&error
::Error
> {
1715 #[cfg(all(test, not(target_os = "emscripten")))]
1720 use time
::{Duration, Instant}
;
1722 pub fn stress_factor() -> usize {
1723 match env
::var("RUST_TEST_STRESS") {
1724 Ok(val
) => val
.parse().unwrap(),
1731 let (tx
, rx
) = channel
::<i32>();
1732 tx
.send(1).unwrap();
1733 assert_eq
!(rx
.recv().unwrap(), 1);
1738 let (tx
, _rx
) = channel
::<Box
<isize>>();
1739 tx
.send(box 1).unwrap();
1743 fn drop_full_shared() {
1744 let (tx
, _rx
) = channel
::<Box
<isize>>();
1747 tx
.send(box 1).unwrap();
1752 let (tx
, rx
) = channel
::<i32>();
1753 tx
.send(1).unwrap();
1754 assert_eq
!(rx
.recv().unwrap(), 1);
1755 let tx
= tx
.clone();
1756 tx
.send(1).unwrap();
1757 assert_eq
!(rx
.recv().unwrap(), 1);
1761 fn smoke_threads() {
1762 let (tx
, rx
) = channel
::<i32>();
1763 let _t
= thread
::spawn(move|| {
1764 tx
.send(1).unwrap();
1766 assert_eq
!(rx
.recv().unwrap(), 1);
1770 fn smoke_port_gone() {
1771 let (tx
, rx
) = channel
::<i32>();
1773 assert
!(tx
.send(1).is_err());
1777 fn smoke_shared_port_gone() {
1778 let (tx
, rx
) = channel
::<i32>();
1780 assert
!(tx
.send(1).is_err())
1784 fn smoke_shared_port_gone2() {
1785 let (tx
, rx
) = channel
::<i32>();
1787 let tx2
= tx
.clone();
1789 assert
!(tx2
.send(1).is_err());
1793 fn port_gone_concurrent() {
1794 let (tx
, rx
) = channel
::<i32>();
1795 let _t
= thread
::spawn(move|| {
1798 while tx
.send(1).is_ok() {}
1802 fn port_gone_concurrent_shared() {
1803 let (tx
, rx
) = channel
::<i32>();
1804 let tx2
= tx
.clone();
1805 let _t
= thread
::spawn(move|| {
1808 while tx
.send(1).is_ok() && tx2
.send(1).is_ok() {}
1812 fn smoke_chan_gone() {
1813 let (tx
, rx
) = channel
::<i32>();
1815 assert
!(rx
.recv().is_err());
1819 fn smoke_chan_gone_shared() {
1820 let (tx
, rx
) = channel
::<()>();
1821 let tx2
= tx
.clone();
1824 assert
!(rx
.recv().is_err());
1828 fn chan_gone_concurrent() {
1829 let (tx
, rx
) = channel
::<i32>();
1830 let _t
= thread
::spawn(move|| {
1831 tx
.send(1).unwrap();
1832 tx
.send(1).unwrap();
1834 while rx
.recv().is_ok() {}
1839 let (tx
, rx
) = channel
::<i32>();
1840 let t
= thread
::spawn(move|| {
1841 for _
in 0..10000 { tx.send(1).unwrap(); }
1844 assert_eq
!(rx
.recv().unwrap(), 1);
1846 t
.join().ok().unwrap();
1850 fn stress_shared() {
1851 const AMT
: u32 = 10000;
1852 const NTHREADS
: u32 = 8;
1853 let (tx
, rx
) = channel
::<i32>();
1855 let t
= thread
::spawn(move|| {
1856 for _
in 0..AMT
* NTHREADS
{
1857 assert_eq
!(rx
.recv().unwrap(), 1);
1859 match rx
.try_recv() {
1865 for _
in 0..NTHREADS
{
1866 let tx
= tx
.clone();
1867 thread
::spawn(move|| {
1868 for _
in 0..AMT { tx.send(1).unwrap(); }
1872 t
.join().ok().unwrap();
1876 fn send_from_outside_runtime() {
1877 let (tx1
, rx1
) = channel
::<()>();
1878 let (tx2
, rx2
) = channel
::<i32>();
1879 let t1
= thread
::spawn(move|| {
1880 tx1
.send(()).unwrap();
1882 assert_eq
!(rx2
.recv().unwrap(), 1);
1885 rx1
.recv().unwrap();
1886 let t2
= thread
::spawn(move|| {
1888 tx2
.send(1).unwrap();
1891 t1
.join().ok().unwrap();
1892 t2
.join().ok().unwrap();
1896 fn recv_from_outside_runtime() {
1897 let (tx
, rx
) = channel
::<i32>();
1898 let t
= thread
::spawn(move|| {
1900 assert_eq
!(rx
.recv().unwrap(), 1);
1904 tx
.send(1).unwrap();
1906 t
.join().ok().unwrap();
1911 let (tx1
, rx1
) = channel
::<i32>();
1912 let (tx2
, rx2
) = channel
::<i32>();
1913 let t1
= thread
::spawn(move|| {
1914 assert_eq
!(rx1
.recv().unwrap(), 1);
1915 tx2
.send(2).unwrap();
1917 let t2
= thread
::spawn(move|| {
1918 tx1
.send(1).unwrap();
1919 assert_eq
!(rx2
.recv().unwrap(), 2);
1921 t1
.join().ok().unwrap();
1922 t2
.join().ok().unwrap();
1926 fn oneshot_single_thread_close_port_first() {
1927 // Simple test of closing without sending
1928 let (_tx
, rx
) = channel
::<i32>();
1933 fn oneshot_single_thread_close_chan_first() {
1934 // Simple test of closing without sending
1935 let (tx
, _rx
) = channel
::<i32>();
1940 fn oneshot_single_thread_send_port_close() {
1941 // Testing that the sender cleans up the payload if receiver is closed
1942 let (tx
, rx
) = channel
::<Box
<i32>>();
1944 assert
!(tx
.send(box 0).is_err());
1948 fn oneshot_single_thread_recv_chan_close() {
1949 // Receiving on a closed chan will panic
1950 let res
= thread
::spawn(move|| {
1951 let (tx
, rx
) = channel
::<i32>();
1956 assert
!(res
.is_err());
1960 fn oneshot_single_thread_send_then_recv() {
1961 let (tx
, rx
) = channel
::<Box
<i32>>();
1962 tx
.send(box 10).unwrap();
1963 assert
!(*rx
.recv().unwrap() == 10);
1967 fn oneshot_single_thread_try_send_open() {
1968 let (tx
, rx
) = channel
::<i32>();
1969 assert
!(tx
.send(10).is_ok());
1970 assert
!(rx
.recv().unwrap() == 10);
1974 fn oneshot_single_thread_try_send_closed() {
1975 let (tx
, rx
) = channel
::<i32>();
1977 assert
!(tx
.send(10).is_err());
1981 fn oneshot_single_thread_try_recv_open() {
1982 let (tx
, rx
) = channel
::<i32>();
1983 tx
.send(10).unwrap();
1984 assert
!(rx
.recv() == Ok(10));
1988 fn oneshot_single_thread_try_recv_closed() {
1989 let (tx
, rx
) = channel
::<i32>();
1991 assert
!(rx
.recv().is_err());
1995 fn oneshot_single_thread_peek_data() {
1996 let (tx
, rx
) = channel
::<i32>();
1997 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
1998 tx
.send(10).unwrap();
1999 assert_eq
!(rx
.try_recv(), Ok(10));
2003 fn oneshot_single_thread_peek_close() {
2004 let (tx
, rx
) = channel
::<i32>();
2006 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
2007 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
2011 fn oneshot_single_thread_peek_open() {
2012 let (_tx
, rx
) = channel
::<i32>();
2013 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
2017 fn oneshot_multi_task_recv_then_send() {
2018 let (tx
, rx
) = channel
::<Box
<i32>>();
2019 let _t
= thread
::spawn(move|| {
2020 assert
!(*rx
.recv().unwrap() == 10);
2023 tx
.send(box 10).unwrap();
2027 fn oneshot_multi_task_recv_then_close() {
2028 let (tx
, rx
) = channel
::<Box
<i32>>();
2029 let _t
= thread
::spawn(move|| {
2032 let res
= thread
::spawn(move|| {
2033 assert
!(*rx
.recv().unwrap() == 10);
2035 assert
!(res
.is_err());
2039 fn oneshot_multi_thread_close_stress() {
2040 for _
in 0..stress_factor() {
2041 let (tx
, rx
) = channel
::<i32>();
2042 let _t
= thread
::spawn(move|| {
2050 fn oneshot_multi_thread_send_close_stress() {
2051 for _
in 0..stress_factor() {
2052 let (tx
, rx
) = channel
::<i32>();
2053 let _t
= thread
::spawn(move|| {
2056 let _
= thread
::spawn(move|| {
2057 tx
.send(1).unwrap();
2063 fn oneshot_multi_thread_recv_close_stress() {
2064 for _
in 0..stress_factor() {
2065 let (tx
, rx
) = channel
::<i32>();
2066 thread
::spawn(move|| {
2067 let res
= thread
::spawn(move|| {
2070 assert
!(res
.is_err());
2072 let _t
= thread
::spawn(move|| {
2073 thread
::spawn(move|| {
2081 fn oneshot_multi_thread_send_recv_stress() {
2082 for _
in 0..stress_factor() {
2083 let (tx
, rx
) = channel
::<Box
<isize>>();
2084 let _t
= thread
::spawn(move|| {
2085 tx
.send(box 10).unwrap();
2087 assert
!(*rx
.recv().unwrap() == 10);
2092 fn stream_send_recv_stress() {
2093 for _
in 0..stress_factor() {
2094 let (tx
, rx
) = channel();
2099 fn send(tx
: Sender
<Box
<i32>>, i
: i32) {
2100 if i
== 10 { return }
2102 thread
::spawn(move|| {
2103 tx
.send(box i
).unwrap();
2108 fn recv(rx
: Receiver
<Box
<i32>>, i
: i32) {
2109 if i
== 10 { return }
2111 thread
::spawn(move|| {
2112 assert
!(*rx
.recv().unwrap() == i
);
2120 fn oneshot_single_thread_recv_timeout() {
2121 let (tx
, rx
) = channel();
2122 tx
.send(()).unwrap();
2123 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Ok(()));
2124 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Err(RecvTimeoutError
::Timeout
));
2125 tx
.send(()).unwrap();
2126 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Ok(()));
2130 fn stress_recv_timeout_two_threads() {
2131 let (tx
, rx
) = channel();
2132 let stress
= stress_factor() + 100;
2133 let timeout
= Duration
::from_millis(100);
2135 thread
::spawn(move || {
2136 for i
in 0..stress
{
2138 thread
::sleep(timeout
* 2);
2140 tx
.send(1usize
).unwrap();
2144 let mut recv_count
= 0;
2146 match rx
.recv_timeout(timeout
) {
2148 assert_eq
!(n
, 1usize
);
2151 Err(RecvTimeoutError
::Timeout
) => continue,
2152 Err(RecvTimeoutError
::Disconnected
) => break,
2156 assert_eq
!(recv_count
, stress
);
2160 fn recv_timeout_upgrade() {
2161 let (tx
, rx
) = channel
::<()>();
2162 let timeout
= Duration
::from_millis(1);
2163 let _tx_clone
= tx
.clone();
2165 let start
= Instant
::now();
2166 assert_eq
!(rx
.recv_timeout(timeout
), Err(RecvTimeoutError
::Timeout
));
2167 assert
!(Instant
::now() >= start
+ timeout
);
2171 fn stress_recv_timeout_shared() {
2172 let (tx
, rx
) = channel();
2173 let stress
= stress_factor() + 100;
2175 for i
in 0..stress
{
2176 let tx
= tx
.clone();
2177 thread
::spawn(move || {
2178 thread
::sleep(Duration
::from_millis(i
as u64 * 10));
2179 tx
.send(1usize
).unwrap();
2185 let mut recv_count
= 0;
2187 match rx
.recv_timeout(Duration
::from_millis(10)) {
2189 assert_eq
!(n
, 1usize
);
2192 Err(RecvTimeoutError
::Timeout
) => continue,
2193 Err(RecvTimeoutError
::Disconnected
) => break,
2197 assert_eq
!(recv_count
, stress
);
2202 // Regression test that we don't run out of stack in scheduler context
2203 let (tx
, rx
) = channel();
2204 for _
in 0..10000 { tx.send(()).unwrap(); }
2205 for _
in 0..10000 { rx.recv().unwrap(); }
2209 fn shared_recv_timeout() {
2210 let (tx
, rx
) = channel();
2213 let tx
= tx
.clone();
2214 thread
::spawn(move|| {
2215 tx
.send(()).unwrap();
2219 for _
in 0..total { rx.recv().unwrap(); }
2221 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Err(RecvTimeoutError
::Timeout
));
2222 tx
.send(()).unwrap();
2223 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Ok(()));
2227 fn shared_chan_stress() {
2228 let (tx
, rx
) = channel();
2229 let total
= stress_factor() + 100;
2231 let tx
= tx
.clone();
2232 thread
::spawn(move|| {
2233 tx
.send(()).unwrap();
2243 fn test_nested_recv_iter() {
2244 let (tx
, rx
) = channel
::<i32>();
2245 let (total_tx
, total_rx
) = channel
::<i32>();
2247 let _t
= thread
::spawn(move|| {
2249 for x
in rx
.iter() {
2252 total_tx
.send(acc
).unwrap();
2255 tx
.send(3).unwrap();
2256 tx
.send(1).unwrap();
2257 tx
.send(2).unwrap();
2259 assert_eq
!(total_rx
.recv().unwrap(), 6);
2263 fn test_recv_iter_break() {
2264 let (tx
, rx
) = channel
::<i32>();
2265 let (count_tx
, count_rx
) = channel();
2267 let _t
= thread
::spawn(move|| {
2269 for x
in rx
.iter() {
2276 count_tx
.send(count
).unwrap();
2279 tx
.send(2).unwrap();
2280 tx
.send(2).unwrap();
2281 tx
.send(2).unwrap();
2284 assert_eq
!(count_rx
.recv().unwrap(), 4);
2288 fn test_recv_try_iter() {
2289 let (request_tx
, request_rx
) = channel();
2290 let (response_tx
, response_rx
) = channel();
2292 // Request `x`s until we have `6`.
2293 let t
= thread
::spawn(move|| {
2296 for x
in response_rx
.try_iter() {
2302 request_tx
.send(()).unwrap();
2306 for _
in request_rx
.iter() {
2307 if response_tx
.send(2).is_err() {
2312 assert_eq
!(t
.join().unwrap(), 6);
2316 fn test_recv_into_iter_owned() {
2318 let (tx
, rx
) = channel
::<i32>();
2319 tx
.send(1).unwrap();
2320 tx
.send(2).unwrap();
2324 assert_eq
!(iter
.next().unwrap(), 1);
2325 assert_eq
!(iter
.next().unwrap(), 2);
2326 assert_eq
!(iter
.next().is_none(), true);
2330 fn test_recv_into_iter_borrowed() {
2331 let (tx
, rx
) = channel
::<i32>();
2332 tx
.send(1).unwrap();
2333 tx
.send(2).unwrap();
2335 let mut iter
= (&rx
).into_iter();
2336 assert_eq
!(iter
.next().unwrap(), 1);
2337 assert_eq
!(iter
.next().unwrap(), 2);
2338 assert_eq
!(iter
.next().is_none(), true);
2342 fn try_recv_states() {
2343 let (tx1
, rx1
) = channel
::<i32>();
2344 let (tx2
, rx2
) = channel
::<()>();
2345 let (tx3
, rx3
) = channel
::<()>();
2346 let _t
= thread
::spawn(move|| {
2347 rx2
.recv().unwrap();
2348 tx1
.send(1).unwrap();
2349 tx3
.send(()).unwrap();
2350 rx2
.recv().unwrap();
2352 tx3
.send(()).unwrap();
2355 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
2356 tx2
.send(()).unwrap();
2357 rx3
.recv().unwrap();
2358 assert_eq
!(rx1
.try_recv(), Ok(1));
2359 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
2360 tx2
.send(()).unwrap();
2361 rx3
.recv().unwrap();
2362 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Disconnected
));
2365 // This bug used to end up in a livelock inside of the Receiver destructor
2366 // because the internal state of the Shared packet was corrupted
2368 fn destroy_upgraded_shared_port_when_sender_still_active() {
2369 let (tx
, rx
) = channel();
2370 let (tx2
, rx2
) = channel();
2371 let _t
= thread
::spawn(move|| {
2372 rx
.recv().unwrap(); // wait on a oneshot
2373 drop(rx
); // destroy a shared
2374 tx2
.send(()).unwrap();
2376 // make sure the other thread has gone to sleep
2377 for _
in 0..5000 { thread::yield_now(); }
2379 // upgrade to a shared chan and send a message
2382 t
.send(()).unwrap();
2384 // wait for the child thread to exit before we exit
2385 rx2
.recv().unwrap();
2390 let (tx
, _
) = channel();
2391 let _
= tx
.send(123);
2392 assert_eq
!(tx
.send(123), Err(SendError(123)));
2396 #[cfg(all(test, not(target_os = "emscripten")))]
2403 pub fn stress_factor() -> usize {
2404 match env
::var("RUST_TEST_STRESS") {
2405 Ok(val
) => val
.parse().unwrap(),
2412 let (tx
, rx
) = sync_channel
::<i32>(1);
2413 tx
.send(1).unwrap();
2414 assert_eq
!(rx
.recv().unwrap(), 1);
2419 let (tx
, _rx
) = sync_channel
::<Box
<isize>>(1);
2420 tx
.send(box 1).unwrap();
2425 let (tx
, rx
) = sync_channel
::<i32>(1);
2426 tx
.send(1).unwrap();
2427 assert_eq
!(rx
.recv().unwrap(), 1);
2428 let tx
= tx
.clone();
2429 tx
.send(1).unwrap();
2430 assert_eq
!(rx
.recv().unwrap(), 1);
2435 let (tx
, rx
) = sync_channel
::<i32>(1);
2436 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Err(RecvTimeoutError
::Timeout
));
2437 tx
.send(1).unwrap();
2438 assert_eq
!(rx
.recv_timeout(Duration
::from_millis(1)), Ok(1));
2442 fn smoke_threads() {
2443 let (tx
, rx
) = sync_channel
::<i32>(0);
2444 let _t
= thread
::spawn(move|| {
2445 tx
.send(1).unwrap();
2447 assert_eq
!(rx
.recv().unwrap(), 1);
2451 fn smoke_port_gone() {
2452 let (tx
, rx
) = sync_channel
::<i32>(0);
2454 assert
!(tx
.send(1).is_err());
2458 fn smoke_shared_port_gone2() {
2459 let (tx
, rx
) = sync_channel
::<i32>(0);
2461 let tx2
= tx
.clone();
2463 assert
!(tx2
.send(1).is_err());
2467 fn port_gone_concurrent() {
2468 let (tx
, rx
) = sync_channel
::<i32>(0);
2469 let _t
= thread
::spawn(move|| {
2472 while tx
.send(1).is_ok() {}
2476 fn port_gone_concurrent_shared() {
2477 let (tx
, rx
) = sync_channel
::<i32>(0);
2478 let tx2
= tx
.clone();
2479 let _t
= thread
::spawn(move|| {
2482 while tx
.send(1).is_ok() && tx2
.send(1).is_ok() {}
2486 fn smoke_chan_gone() {
2487 let (tx
, rx
) = sync_channel
::<i32>(0);
2489 assert
!(rx
.recv().is_err());
2493 fn smoke_chan_gone_shared() {
2494 let (tx
, rx
) = sync_channel
::<()>(0);
2495 let tx2
= tx
.clone();
2498 assert
!(rx
.recv().is_err());
2502 fn chan_gone_concurrent() {
2503 let (tx
, rx
) = sync_channel
::<i32>(0);
2504 thread
::spawn(move|| {
2505 tx
.send(1).unwrap();
2506 tx
.send(1).unwrap();
2508 while rx
.recv().is_ok() {}
2513 let (tx
, rx
) = sync_channel
::<i32>(0);
2514 thread
::spawn(move|| {
2515 for _
in 0..10000 { tx.send(1).unwrap(); }
2518 assert_eq
!(rx
.recv().unwrap(), 1);
2523 fn stress_recv_timeout_two_threads() {
2524 let (tx
, rx
) = sync_channel
::<i32>(0);
2526 thread
::spawn(move|| {
2527 for _
in 0..10000 { tx.send(1).unwrap(); }
2530 let mut recv_count
= 0;
2532 match rx
.recv_timeout(Duration
::from_millis(1)) {
2537 Err(RecvTimeoutError
::Timeout
) => continue,
2538 Err(RecvTimeoutError
::Disconnected
) => break,
2542 assert_eq
!(recv_count
, 10000);
2546 fn stress_recv_timeout_shared() {
2547 const AMT
: u32 = 1000;
2548 const NTHREADS
: u32 = 8;
2549 let (tx
, rx
) = sync_channel
::<i32>(0);
2550 let (dtx
, drx
) = sync_channel
::<()>(0);
2552 thread
::spawn(move|| {
2553 let mut recv_count
= 0;
2555 match rx
.recv_timeout(Duration
::from_millis(10)) {
2560 Err(RecvTimeoutError
::Timeout
) => continue,
2561 Err(RecvTimeoutError
::Disconnected
) => break,
2565 assert_eq
!(recv_count
, AMT
* NTHREADS
);
2566 assert
!(rx
.try_recv().is_err());
2568 dtx
.send(()).unwrap();
2571 for _
in 0..NTHREADS
{
2572 let tx
= tx
.clone();
2573 thread
::spawn(move|| {
2574 for _
in 0..AMT { tx.send(1).unwrap(); }
2580 drx
.recv().unwrap();
2584 fn stress_shared() {
2585 const AMT
: u32 = 1000;
2586 const NTHREADS
: u32 = 8;
2587 let (tx
, rx
) = sync_channel
::<i32>(0);
2588 let (dtx
, drx
) = sync_channel
::<()>(0);
2590 thread
::spawn(move|| {
2591 for _
in 0..AMT
* NTHREADS
{
2592 assert_eq
!(rx
.recv().unwrap(), 1);
2594 match rx
.try_recv() {
2598 dtx
.send(()).unwrap();
2601 for _
in 0..NTHREADS
{
2602 let tx
= tx
.clone();
2603 thread
::spawn(move|| {
2604 for _
in 0..AMT { tx.send(1).unwrap(); }
2608 drx
.recv().unwrap();
2612 fn oneshot_single_thread_close_port_first() {
2613 // Simple test of closing without sending
2614 let (_tx
, rx
) = sync_channel
::<i32>(0);
2619 fn oneshot_single_thread_close_chan_first() {
2620 // Simple test of closing without sending
2621 let (tx
, _rx
) = sync_channel
::<i32>(0);
2626 fn oneshot_single_thread_send_port_close() {
2627 // Testing that the sender cleans up the payload if receiver is closed
2628 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
2630 assert
!(tx
.send(box 0).is_err());
2634 fn oneshot_single_thread_recv_chan_close() {
2635 // Receiving on a closed chan will panic
2636 let res
= thread
::spawn(move|| {
2637 let (tx
, rx
) = sync_channel
::<i32>(0);
2642 assert
!(res
.is_err());
2646 fn oneshot_single_thread_send_then_recv() {
2647 let (tx
, rx
) = sync_channel
::<Box
<i32>>(1);
2648 tx
.send(box 10).unwrap();
2649 assert
!(*rx
.recv().unwrap() == 10);
2653 fn oneshot_single_thread_try_send_open() {
2654 let (tx
, rx
) = sync_channel
::<i32>(1);
2655 assert_eq
!(tx
.try_send(10), Ok(()));
2656 assert
!(rx
.recv().unwrap() == 10);
2660 fn oneshot_single_thread_try_send_closed() {
2661 let (tx
, rx
) = sync_channel
::<i32>(0);
2663 assert_eq
!(tx
.try_send(10), Err(TrySendError
::Disconnected(10)));
2667 fn oneshot_single_thread_try_send_closed2() {
2668 let (tx
, _rx
) = sync_channel
::<i32>(0);
2669 assert_eq
!(tx
.try_send(10), Err(TrySendError
::Full(10)));
2673 fn oneshot_single_thread_try_recv_open() {
2674 let (tx
, rx
) = sync_channel
::<i32>(1);
2675 tx
.send(10).unwrap();
2676 assert
!(rx
.recv() == Ok(10));
2680 fn oneshot_single_thread_try_recv_closed() {
2681 let (tx
, rx
) = sync_channel
::<i32>(0);
2683 assert
!(rx
.recv().is_err());
2687 fn oneshot_single_thread_try_recv_closed_with_data() {
2688 let (tx
, rx
) = sync_channel
::<i32>(1);
2689 tx
.send(10).unwrap();
2691 assert_eq
!(rx
.try_recv(), Ok(10));
2692 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
2696 fn oneshot_single_thread_peek_data() {
2697 let (tx
, rx
) = sync_channel
::<i32>(1);
2698 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
2699 tx
.send(10).unwrap();
2700 assert_eq
!(rx
.try_recv(), Ok(10));
2704 fn oneshot_single_thread_peek_close() {
2705 let (tx
, rx
) = sync_channel
::<i32>(0);
2707 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
2708 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Disconnected
));
2712 fn oneshot_single_thread_peek_open() {
2713 let (_tx
, rx
) = sync_channel
::<i32>(0);
2714 assert_eq
!(rx
.try_recv(), Err(TryRecvError
::Empty
));
2718 fn oneshot_multi_task_recv_then_send() {
2719 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
2720 let _t
= thread
::spawn(move|| {
2721 assert
!(*rx
.recv().unwrap() == 10);
2724 tx
.send(box 10).unwrap();
2728 fn oneshot_multi_task_recv_then_close() {
2729 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
2730 let _t
= thread
::spawn(move|| {
2733 let res
= thread
::spawn(move|| {
2734 assert
!(*rx
.recv().unwrap() == 10);
2736 assert
!(res
.is_err());
2740 fn oneshot_multi_thread_close_stress() {
2741 for _
in 0..stress_factor() {
2742 let (tx
, rx
) = sync_channel
::<i32>(0);
2743 let _t
= thread
::spawn(move|| {
2751 fn oneshot_multi_thread_send_close_stress() {
2752 for _
in 0..stress_factor() {
2753 let (tx
, rx
) = sync_channel
::<i32>(0);
2754 let _t
= thread
::spawn(move|| {
2757 let _
= thread
::spawn(move || {
2758 tx
.send(1).unwrap();
2764 fn oneshot_multi_thread_recv_close_stress() {
2765 for _
in 0..stress_factor() {
2766 let (tx
, rx
) = sync_channel
::<i32>(0);
2767 let _t
= thread
::spawn(move|| {
2768 let res
= thread
::spawn(move|| {
2771 assert
!(res
.is_err());
2773 let _t
= thread
::spawn(move|| {
2774 thread
::spawn(move|| {
2782 fn oneshot_multi_thread_send_recv_stress() {
2783 for _
in 0..stress_factor() {
2784 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
2785 let _t
= thread
::spawn(move|| {
2786 tx
.send(box 10).unwrap();
2788 assert
!(*rx
.recv().unwrap() == 10);
2793 fn stream_send_recv_stress() {
2794 for _
in 0..stress_factor() {
2795 let (tx
, rx
) = sync_channel
::<Box
<i32>>(0);
2800 fn send(tx
: SyncSender
<Box
<i32>>, i
: i32) {
2801 if i
== 10 { return }
2803 thread
::spawn(move|| {
2804 tx
.send(box i
).unwrap();
2809 fn recv(rx
: Receiver
<Box
<i32>>, i
: i32) {
2810 if i
== 10 { return }
2812 thread
::spawn(move|| {
2813 assert
!(*rx
.recv().unwrap() == i
);
2822 // Regression test that we don't run out of stack in scheduler context
2823 let (tx
, rx
) = sync_channel(10000);
2824 for _
in 0..10000 { tx.send(()).unwrap(); }
2825 for _
in 0..10000 { rx.recv().unwrap(); }
2829 fn shared_chan_stress() {
2830 let (tx
, rx
) = sync_channel(0);
2831 let total
= stress_factor() + 100;
2833 let tx
= tx
.clone();
2834 thread
::spawn(move|| {
2835 tx
.send(()).unwrap();
2845 fn test_nested_recv_iter() {
2846 let (tx
, rx
) = sync_channel
::<i32>(0);
2847 let (total_tx
, total_rx
) = sync_channel
::<i32>(0);
2849 let _t
= thread
::spawn(move|| {
2851 for x
in rx
.iter() {
2854 total_tx
.send(acc
).unwrap();
2857 tx
.send(3).unwrap();
2858 tx
.send(1).unwrap();
2859 tx
.send(2).unwrap();
2861 assert_eq
!(total_rx
.recv().unwrap(), 6);
2865 fn test_recv_iter_break() {
2866 let (tx
, rx
) = sync_channel
::<i32>(0);
2867 let (count_tx
, count_rx
) = sync_channel(0);
2869 let _t
= thread
::spawn(move|| {
2871 for x
in rx
.iter() {
2878 count_tx
.send(count
).unwrap();
2881 tx
.send(2).unwrap();
2882 tx
.send(2).unwrap();
2883 tx
.send(2).unwrap();
2884 let _
= tx
.try_send(2);
2886 assert_eq
!(count_rx
.recv().unwrap(), 4);
2890 fn try_recv_states() {
2891 let (tx1
, rx1
) = sync_channel
::<i32>(1);
2892 let (tx2
, rx2
) = sync_channel
::<()>(1);
2893 let (tx3
, rx3
) = sync_channel
::<()>(1);
2894 let _t
= thread
::spawn(move|| {
2895 rx2
.recv().unwrap();
2896 tx1
.send(1).unwrap();
2897 tx3
.send(()).unwrap();
2898 rx2
.recv().unwrap();
2900 tx3
.send(()).unwrap();
2903 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
2904 tx2
.send(()).unwrap();
2905 rx3
.recv().unwrap();
2906 assert_eq
!(rx1
.try_recv(), Ok(1));
2907 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Empty
));
2908 tx2
.send(()).unwrap();
2909 rx3
.recv().unwrap();
2910 assert_eq
!(rx1
.try_recv(), Err(TryRecvError
::Disconnected
));
2913 // This bug used to end up in a livelock inside of the Receiver destructor
2914 // because the internal state of the Shared packet was corrupted
2916 fn destroy_upgraded_shared_port_when_sender_still_active() {
2917 let (tx
, rx
) = sync_channel
::<()>(0);
2918 let (tx2
, rx2
) = sync_channel
::<()>(0);
2919 let _t
= thread
::spawn(move|| {
2920 rx
.recv().unwrap(); // wait on a oneshot
2921 drop(rx
); // destroy a shared
2922 tx2
.send(()).unwrap();
2924 // make sure the other thread has gone to sleep
2925 for _
in 0..5000 { thread::yield_now(); }
2927 // upgrade to a shared chan and send a message
2930 t
.send(()).unwrap();
2932 // wait for the child thread to exit before we exit
2933 rx2
.recv().unwrap();
2938 let (tx
, rx
) = sync_channel
::<i32>(0);
2939 let _t
= thread
::spawn(move|| { rx.recv().unwrap(); }
);
2940 assert_eq
!(tx
.send(1), Ok(()));
2945 let (tx
, rx
) = sync_channel
::<i32>(0);
2946 let _t
= thread
::spawn(move|| { drop(rx); }
);
2947 assert
!(tx
.send(1).is_err());
2952 let (tx
, rx
) = sync_channel
::<i32>(1);
2953 assert_eq
!(tx
.send(1), Ok(()));
2954 let _t
=thread
::spawn(move|| { drop(rx); }
);
2955 assert
!(tx
.send(1).is_err());
2960 let (tx
, rx
) = sync_channel
::<i32>(0);
2961 let tx2
= tx
.clone();
2962 let (done
, donerx
) = channel();
2963 let done2
= done
.clone();
2964 let _t
= thread
::spawn(move|| {
2965 assert
!(tx
.send(1).is_err());
2966 done
.send(()).unwrap();
2968 let _t
= thread
::spawn(move|| {
2969 assert
!(tx2
.send(2).is_err());
2970 done2
.send(()).unwrap();
2973 donerx
.recv().unwrap();
2974 donerx
.recv().unwrap();
2979 let (tx
, _rx
) = sync_channel
::<i32>(0);
2980 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Full(1)));
2985 let (tx
, _rx
) = sync_channel
::<i32>(1);
2986 assert_eq
!(tx
.try_send(1), Ok(()));
2987 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Full(1)));
2992 let (tx
, rx
) = sync_channel
::<i32>(1);
2993 assert_eq
!(tx
.try_send(1), Ok(()));
2995 assert_eq
!(tx
.try_send(1), Err(TrySendError
::Disconnected(1)));
3001 let (tx1
, rx1
) = sync_channel
::<()>(3);
3002 let (tx2
, rx2
) = sync_channel
::<()>(3);
3004 let _t
= thread
::spawn(move|| {
3005 rx1
.recv().unwrap();
3006 tx2
.try_send(()).unwrap();
3009 tx1
.try_send(()).unwrap();
3010 rx2
.recv().unwrap();
3019 fn fmt_debug_sender() {
3020 let (tx
, _
) = channel
::<i32>();
3021 assert_eq
!(format
!("{:?}", tx
), "Sender { .. }");
3025 fn fmt_debug_recv() {
3026 let (_
, rx
) = channel
::<i32>();
3027 assert_eq
!(format
!("{:?}", rx
), "Receiver { .. }");
3031 fn fmt_debug_sync_sender() {
3032 let (tx
, _
) = sync_channel
::<i32>(1);
3033 assert_eq
!(format
!("{:?}", tx
), "SyncSender { .. }");