]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/mod.rs
New upstream version 1.34.2+dfsg1
[rustc.git] / src / libstd / sync / mpsc / mod.rs
CommitLineData
85aaf69f 1//! Multi-producer, single-consumer FIFO queue communication primitives.
1a4d82fc
JJ
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined among three types:
5//!
cc61c64b
XL
6//! * [`Sender`]
7//! * [`SyncSender`]
8//! * [`Receiver`]
1a4d82fc 9//!
cc61c64b 10//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
1a4d82fc
JJ
11//! senders are clone-able (multi-producer) such that many threads can send
12//! simultaneously to one receiver (single-consumer).
13//!
14//! These channels come in two flavors:
15//!
cc61c64b 16//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
1a4d82fc
JJ
17//! will return a `(Sender, Receiver)` tuple where all sends will be
18//! **asynchronous** (they never block). The channel conceptually has an
19//! infinite buffer.
20//!
cc61c64b
XL
21//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22//! return a `(SyncSender, Receiver)` tuple where the storage for pending
23//! messages is a pre-allocated buffer of a fixed size. All sends will be
1a4d82fc 24//! **synchronous** by blocking until there is buffer space available. Note
cc61c64b
XL
25//! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26//! channel where each sender atomically hands off a message to a receiver.
27//!
28//! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
29//! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
30//! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
31//! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
32//! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
33//! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
1a4d82fc
JJ
34//!
35//! ## Disconnection
36//!
cc61c64b 37//! The send and receive operations on channels will all return a [`Result`]
1a4d82fc
JJ
38//! indicating whether the operation succeeded or not. An unsuccessful operation
39//! is normally indicative of the other half of a channel having "hung up" by
40//! being dropped in its corresponding thread.
41//!
42//! Once half of a channel has been deallocated, most operations can no longer
cc61c64b
XL
43//! continue to make progress, so [`Err`] will be returned. Many applications
44//! will continue to [`unwrap`] the results returned from this module,
45//! instigating a propagation of failure among threads if one unexpectedly dies.
46//!
47//! [`Result`]: ../../../std/result/enum.Result.html
48//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
49//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
1a4d82fc
JJ
50//!
51//! # Examples
52//!
53//! Simple usage:
54//!
55//! ```
85aaf69f 56//! use std::thread;
1a4d82fc
JJ
57//! use std::sync::mpsc::channel;
58//!
59//! // Create a simple streaming channel
60//! let (tx, rx) = channel();
85aaf69f
SL
61//! thread::spawn(move|| {
62//! tx.send(10).unwrap();
1a4d82fc 63//! });
85aaf69f 64//! assert_eq!(rx.recv().unwrap(), 10);
1a4d82fc
JJ
65//! ```
66//!
67//! Shared usage:
68//!
69//! ```
85aaf69f 70//! use std::thread;
1a4d82fc
JJ
71//! use std::sync::mpsc::channel;
72//!
73//! // Create a shared channel that can be sent along from many threads
74//! // where tx is the sending half (tx for transmission), and rx is the receiving
75//! // half (rx for receiving).
76//! let (tx, rx) = channel();
85aaf69f 77//! for i in 0..10 {
1a4d82fc 78//! let tx = tx.clone();
85aaf69f 79//! thread::spawn(move|| {
1a4d82fc
JJ
80//! tx.send(i).unwrap();
81//! });
82//! }
83//!
85aaf69f 84//! for _ in 0..10 {
1a4d82fc
JJ
85//! let j = rx.recv().unwrap();
86//! assert!(0 <= j && j < 10);
87//! }
88//! ```
89//!
90//! Propagating panics:
91//!
92//! ```
93//! use std::sync::mpsc::channel;
94//!
95//! // The call to recv() will return an error because the channel has already
96//! // hung up (or been deallocated)
c34b1796 97//! let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
98//! drop(tx);
99//! assert!(rx.recv().is_err());
100//! ```
101//!
102//! Synchronous channels:
103//!
104//! ```
85aaf69f 105//! use std::thread;
1a4d82fc
JJ
106//! use std::sync::mpsc::sync_channel;
107//!
c34b1796 108//! let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 109//! thread::spawn(move|| {
bd371182 110//! // This will wait for the parent thread to start receiving
1a4d82fc
JJ
111//! tx.send(53).unwrap();
112//! });
113//! rx.recv().unwrap();
114//! ```
1a4d82fc 115
85aaf69f 116#![stable(feature = "rust1", since = "1.0.0")]
a1dfa0c6 117#![allow(deprecated)] // for mpsc_select
1a4d82fc
JJ
118
119// A description of how Rust's channel implementation works
120//
121// Channels are supposed to be the basic building block for all other
122// concurrent primitives that are used in Rust. As a result, the channel type
123// needs to be highly optimized, flexible, and broad enough for use everywhere.
124//
125// The choice of implementation of all channels is to be built on lock-free data
126// structures. The channels themselves are then consequently also lock-free data
127// structures. As always with lock-free code, this is a very "here be dragons"
128// territory, especially because I'm unaware of any academic papers that have
129// gone into great length about channels of these flavors.
130//
131// ## Flavors of channels
132//
133// From the perspective of a consumer of this library, there is only one flavor
134// of channel. This channel can be used as a stream and cloned to allow multiple
135// senders. Under the hood, however, there are actually three flavors of
136// channels in play.
137//
3157f602
XL
138// * Oneshots - these channels are highly optimized for the one-send use
139// case. They contain as few atomics as possible and
140// involve one and exactly one allocation.
1a4d82fc
JJ
141// * Streams - these channels are optimized for the non-shared use case. They
142// use a different concurrent queue that is more tailored for this
143// use case. The initial allocation of this flavor of channel is not
144// optimized.
145// * Shared - this is the most general form of channel that this module offers,
146// a channel with multiple senders. This type is as optimized as it
147// can be, but the previous two types mentioned are much faster for
148// their use-cases.
149//
150// ## Concurrent queues
151//
3157f602
XL
152// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
153// but recv() obviously blocks. This means that under the hood there must be
154// some shared and concurrent queue holding all of the actual data.
1a4d82fc
JJ
155//
156// With two queue-backed flavors of channels (streams and shared channels), two flavors of queues are also used. We have
157// chosen to use queues from a well-known author that are abbreviated as SPSC
158// and MPSC (single producer, single consumer and multiple producer, single
159// consumer). SPSC queues are used for streams while MPSC queues are used for
160// shared channels.
161//
162// ### SPSC optimizations
163//
164// The SPSC queue found online is essentially a linked list of nodes where one
165// half of the nodes are the "queue of data" and the other half of nodes are a
166// cache of unused nodes. The unused nodes are used such that an allocation is
167// not required on every push() and a free doesn't need to happen on every
168// pop().
169//
170// As found online, however, the cache of nodes is of an infinite size. This
171// means that if a channel at one point in its life had 50k items in the queue,
172// then the queue will always have the capacity for 50k items. I believed that
173// this was an unnecessary limitation of the implementation, so I have altered
174// the queue to optionally have a bound on the cache size.
175//
176// By default, streams will have an unbounded SPSC queue with a small-ish cache
177// size. The hope is that the cache is still large enough to have very fast
178// send() operations while not too large such that millions of channels can
179// coexist at once.
180//
181// ### MPSC optimizations
182//
183// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
184// a linked list under the hood to earn its unboundedness, but I have not put
185// forth much effort into having a cache of nodes similar to the SPSC queue.
186//
187// For now, I believe that this is "ok" because shared channels are not the most
188// common type, but soon we may wish to revisit this queue choice and determine
189// another candidate for backend storage of shared channels.
190//
191// ## Overview of the Implementation
192//
193// Now that there's a little background on the concurrent queues used, it's
194// worth going into much more detail about the channels themselves. The basic
195// pseudocode for a send/recv are:
196//
197//
198//      send(t)                             recv()
199//        queue.push(t)                       return if queue.pop()
200//        if increment() == -1                deschedule {
201//          wakeup()                            if decrement() > 0
202//                                                cancel_deschedule()
203//                                            }
204//                                            queue.pop()
205//
206// As mentioned before, there are no locks in this implementation, only atomic
207// instructions are used.
208//
209// ### The internal atomic counter
210//
211// Every channel has a shared counter with each half to keep track of the size
212// of the queue. This counter is used to abort descheduling by the receiver and
213// to know when to wake up on the sending side.
214//
215// As seen in the pseudocode, senders will increment this count and receivers
216// will decrement the count. The theory behind this is that if a sender sees a
217// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
218// then it doesn't need to block.
219//
220// The recv() method has a beginning call to pop(), and if successful, it needs
221// to decrement the count. It is a crucial implementation detail that this
222// decrement does *not* happen to the shared counter. If this were the case,
223// then it would be possible for the counter to be very negative when there were
224// no receivers waiting, in which case the senders would have to determine when
225// it was actually appropriate to wake up a receiver.
226//
227// Instead, the "steal count" is kept track of separately (not atomically
228// because it's only used by receivers), and then the decrement() call when
229// descheduling will lump in all of the recent steals into one large decrement.
230//
231// The implication of this is that if a sender sees a -1 count, then there's
232// guaranteed to be a waiter waiting!
233//
234// ## Native Implementation
235//
236// A major goal of these channels is to work seamlessly on and off the runtime.
237// All of the previous race conditions have been worded in terms of
238// scheduler-isms (which is obviously not available without the runtime).
239//
240// For now, native usage of channels (off the runtime) will fall back onto
241// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
242// is still entirely lock-free, the "deschedule" blocks above are surrounded by
243// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
244// condition variable.
245//
246// ## Select
247//
248// Being able to support selection over channels has greatly influenced this
249// design, and not only does selection need to work inside the runtime, but also
250// outside the runtime.
251//
252// The implementation is fairly straightforward. The goal of select() is not to
253// return some data, but only to return which channel can receive data without
254// blocking. The implementation is essentially the entire blocking procedure
255// followed by an increment as soon as it's woken up. The cancellation procedure
256// involves an increment and swapping out of to_wake to acquire ownership of the
bd371182 257// thread to unblock.
1a4d82fc
JJ
258//
259// Sadly this current implementation requires multiple allocations, so I have
260// seen the throughput of select() be much worse than it should be. I do not
261// believe that there is anything fundamental that needs to change about these
262// channels, however, in order to support a more efficient select().
263//
264// # Conclusion
265//
266// And now that you've seen all the races that I found and attempted to fix,
267// here's the code for you to find some more!
268
1a4d82fc 269use sync::Arc;
c34b1796 270use error;
1a4d82fc 271use fmt;
1a4d82fc
JJ
272use mem;
273use cell::UnsafeCell;
3157f602 274use time::{Duration, Instant};
1a4d82fc 275
92a42be0 276#[unstable(feature = "mpsc_select", issue = "27800")]
1a4d82fc
JJ
277pub use self::select::{Select, Handle};
278use self::select::StartResult;
279use self::select::StartResult::*;
280use self::blocking::SignalToken;
281
9fa01778
XL
282#[cfg(all(test, not(target_os = "emscripten")))]
283mod select_tests;
284
1a4d82fc
JJ
285mod blocking;
286mod oneshot;
287mod select;
288mod shared;
289mod stream;
290mod sync;
291mod mpsc_queue;
292mod spsc_queue;
293
abe05a73
XL
294mod cache_aligned;
295
7cac9316
XL
296/// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
297/// This half can only be owned by one thread.
cc61c64b
XL
298///
299/// Messages sent to the channel can be retrieved using [`recv`].
300///
7cac9316
XL
301/// [`channel`]: fn.channel.html
302/// [`sync_channel`]: fn.sync_channel.html
303/// [`recv`]: struct.Receiver.html#method.recv
cc61c64b
XL
304///
305/// # Examples
306///
307/// ```rust
308/// use std::sync::mpsc::channel;
309/// use std::thread;
310/// use std::time::Duration;
311///
312/// let (send, recv) = channel();
313///
314/// thread::spawn(move || {
315/// send.send("Hello world!").unwrap();
316/// thread::sleep(Duration::from_secs(2)); // block for two seconds
317/// send.send("Delayed for 2 seconds").unwrap();
318/// });
319///
320/// println!("{}", recv.recv().unwrap()); // Received immediately
321/// println!("Waiting...");
322/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
323/// ```
85aaf69f 324#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
325pub struct Receiver<T> {
    // Flavor of channel this receiver is attached to. It lives in an
    // `UnsafeCell` because the `UnsafeFlavor` helpers hand out `&Flavor` /
    // `&mut Flavor` through a shared `&self`.
326 inner: UnsafeCell<Flavor<T>>,
327}
328
329// The receiver port can be sent from place to place, so long as it
330// is not used to receive non-sendable things.
92a42be0 331#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 332unsafe impl<T: Send> Send for Receiver<T> { }
1a4d82fc 333
54a0048b
SL
// Explicitly opt out of `Sync`: a `&Receiver` must not be shared between
// threads, because `inner` is an `UnsafeCell` that `UnsafeFlavor::inner_mut`
// exposes mutably through `&self`.
334#[stable(feature = "rust1", since = "1.0.0")]
335impl<T> !Sync for Receiver<T> { }
336
7cac9316
XL
337/// An iterator over messages on a [`Receiver`], created by [`iter`].
338///
339/// This iterator will block whenever [`next`] is called,
340/// waiting for a new message, and [`None`] will be returned
cc61c64b
XL
341/// when the corresponding channel has hung up.
342///
7cac9316
XL
343/// [`iter`]: struct.Receiver.html#method.iter
344/// [`Receiver`]: struct.Receiver.html
cc61c64b
XL
345/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
346/// [`None`]: ../../../std/option/enum.Option.html#variant.None
7cac9316
XL
347///
348/// # Examples
349///
350/// ```rust
351/// use std::sync::mpsc::channel;
352/// use std::thread;
353///
354/// let (send, recv) = channel();
355///
356/// thread::spawn(move || {
357/// send.send(1u8).unwrap();
358/// send.send(2u8).unwrap();
359/// send.send(3u8).unwrap();
360/// });
361///
362/// for x in recv.iter() {
363/// println!("Got: {}", x);
364/// }
365/// ```
85aaf69f 366#[stable(feature = "rust1", since = "1.0.0")]
32a655c1 367#[derive(Debug)]
c34b1796 368pub struct Iter<'a, T: 'a> {
1a4d82fc
JJ
    // Receiver borrowed for `'a`; the iterator does not own the channel.
369 rx: &'a Receiver<T>
370}
371
7cac9316
XL
372/// An iterator that attempts to yield all pending values for a [`Receiver`],
373/// created by [`try_iter`].
5bcae85e 374///
7cac9316
XL
375/// [`None`] will be returned when there are no pending values remaining or
376/// if the corresponding channel has hung up.
377///
378/// This iterator will never block the caller in order to wait for data to
cc61c64b
XL
379/// become available. Instead, it will return [`None`].
380///
7cac9316
XL
381/// [`Receiver`]: struct.Receiver.html
382/// [`try_iter`]: struct.Receiver.html#method.try_iter
cc61c64b 383/// [`None`]: ../../../std/option/enum.Option.html#variant.None
7cac9316
XL
384///
385/// # Examples
386///
387/// ```rust
388/// use std::sync::mpsc::channel;
389/// use std::thread;
390/// use std::time::Duration;
391///
392/// let (sender, receiver) = channel();
393///
394/// // Nothing is in the buffer yet
395/// assert!(receiver.try_iter().next().is_none());
396/// println!("Nothing in the buffer...");
397///
398/// thread::spawn(move || {
399/// sender.send(1).unwrap();
400/// sender.send(2).unwrap();
401/// sender.send(3).unwrap();
402/// });
403///
404/// println!("Going to sleep...");
405/// thread::sleep(Duration::from_secs(2)); // block for two seconds
406///
407/// for x in receiver.try_iter() {
408/// println!("Got: {}", x);
409/// }
410/// ```
476ff2be 411#[stable(feature = "receiver_try_iter", since = "1.15.0")]
32a655c1 412#[derive(Debug)]
5bcae85e
SL
413pub struct TryIter<'a, T: 'a> {
    // Receiver borrowed for `'a`; the iterator does not own the channel.
414 rx: &'a Receiver<T>
415}
416
7cac9316
XL
417/// An owning iterator over messages on a [`Receiver`],
418/// created by **Receiver::into_iter**.
cc61c64b 419///
7cac9316
XL
420/// This iterator will block whenever [`next`]
421/// is called, waiting for a new message, and [`None`] will be
422/// returned if the corresponding channel has hung up.
423///
424/// [`Receiver`]: struct.Receiver.html
cc61c64b
XL
425/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
426/// [`None`]: ../../../std/option/enum.Option.html#variant.None
427///
7cac9316
XL
428/// # Examples
429///
430/// ```rust
431/// use std::sync::mpsc::channel;
432/// use std::thread;
433///
434/// let (send, recv) = channel();
435///
436/// thread::spawn(move || {
437/// send.send(1u8).unwrap();
438/// send.send(2u8).unwrap();
439/// send.send(3u8).unwrap();
440/// });
441///
442/// for x in recv.into_iter() {
443/// println!("Got: {}", x);
444/// }
445/// ```
d9579d0f 446#[stable(feature = "receiver_into_iter", since = "1.1.0")]
32a655c1 447#[derive(Debug)]
d9579d0f
AL
448pub struct IntoIter<T> {
    // Receiver held by value: unlike `Iter` and `TryIter`, this iterator owns
    // the receiving half, so the receiver lives exactly as long as the iterator.
449 rx: Receiver<T>
450}
451
7cac9316 452/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
bd371182 453/// owned by one thread, but it can be cloned to send to other threads.
cc61c64b
XL
454///
455/// Messages can be sent through this channel with [`send`].
456///
7cac9316
XL
457/// [`channel`]: fn.channel.html
458/// [`send`]: struct.Sender.html#method.send
cc61c64b
XL
459///
460/// # Examples
461///
462/// ```rust
463/// use std::sync::mpsc::channel;
464/// use std::thread;
465///
466/// let (sender, receiver) = channel();
467/// let sender2 = sender.clone();
468///
469/// // First thread owns sender
470/// thread::spawn(move || {
471/// sender.send(1).unwrap();
472/// });
473///
474/// // Second thread owns sender2
475/// thread::spawn(move || {
476/// sender2.send(2).unwrap();
477/// });
478///
479/// let msg = receiver.recv().unwrap();
480/// let msg2 = receiver.recv().unwrap();
481///
482/// assert_eq!(3, msg + msg2);
483/// ```
85aaf69f 484#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
485pub struct Sender<T> {
    // Flavor currently backing the channel. It lives in an `UnsafeCell`
    // because it is reached through `&self` (see `UnsafeFlavor`); `send`
    // begins an upgrade from `Oneshot` to `Stream` once the oneshot packet
    // has already been used.
486 inner: UnsafeCell<Flavor<T>>,
487}
488
489// The send port can be sent from place to place, so long as it
490// is not used to send non-sendable things.
92a42be0 491#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 492unsafe impl<T: Send> Send for Sender<T> { }
1a4d82fc 493
54a0048b
SL
// Explicitly opt out of `Sync`: a `&Sender` must not be shared between
// threads, because `inner` is an `UnsafeCell` that `UnsafeFlavor::inner_mut`
// exposes mutably through `&self`. To send from several threads, give each
// thread its own clone instead.
494#[stable(feature = "rust1", since = "1.0.0")]
495impl<T> !Sync for Sender<T> { }
496
7cac9316 497/// The sending-half of Rust's synchronous [`sync_channel`] type.
cc61c64b 498///
7cac9316
XL
499/// Messages can be sent through this channel with [`send`] or [`try_send`].
500///
501/// [`send`] will block if there is no space in the internal buffer.
502///
503/// [`sync_channel`]: fn.sync_channel.html
504/// [`send`]: struct.SyncSender.html#method.send
505/// [`try_send`]: struct.SyncSender.html#method.try_send
cc61c64b 506///
7cac9316
XL
507/// # Examples
508///
509/// ```rust
510/// use std::sync::mpsc::sync_channel;
511/// use std::thread;
512///
513/// // Create a sync_channel with buffer size 2
514/// let (sync_sender, receiver) = sync_channel(2);
515/// let sync_sender2 = sync_sender.clone();
516///
517/// // First thread owns sync_sender
518/// thread::spawn(move || {
519/// sync_sender.send(1).unwrap();
520/// sync_sender.send(2).unwrap();
521/// });
522///
523/// // Second thread owns sync_sender2
524/// thread::spawn(move || {
525/// sync_sender2.send(3).unwrap();
526/// // thread will now block since the buffer is full
527/// println!("Thread unblocked!");
528/// });
529///
530/// let mut msg;
531///
532/// msg = receiver.recv().unwrap();
533/// println!("message {} received", msg);
534///
535/// // "Thread unblocked!" will be printed now
536///
537/// msg = receiver.recv().unwrap();
538/// println!("message {} received", msg);
539///
540/// msg = receiver.recv().unwrap();
541///
542/// println!("message {} received", msg);
543/// ```
85aaf69f 544#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 545pub struct SyncSender<T> {
    // Shared handle to the bounded channel's packet. `sync_channel` hands the
    // `Receiver` the same `Arc`, wrapped in `Flavor::Sync`.
476ff2be 546 inner: Arc<sync::Packet<T>>,
1a4d82fc
JJ
547}
548
// A `SyncSender` may be moved to another thread whenever the message type
// itself is `Send`.
92a42be0 549#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 550unsafe impl<T: Send> Send for SyncSender<T> {}
85aaf69f 551
cc61c64b
XL
552/// An error returned from the [`Sender::send`] or [`SyncSender::send`]
553/// function on **channel**s.
1a4d82fc 554///
cc61c64b 555/// A **send** operation can only fail if the receiving end of a channel is
1a4d82fc
JJ
556/// disconnected, implying that the data could never be received. The error
557/// contains the data being sent as a payload so it can be recovered.
cc61c64b
XL
558///
559/// [`Sender::send`]: struct.Sender.html#method.send
560/// [`SyncSender::send`]: struct.SyncSender.html#method.send
85aaf69f
SL
561#[stable(feature = "rust1", since = "1.0.0")]
562#[derive(PartialEq, Eq, Clone, Copy)]
// Tuple struct: the public field `.0` is the value that could not be sent,
// handed back so the caller can recover it. `Debug` is not in the derive list
// above; NOTE(review): any `Debug` impl must be hand-written elsewhere in the
// file (not visible in this section).
c34b1796 563pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
1a4d82fc 564
cc61c64b
XL
565/// An error returned from the [`recv`] function on a [`Receiver`].
566///
567/// The [`recv`] operation can only fail if the sending half of a
568/// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
569/// messages will ever be received.
1a4d82fc 570///
cc61c64b
XL
571/// [`recv`]: struct.Receiver.html#method.recv
572/// [`Receiver`]: struct.Receiver.html
573/// [`channel`]: fn.channel.html
574/// [`sync_channel`]: fn.sync_channel.html
85aaf69f
SL
575#[derive(PartialEq, Eq, Clone, Copy, Debug)]
576#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
// Unit struct: a failed receive has no value to hand back, so the error
// carries no payload.
577pub struct RecvError;
578
cc61c64b
XL
579/// This enumeration is the list of the possible reasons that [`try_recv`] could
580/// not return data when called. This can occur with both a [`channel`] and
581/// a [`sync_channel`].
582///
583/// [`try_recv`]: struct.Receiver.html#method.try_recv
584/// [`channel`]: fn.channel.html
585/// [`sync_channel`]: fn.sync_channel.html
85aaf69f
SL
586#[derive(PartialEq, Eq, Clone, Copy, Debug)]
587#[stable(feature = "rust1", since = "1.0.0")]
// Neither variant carries data: `Empty` is the recoverable case (data may
// still arrive), `Disconnected` is terminal.
1a4d82fc 588pub enum TryRecvError {
cc61c64b 589 /// This **channel** is currently empty, but the **Sender**(s) have not yet
1a4d82fc 590 /// disconnected, so data may yet become available.
85aaf69f 591 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
592 Empty,
593
cc61c64b
XL
594 /// The **channel**'s sending half has become disconnected, and there will
595 /// never be any more data received on it.
85aaf69f 596 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
597 Disconnected,
598}
599
cc61c64b
XL
600/// This enumeration is the list of possible errors that made [`recv_timeout`]
601/// unable to return data when called. This can occur with both a [`channel`] and
602/// a [`sync_channel`].
603///
604/// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
605/// [`channel`]: fn.channel.html
606/// [`sync_channel`]: fn.sync_channel.html
3157f602 607#[derive(PartialEq, Eq, Clone, Copy, Debug)]
5bcae85e 608#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
// Neither variant carries data.
3157f602 609pub enum RecvTimeoutError {
    // NOTE(review): the wording below mirrors `TryRecvError::Empty` and never
    // mentions the timeout; presumably this variant is returned when the
    // timeout elapses with no message available -- confirm against
    // `recv_timeout`.
cc61c64b 610 /// This **channel** is currently empty, but the **Sender**(s) have not yet
3157f602 611 /// disconnected, so data may yet become available.
5bcae85e 612 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
3157f602 613 Timeout,
cc61c64b
XL
614 /// The **channel**'s sending half has become disconnected, and there will
615 /// never be any more data received on it.
5bcae85e 616 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
3157f602
XL
617 Disconnected,
618}
619
1a4d82fc 620/// This enumeration is the list of the possible error outcomes for the
cc61c64b
XL
621/// [`try_send`] method.
622///
623/// [`try_send`]: struct.SyncSender.html#method.try_send
85aaf69f
SL
624#[stable(feature = "rust1", since = "1.0.0")]
625#[derive(PartialEq, Eq, Clone, Copy)]
// Both variants carry the unsent value so the caller can recover it.
1a4d82fc 626pub enum TrySendError<T> {
cc61c64b 627 /// The data could not be sent on the [`sync_channel`] because it would require that
1a4d82fc
JJ
628 /// the caller block to send the data.
629 ///
630 /// If this is a buffered channel, then the buffer is full at this time. If
cc61c64b 631 /// this is not a buffered channel, then there is no [`Receiver`] available to
1a4d82fc 632 /// acquire the data.
cc61c64b
XL
633 ///
634 /// [`sync_channel`]: fn.sync_channel.html
635 /// [`Receiver`]: struct.Receiver.html
85aaf69f 636 #[stable(feature = "rust1", since = "1.0.0")]
7453a54e 637 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
1a4d82fc 638
cc61c64b 639 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
1a4d82fc 640 /// sent. The data is returned back to the caller in this case.
cc61c64b
XL
641 ///
642 /// [`sync_channel`]: fn.sync_channel.html
85aaf69f 643 #[stable(feature = "rust1", since = "1.0.0")]
7453a54e 644 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
1a4d82fc
JJ
645}
646
// The concrete channel implementation behind a `Sender` / `Receiver`. Each
// variant holds an `Arc` to that flavor's packet so that both halves of the
// channel can refer to the same packet.
647enum Flavor<T> {
476ff2be
SL
    // Flavor that `channel()` starts with; per the implementation notes at the
    // top of this file it is optimized for the one-send use case.
648 Oneshot(Arc<oneshot::Packet<T>>),
    // Flavor for the non-shared use case. `Sender::send` creates a stream
    // packet and starts an upgrade once the oneshot has already been used.
649 Stream(Arc<stream::Packet<T>>),
    // Most general flavor: a channel with multiple senders.
650 Shared(Arc<shared::Packet<T>>),
    // Bounded flavor created by `sync_channel`.
651 Sync(Arc<sync::Packet<T>>),
1a4d82fc
JJ
652}
653
654#[doc(hidden)]
// Private helper giving `Sender` and `Receiver` uniform access to the `Flavor`
// stored in their `UnsafeCell`. Implementors only supply `inner_unsafe`; the
// two accessors below are provided methods built on top of it.
655trait UnsafeFlavor<T> {
    // Returns the cell that holds the flavor.
e9174d1e 656 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
    // Mutable view of the flavor, obtained through a shared `&self`.
    // Unsafe: this dereferences the cell's raw pointer, so the caller must
    // guarantee that no other reference to the flavor is alive while the
    // returned `&mut` is in use.
7453a54e 657 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
1a4d82fc
JJ
658 &mut *self.inner_unsafe().get()
659 }
    // Shared view of the flavor.
    // Unsafe: the caller must guarantee that no `&mut` obtained from
    // `inner_mut` is alive while the returned reference is in use.
7453a54e 660 unsafe fn inner(&self) -> &Flavor<T> {
1a4d82fc
JJ
661 &*self.inner_unsafe().get()
662 }
663}
// Exposes the sender's flavor cell so the provided `inner` / `inner_mut`
// accessors of `UnsafeFlavor` work on a `Sender`.
664impl<T> UnsafeFlavor<T> for Sender<T> {
e9174d1e 665 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
1a4d82fc
JJ
666 &self.inner
667 }
668}
// Exposes the receiver's flavor cell so the provided `inner` / `inner_mut`
// accessors of `UnsafeFlavor` work on a `Receiver`.
669impl<T> UnsafeFlavor<T> for Receiver<T> {
e9174d1e 670 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
1a4d82fc
JJ
671 &self.inner
672 }
673}
674
675/// Creates a new asynchronous channel, returning the sender/receiver halves.
cc61c64b
XL
676/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
677/// the same order as it was sent, and no [`send`] will block the calling thread
678/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
679/// block after its buffer limit is reached). [`recv`] will block until a message
680/// is available.
681///
682/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
683/// only one [`Receiver`] is supported.
1a4d82fc 684///
cc61c64b 685/// If the [`Receiver`] is disconnected while trying to [`send`] with the
8faf50e0 686/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
cc61c64b
XL
687/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
688/// return a [`RecvError`].
476ff2be 689///
cc61c64b
XL
690/// [`send`]: struct.Sender.html#method.send
691/// [`recv`]: struct.Receiver.html#method.recv
692/// [`Sender`]: struct.Sender.html
693/// [`Receiver`]: struct.Receiver.html
694/// [`sync_channel`]: fn.sync_channel.html
695/// [`SendError`]: struct.SendError.html
696/// [`RecvError`]: struct.RecvError.html
476ff2be 697///
c34b1796 698/// # Examples
1a4d82fc
JJ
699///
700/// ```
701/// use std::sync::mpsc::channel;
85aaf69f 702/// use std::thread;
1a4d82fc 703///
cc61c64b 704/// let (sender, receiver) = channel();
1a4d82fc
JJ
705///
706/// // Spawn off an expensive computation
85aaf69f 707/// thread::spawn(move|| {
1a4d82fc 708/// # fn expensive_computation() {}
cc61c64b 709/// sender.send(expensive_computation()).unwrap();
1a4d82fc
JJ
710/// });
711///
712/// // Do some useful work for a while
713///
714/// // Let's see what that answer was
cc61c64b 715/// println!("{:?}", receiver.recv().unwrap());
1a4d82fc 716/// ```
85aaf69f 717#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 718pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    // Every asynchronous channel starts out as a oneshot packet.
476ff2be 719 let a = Arc::new(oneshot::Packet::new());
1a4d82fc
JJ
    // Both halves wrap the same `Arc`'d packet: the sender gets a clone of the
    // handle and the receiver takes the original.
720 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
721}
722
723/// Creates a new synchronous, bounded channel.
cc61c64b
XL
724/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
725/// in the same order as it was sent. Like asynchronous [`channel`]s, the
726/// [`Receiver`] will block until a message becomes available. `sync_channel`
727/// differs greatly in the semantics of the sender, however.
1a4d82fc 728///
476ff2be
SL
729/// This channel has an internal buffer on which messages will be queued.
730/// `bound` specifies the buffer size. When the internal buffer becomes full,
731/// future sends will *block* waiting for the buffer to open up. Note that a
732/// buffer size of 0 is valid, in which case this becomes a "rendezvous channel"
cc61c64b
XL
733/// where each [`send`] will not return until a [`recv`] is paired with it.
734///
735/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
736/// times, but only one [`Receiver`] is supported.
1a4d82fc 737///
cc61c64b
XL
738/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
739/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
740/// [`SendError`]. Similarly, if the [`SyncSender`] is disconnected while trying
741/// to [`recv`], the [`recv`] method will return a [`RecvError`].
476ff2be 742///
cc61c64b
XL
743/// [`channel`]: fn.channel.html
744/// [`send`]: struct.SyncSender.html#method.send
745/// [`recv`]: struct.Receiver.html#method.recv
746/// [`SyncSender`]: struct.SyncSender.html
747/// [`Receiver`]: struct.Receiver.html
748/// [`SendError`]: struct.SendError.html
749/// [`RecvError`]: struct.RecvError.html
1a4d82fc 750///
c34b1796 751/// # Examples
1a4d82fc
JJ
752///
753/// ```
754/// use std::sync::mpsc::sync_channel;
85aaf69f 755/// use std::thread;
1a4d82fc 756///
cc61c64b 757/// let (sender, receiver) = sync_channel(1);
1a4d82fc
JJ
758///
759/// // this returns immediately
cc61c64b 760/// sender.send(1).unwrap();
1a4d82fc 761///
85aaf69f 762/// thread::spawn(move|| {
1a4d82fc 763/// // this will block until the previous message has been received
cc61c64b 764/// sender.send(2).unwrap();
1a4d82fc
JJ
765/// });
766///
cc61c64b
XL
767/// assert_eq!(receiver.recv().unwrap(), 1);
768/// assert_eq!(receiver.recv().unwrap(), 2);
1a4d82fc 769/// ```
85aaf69f 770#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 771pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
    // `bound` is passed straight to the sync packet's constructor.
476ff2be 772 let a = Arc::new(sync::Packet::new(bound));
1a4d82fc
JJ
    // The sender holds the packet directly; the receiver holds the same `Arc`
    // wrapped in `Flavor::Sync`.
773 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
774}
775
776////////////////////////////////////////////////////////////////////////////////
777// Sender
778////////////////////////////////////////////////////////////////////////////////
779
c34b1796 780impl<T> Sender<T> {
1a4d82fc
JJ
781 fn new(inner: Flavor<T>) -> Sender<T> {
782 Sender {
783 inner: UnsafeCell::new(inner),
784 }
785 }
786
787 /// Attempts to send a value on this channel, returning it back if it could
788 /// not be sent.
789 ///
790 /// A successful send occurs when it is determined that the other end of
791 /// the channel has not hung up already. An unsuccessful send would be one
792 /// where the corresponding receiver has already been deallocated. Note
cc61c64b
XL
793 /// that a return value of [`Err`] means that the data will never be
794 /// received, but a return value of [`Ok`] does *not* mean that the data
9fa01778 795 /// will be received. It is possible for the corresponding receiver to
cc61c64b
XL
796 /// hang up immediately after this function returns [`Ok`].
797 ///
798 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
799 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
1a4d82fc
JJ
800 ///
801 /// This method will never block the current thread.
802 ///
c34b1796 803 /// # Examples
1a4d82fc
JJ
804 ///
805 /// ```
806 /// use std::sync::mpsc::channel;
807 ///
808 /// let (tx, rx) = channel();
809 ///
810 /// // This send is always successful
85aaf69f 811 /// tx.send(1).unwrap();
1a4d82fc
JJ
812 ///
813 /// // This send will fail because the receiver is gone
814 /// drop(rx);
a7813a04 815 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
1a4d82fc 816 /// ```
85aaf69f 817 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
    // Dispatch on the flavor currently stored in this sender. The stream and
    // shared flavors forward straight to their packet and return; only a
    // oneshot packet that has already been used falls through to the
    // upgrade path, producing the new stream packet and the send result.
    let (new_inner, ret) = match *unsafe { self.inner() } {
        Flavor::Oneshot(ref p) => {
            if !p.sent() {
                // Nothing has been sent on the oneshot packet yet.
                return p.send(t).map_err(SendError);
            } else {
                // Allocate a stream packet and hand a receiver for it to
                // the oneshot packet via `upgrade`.
                let a = Arc::new(stream::Packet::new());
                let rx = Receiver::new(Flavor::Stream(a.clone()));
                match p.upgrade(rx) {
                    oneshot::UpSuccess => {
                        let ret = a.send(t);
                        (a, ret)
                    }
                    // The value is handed back to the caller as an error.
                    oneshot::UpDisconnected => (a, Err(t)),
                    oneshot::UpWoke(token) => {
                        // This send cannot panic because the thread is
                        // asleep (we're looking at it), so the receiver
                        // can't go away.
                        a.send(t).ok().unwrap();
                        token.signal();
                        (a, Ok(()))
                    }
                }
            }
        }
        Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
        Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
        // NOTE(review): a `Sender` is presumably never built with the `Sync`
        // flavor (`sync_channel` pairs that flavor with `SyncSender`).
        Flavor::Sync(..) => unreachable!(),
    };

    unsafe {
        // Install the stream packet as this sender's flavor. After the swap
        // `tmp` holds the previous flavor and is dropped at the end of this
        // block.
        let tmp = Sender::new(Flavor::Stream(new_inner));
        mem::swap(self.inner_mut(), tmp.inner_mut());
    }
    ret.map_err(SendError)
}
854}
855
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for Sender<T> {
    // Produces another sender for the same channel. A oneshot or stream
    // flavor is first upgraded to a shared packet, which is then installed
    // in `self` as well as in the returned clone; an already-shared flavor
    // is registered with `clone_chan` and its packet handle is cloned.
    fn clone(&self) -> Sender<T> {
        let packet = match *unsafe { self.inner() } {
            Flavor::Oneshot(ref p) => {
                let a = Arc::new(shared::Packet::new());
                {
                    // The guard is taken before the upgrade and handed to
                    // `inherit_blocker` together with any woken task.
                    let guard = a.postinit_lock();
                    let rx = Receiver::new(Flavor::Shared(a.clone()));
                    let sleeper = match p.upgrade(rx) {
                        oneshot::UpSuccess |
                        oneshot::UpDisconnected => None,
                        oneshot::UpWoke(task) => Some(task),
                    };
                    a.inherit_blocker(sleeper, guard);
                }
                a
            }
            Flavor::Stream(ref p) => {
                // Same procedure as the oneshot case, using the stream
                // packet's upgrade results.
                let a = Arc::new(shared::Packet::new());
                {
                    let guard = a.postinit_lock();
                    let rx = Receiver::new(Flavor::Shared(a.clone()));
                    let sleeper = match p.upgrade(rx) {
                        stream::UpSuccess |
                        stream::UpDisconnected => None,
                        stream::UpWoke(task) => Some(task),
                    };
                    a.inherit_blocker(sleeper, guard);
                }
                a
            }
            Flavor::Shared(ref p) => {
                // Already shared: no flavor change is needed for `self`.
                p.clone_chan();
                return Sender::new(Flavor::Shared(p.clone()));
            }
            // NOTE(review): a `Sender` is presumably never built with the
            // `Sync` flavor (`sync_channel` pairs it with `SyncSender`).
            Flavor::Sync(..) => unreachable!(),
        };

        unsafe {
            // Replace this sender's flavor with the shared packet. After the
            // swap `tmp` holds the previous flavor and is dropped at the end
            // of this block.
            let tmp = Sender::new(Flavor::Shared(packet.clone()));
            mem::swap(self.inner_mut(), tmp.inner_mut());
        }
        Sender::new(Flavor::Shared(packet))
    }
}
902
85aaf69f 903#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 904impl<T> Drop for Sender<T> {
1a4d82fc 905 fn drop(&mut self) {
476ff2be
SL
906 match *unsafe { self.inner() } {
907 Flavor::Oneshot(ref p) => p.drop_chan(),
908 Flavor::Stream(ref p) => p.drop_chan(),
909 Flavor::Shared(ref p) => p.drop_chan(),
1a4d82fc
JJ
910 Flavor::Sync(..) => unreachable!(),
911 }
912 }
913}
914
7cac9316 915#[stable(feature = "mpsc_debug", since = "1.8.0")]
7453a54e
SL
916impl<T> fmt::Debug for Sender<T> {
917 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
abe05a73 918 f.debug_struct("Sender").finish()
7453a54e
SL
919 }
920}
921
1a4d82fc
JJ
922////////////////////////////////////////////////////////////////////////////////
923// SyncSender
924////////////////////////////////////////////////////////////////////////////////
925
c34b1796 926impl<T> SyncSender<T> {
476ff2be 927 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
a1dfa0c6 928 SyncSender { inner }
1a4d82fc
JJ
929 }
930
931 /// Sends a value on this synchronous channel.
932 ///
933 /// This function will *block* until space in the internal buffer becomes
934 /// available or a receiver is available to hand off the message to.
935 ///
936 /// Note that a successful send does *not* guarantee that the receiver will
937 /// ever see the data if there is a buffer on this channel. Items may be
938 /// enqueued in the internal buffer for the receiver to receive at a later
7cac9316
XL
939 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
940 /// channel and it guarantees that the receiver has indeed received
941 /// the data if this function returns success.
1a4d82fc 942 ///
cc61c64b
XL
943 /// This function will never panic, but it may return [`Err`] if the
944 /// [`Receiver`] has disconnected and is no longer able to receive
1a4d82fc 945 /// information.
cc61c64b
XL
946 ///
947 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
948 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
7cac9316
XL
949 ///
950 /// # Examples
951 ///
952 /// ```rust
953 /// use std::sync::mpsc::sync_channel;
954 /// use std::thread;
955 ///
956 /// // Create a rendezvous sync_channel with buffer size 0
957 /// let (sync_sender, receiver) = sync_channel(0);
958 ///
959 /// thread::spawn(move || {
960 /// println!("sending message...");
961 /// sync_sender.send(1).unwrap();
962 /// // Thread is now blocked until the message is received
963 ///
964 /// println!("...message received!");
965 /// });
966 ///
967 /// let msg = receiver.recv().unwrap();
968 /// assert_eq!(1, msg);
969 /// ```
85aaf69f 970 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 971 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
476ff2be 972 self.inner.send(t).map_err(SendError)
1a4d82fc
JJ
973 }
974
975 /// Attempts to send a value on this channel without blocking.
976 ///
cc61c64b 977 /// This method differs from [`send`] by returning immediately if the
1a4d82fc 978 /// channel's buffer is full or no receiver is waiting to acquire some
cc61c64b 979 /// data. Compared with [`send`], this function has two failure cases
1a4d82fc
JJ
980 /// instead of one (one for disconnection, one for a full buffer).
981 ///
7cac9316 982 /// See [`send`] for notes about guarantees of whether the
1a4d82fc 983 /// receiver has received the data or not if this function is successful.
cc61c64b 984 ///
7cac9316
XL
985 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
986 ///
987 /// # Examples
988 ///
989 /// ```rust
990 /// use std::sync::mpsc::sync_channel;
991 /// use std::thread;
992 ///
993 /// // Create a sync_channel with buffer size 1
994 /// let (sync_sender, receiver) = sync_channel(1);
995 /// let sync_sender2 = sync_sender.clone();
996 ///
997 /// // First thread owns sync_sender
998 /// thread::spawn(move || {
999 /// sync_sender.send(1).unwrap();
1000 /// sync_sender.send(2).unwrap();
1001 /// // Thread blocked
1002 /// });
1003 ///
1004 /// // Second thread owns sync_sender2
1005 /// thread::spawn(move || {
1006 /// // This will return an error and send
1007 /// // no message if the buffer is full
1008 /// sync_sender2.try_send(3).is_err();
1009 /// });
1010 ///
1011 /// let mut msg;
1012 /// msg = receiver.recv().unwrap();
1013 /// println!("message {} received", msg);
1014 ///
1015 /// msg = receiver.recv().unwrap();
1016 /// println!("message {} received", msg);
1017 ///
1018 /// // Third message may have never been sent
1019 /// match receiver.try_recv() {
1020 /// Ok(msg) => println!("message {} received", msg),
1021 /// Err(_) => println!("the third message was never sent"),
1022 /// }
1023 /// ```
85aaf69f 1024 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 1025 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
476ff2be 1026 self.inner.try_send(t)
1a4d82fc
JJ
1027 }
1028}
1029
85aaf69f 1030#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 1031impl<T> Clone for SyncSender<T> {
1a4d82fc 1032 fn clone(&self) -> SyncSender<T> {
476ff2be 1033 self.inner.clone_chan();
e9174d1e 1034 SyncSender::new(self.inner.clone())
1a4d82fc
JJ
1035 }
1036}
1037
85aaf69f 1038#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 1039impl<T> Drop for SyncSender<T> {
1a4d82fc 1040 fn drop(&mut self) {
476ff2be 1041 self.inner.drop_chan();
1a4d82fc
JJ
1042 }
1043}
1044
7cac9316 1045#[stable(feature = "mpsc_debug", since = "1.8.0")]
7453a54e
SL
1046impl<T> fmt::Debug for SyncSender<T> {
1047 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
abe05a73 1048 f.debug_struct("SyncSender").finish()
7453a54e
SL
1049 }
1050}
1051
1a4d82fc
JJ
1052////////////////////////////////////////////////////////////////////////////////
1053// Receiver
1054////////////////////////////////////////////////////////////////////////////////
1055
c34b1796 1056impl<T> Receiver<T> {
1a4d82fc
JJ
1057 fn new(inner: Flavor<T>) -> Receiver<T> {
1058 Receiver { inner: UnsafeCell::new(inner) }
1059 }
1060
7cac9316 1061 /// Attempts to return a pending value on this receiver without blocking.
1a4d82fc
JJ
1062 ///
1063 /// This method will never block the caller in order to wait for data to
1064 /// become available. Instead, this will always return immediately with a
1065 /// possible option of pending data on the channel.
1066 ///
1067 /// This is useful for a flavor of "optimistic check" before deciding to
1068 /// block on a receiver.
7cac9316
XL
1069 ///
1070 /// Compared with [`recv`], this function has two failure cases instead of one
1071 /// (one for disconnection, one for an empty buffer).
1072 ///
1073 /// [`recv`]: struct.Receiver.html#method.recv
1074 ///
1075 /// # Examples
1076 ///
1077 /// ```rust
1078 /// use std::sync::mpsc::{Receiver, channel};
1079 ///
1080 /// let (_, receiver): (_, Receiver<i32>) = channel();
1081 ///
1082 /// assert!(receiver.try_recv().is_err());
1083 /// ```
85aaf69f 1084 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
pub fn try_recv(&self) -> Result<T, TryRecvError> {
    // Each pass asks the current flavor's packet for a value. Every outcome
    // returns immediately except `Upgraded`, which yields a replacement
    // receiver: its flavor is swapped into `self` and the loop retries.
    loop {
        let new_port = match *unsafe { self.inner() } {
            Flavor::Oneshot(ref p) => {
                match p.try_recv() {
                    Ok(t) => return Ok(t),
                    Err(oneshot::Empty) => return Err(TryRecvError::Empty),
                    Err(oneshot::Disconnected) => {
                        return Err(TryRecvError::Disconnected)
                    }
                    Err(oneshot::Upgraded(rx)) => rx,
                }
            }
            Flavor::Stream(ref p) => {
                match p.try_recv() {
                    Ok(t) => return Ok(t),
                    Err(stream::Empty) => return Err(TryRecvError::Empty),
                    Err(stream::Disconnected) => {
                        return Err(TryRecvError::Disconnected)
                    }
                    Err(stream::Upgraded(rx)) => rx,
                }
            }
            // The shared and sync flavors have no upgrade outcome, so these
            // arms always return.
            Flavor::Shared(ref p) => {
                match p.try_recv() {
                    Ok(t) => return Ok(t),
                    Err(shared::Empty) => return Err(TryRecvError::Empty),
                    Err(shared::Disconnected) => {
                        return Err(TryRecvError::Disconnected)
                    }
                }
            }
            Flavor::Sync(ref p) => {
                match p.try_recv() {
                    Ok(t) => return Ok(t),
                    Err(sync::Empty) => return Err(TryRecvError::Empty),
                    Err(sync::Disconnected) => {
                        return Err(TryRecvError::Disconnected)
                    }
                }
            }
        };
        unsafe {
            // After the swap `new_port` holds the previous flavor; it is
            // dropped at the end of this loop iteration.
            mem::swap(self.inner_mut(),
                      new_port.inner_mut());
        }
    }
}
1133
9346a6ac 1134 /// Attempts to wait for a value on this receiver, returning an error if the
1a4d82fc
JJ
1135 /// corresponding channel has hung up.
1136 ///
1137 /// This function will always block the current thread if there is no data
1138 /// available and it's possible for more data to be sent. Once a message is
7cac9316
XL
1139 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1140 /// receiver will wake up and return that message.
1a4d82fc 1141 ///
cc61c64b
XL
1142 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1143 /// this call is blocking, this call will wake up and return [`Err`] to
1a4d82fc 1144 /// indicate that no more messages can ever be received on this channel.
c1a9b12d
SL
1145 /// However, since channels are buffered, messages sent before the disconnect
1146 /// will still be properly received.
1147 ///
7cac9316
XL
1148 /// [`Sender`]: struct.Sender.html
1149 /// [`SyncSender`]: struct.SyncSender.html
cc61c64b
XL
1150 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1151 ///
c1a9b12d
SL
1152 /// # Examples
1153 ///
1154 /// ```
1155 /// use std::sync::mpsc;
1156 /// use std::thread;
1157 ///
1158 /// let (send, recv) = mpsc::channel();
1159 /// let handle = thread::spawn(move || {
1160 /// send.send(1u8).unwrap();
1161 /// });
1162 ///
1163 /// handle.join().unwrap();
1164 ///
1165 /// assert_eq!(Ok(1), recv.recv());
1166 /// ```
1167 ///
1168 /// Buffering behavior:
1169 ///
1170 /// ```
1171 /// use std::sync::mpsc;
1172 /// use std::thread;
1173 /// use std::sync::mpsc::RecvError;
1174 ///
1175 /// let (send, recv) = mpsc::channel();
1176 /// let handle = thread::spawn(move || {
1177 /// send.send(1u8).unwrap();
1178 /// send.send(2).unwrap();
1179 /// send.send(3).unwrap();
1180 /// drop(send);
1181 /// });
1182 ///
1183 /// // wait for the thread to join so we ensure the sender is dropped
1184 /// handle.join().unwrap();
1185 ///
1186 /// assert_eq!(Ok(1), recv.recv());
1187 /// assert_eq!(Ok(2), recv.recv());
1188 /// assert_eq!(Ok(3), recv.recv());
1189 /// assert_eq!(Err(RecvError), recv.recv());
1190 /// ```
85aaf69f 1191 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
pub fn recv(&self) -> Result<T, RecvError> {
    // Each pass calls the current flavor's `recv` with no deadline (`None`).
    // An `Upgraded` result carries a replacement receiver whose flavor is
    // swapped into `self` before retrying; every other handled result
    // returns.
    loop {
        let new_port = match *unsafe { self.inner() } {
            Flavor::Oneshot(ref p) => {
                match p.recv(None) {
                    Ok(t) => return Ok(t),
                    Err(oneshot::Disconnected) => return Err(RecvError),
                    Err(oneshot::Upgraded(rx)) => rx,
                    // NOTE(review): `Empty` is treated as impossible here,
                    // presumably because no deadline was supplied.
                    Err(oneshot::Empty) => unreachable!(),
                }
            }
            Flavor::Stream(ref p) => {
                match p.recv(None) {
                    Ok(t) => return Ok(t),
                    Err(stream::Disconnected) => return Err(RecvError),
                    Err(stream::Upgraded(rx)) => rx,
                    Err(stream::Empty) => unreachable!(),
                }
            }
            Flavor::Shared(ref p) => {
                match p.recv(None) {
                    Ok(t) => return Ok(t),
                    Err(shared::Disconnected) => return Err(RecvError),
                    Err(shared::Empty) => unreachable!(),
                }
            }
            // Any error from the sync packet is reported as `RecvError`.
            Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
        };
        unsafe {
            // After the swap `new_port` holds the previous flavor; it is
            // dropped at the end of this loop iteration.
            mem::swap(self.inner_mut(), new_port.inner_mut());
        }
    }
}
1225
3157f602
XL
1226 /// Attempts to wait for a value on this receiver, returning an error if the
1227 /// corresponding channel has hung up, or if it waits more than `timeout`.
1228 ///
1229 /// This function will always block the current thread if there is no data
1230 /// available and it's possible for more data to be sent. Once a message is
7cac9316
XL
1231 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1232 /// receiver will wake up and return that message.
3157f602 1233 ///
cc61c64b
XL
1234 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1235 /// this call is blocking, this call will wake up and return [`Err`] to
3157f602
XL
1236 /// indicate that no more messages can ever be received on this channel.
1237 /// However, since channels are buffered, messages sent before the disconnect
1238 /// will still be properly received.
1239 ///
7cac9316
XL
1240 /// [`Sender`]: struct.Sender.html
1241 /// [`SyncSender`]: struct.SyncSender.html
cc61c64b
XL
1242 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1243 ///
b7449926
XL
1244 /// # Known Issues
1245 ///
1246 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1247 /// to panic unexpectedly with the following example:
1248 ///
1249 /// ```no_run
1250 /// use std::sync::mpsc::channel;
1251 /// use std::thread;
1252 /// use std::time::Duration;
1253 ///
1254 /// let (tx, rx) = channel::<String>();
1255 ///
1256 /// thread::spawn(move || {
1257 /// let d = Duration::from_millis(10);
1258 /// loop {
1259 /// println!("recv");
1260 /// let _r = rx.recv_timeout(d);
1261 /// }
1262 /// });
1263 ///
1264 /// thread::sleep(Duration::from_millis(100));
1265 /// let _c1 = tx.clone();
1266 ///
1267 /// thread::sleep(Duration::from_secs(1));
1268 /// ```
1269 ///
1270 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1271 ///
3157f602
XL
1272 /// # Examples
1273 ///
7cac9316
XL
1274 /// Successfully receiving value before encountering timeout:
1275 ///
3157f602 1276 /// ```no_run
7cac9316
XL
1277 /// use std::thread;
1278 /// use std::time::Duration;
1279 /// use std::sync::mpsc;
1280 ///
1281 /// let (send, recv) = mpsc::channel();
1282 ///
1283 /// thread::spawn(move || {
1284 /// send.send('a').unwrap();
1285 /// });
1286 ///
1287 /// assert_eq!(
1288 /// recv.recv_timeout(Duration::from_millis(400)),
1289 /// Ok('a')
1290 /// );
1291 /// ```
1292 ///
1293 /// Receiving an error upon reaching timeout:
1294 ///
1295 /// ```no_run
1296 /// use std::thread;
3157f602 1297 /// use std::time::Duration;
7cac9316 1298 /// use std::sync::mpsc;
3157f602 1299 ///
7cac9316 1300 /// let (send, recv) = mpsc::channel();
3157f602 1301 ///
7cac9316
XL
1302 /// thread::spawn(move || {
1303 /// thread::sleep(Duration::from_millis(800));
1304 /// send.send('a').unwrap();
1305 /// });
1306 ///
1307 /// assert_eq!(
1308 /// recv.recv_timeout(Duration::from_millis(400)),
1309 /// Err(mpsc::RecvTimeoutError::Timeout)
1310 /// );
3157f602 1311 /// ```
5bcae85e 1312 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
3157f602
XL
1313 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1314 // Do an optimistic try_recv to avoid the performance impact of
1315 // Instant::now() in the full-channel case.
1316 match self.try_recv() {
0731742a
XL
1317 Ok(result) => Ok(result),
1318 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1319 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1320 Some(deadline) => self.recv_deadline(deadline),
1321 // So far in the future that it's practically the same as waiting indefinitely.
1322 None => self.recv().map_err(RecvTimeoutError::from),
1323 },
3157f602
XL
1324 }
1325 }
1326
ff7c6d11
XL
1327 /// Attempts to wait for a value on this receiver, returning an error if the
1328 /// corresponding channel has hung up, or if `deadline` is reached.
1329 ///
1330 /// This function will always block the current thread if there is no data
1331 /// available and it's possible for more data to be sent. Once a message is
1332 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1333 /// receiver will wake up and return that message.
1334 ///
1335 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1336 /// this call is blocking, this call will wake up and return [`Err`] to
1337 /// indicate that no more messages can ever be received on this channel.
1338 /// However, since channels are buffered, messages sent before the disconnect
1339 /// will still be properly received.
1340 ///
1341 /// [`Sender`]: struct.Sender.html
1342 /// [`SyncSender`]: struct.SyncSender.html
1343 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1344 ///
1345 /// # Examples
1346 ///
1347 /// Successfully receiving value before reaching deadline:
1348 ///
1349 /// ```no_run
1350 /// #![feature(deadline_api)]
1351 /// use std::thread;
1352 /// use std::time::{Duration, Instant};
1353 /// use std::sync::mpsc;
1354 ///
1355 /// let (send, recv) = mpsc::channel();
1356 ///
1357 /// thread::spawn(move || {
1358 /// send.send('a').unwrap();
1359 /// });
1360 ///
1361 /// assert_eq!(
1362 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1363 /// Ok('a')
1364 /// );
1365 /// ```
1366 ///
1367 /// Receiving an error upon reaching deadline:
1368 ///
1369 /// ```no_run
1370 /// #![feature(deadline_api)]
1371 /// use std::thread;
1372 /// use std::time::{Duration, Instant};
1373 /// use std::sync::mpsc;
1374 ///
1375 /// let (send, recv) = mpsc::channel();
1376 ///
1377 /// thread::spawn(move || {
1378 /// thread::sleep(Duration::from_millis(800));
1379 /// send.send('a').unwrap();
1380 /// });
1381 ///
1382 /// assert_eq!(
1383 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1384 /// Err(mpsc::RecvTimeoutError::Timeout)
1385 /// );
1386 /// ```
1387 #[unstable(feature = "deadline_api", issue = "46316")]
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
    use self::RecvTimeoutError::*;

    // Each pass calls the current flavor's `recv` with the deadline. The
    // match yields `Some(rx)` when the packet reports an upgrade, or `None`
    // when it reports `Empty`; a value or a disconnect returns directly.
    loop {
        let port_or_empty = match *unsafe { self.inner() } {
            Flavor::Oneshot(ref p) => {
                match p.recv(Some(deadline)) {
                    Ok(t) => return Ok(t),
                    Err(oneshot::Disconnected) => return Err(Disconnected),
                    Err(oneshot::Upgraded(rx)) => Some(rx),
                    Err(oneshot::Empty) => None,
                }
            }
            Flavor::Stream(ref p) => {
                match p.recv(Some(deadline)) {
                    Ok(t) => return Ok(t),
                    Err(stream::Disconnected) => return Err(Disconnected),
                    Err(stream::Upgraded(rx)) => Some(rx),
                    Err(stream::Empty) => None,
                }
            }
            Flavor::Shared(ref p) => {
                match p.recv(Some(deadline)) {
                    Ok(t) => return Ok(t),
                    Err(shared::Disconnected) => return Err(Disconnected),
                    Err(shared::Empty) => None,
                }
            }
            Flavor::Sync(ref p) => {
                match p.recv(Some(deadline)) {
                    Ok(t) => return Ok(t),
                    Err(sync::Disconnected) => return Err(Disconnected),
                    Err(sync::Empty) => None,
                }
            }
        };

        // On an upgrade, install the replacement flavor before checking the
        // clock; the previous flavor ends up in `new_port` and is dropped
        // when the `if let` body ends.
        if let Some(new_port) = port_or_empty {
            unsafe {
                mem::swap(self.inner_mut(), new_port.inner_mut());
            }
        }

        // If we're already past the deadline, and we're here without
        // data, return a timeout, else try again.
        if Instant::now() >= deadline {
            return Err(Timeout);
        }
    }
}
1438
1a4d82fc 1439 /// Returns an iterator that will block waiting for messages, but never
cc61c64b
XL
1440 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1441 ///
1442 /// [`panic!`]: ../../../std/macro.panic.html
1443 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1444 ///
1445 /// # Examples
1446 ///
1447 /// ```rust
1448 /// use std::sync::mpsc::channel;
1449 /// use std::thread;
1450 ///
1451 /// let (send, recv) = channel();
1452 ///
1453 /// thread::spawn(move || {
7cac9316
XL
1454 /// send.send(1).unwrap();
1455 /// send.send(2).unwrap();
1456 /// send.send(3).unwrap();
cc61c64b
XL
1457 /// });
1458 ///
7cac9316
XL
1459 /// let mut iter = recv.iter();
1460 /// assert_eq!(iter.next(), Some(1));
1461 /// assert_eq!(iter.next(), Some(2));
1462 /// assert_eq!(iter.next(), Some(3));
1463 /// assert_eq!(iter.next(), None);
cc61c64b 1464 /// ```
85aaf69f 1465 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
1466 pub fn iter(&self) -> Iter<T> {
1467 Iter { rx: self }
1468 }
5bcae85e
SL
1469
1470 /// Returns an iterator that will attempt to yield all pending values.
1471 /// It will return `None` if there are no more pending values or if the
cc61c64b 1472 /// channel has hung up. The iterator will never [`panic!`] or block the
5bcae85e 1473 /// user by waiting for values.
cc61c64b
XL
1474 ///
1475 /// [`panic!`]: ../../../std/macro.panic.html
7cac9316
XL
1476 ///
1477 /// # Examples
1478 ///
1479 /// ```no_run
1480 /// use std::sync::mpsc::channel;
1481 /// use std::thread;
1482 /// use std::time::Duration;
1483 ///
1484 /// let (sender, receiver) = channel();
1485 ///
1486 /// // nothing is in the buffer yet
1487 /// assert!(receiver.try_iter().next().is_none());
1488 ///
1489 /// thread::spawn(move || {
1490 /// thread::sleep(Duration::from_secs(1));
1491 /// sender.send(1).unwrap();
1492 /// sender.send(2).unwrap();
1493 /// sender.send(3).unwrap();
1494 /// });
1495 ///
1496 /// // nothing is in the buffer yet
1497 /// assert!(receiver.try_iter().next().is_none());
1498 ///
1499 /// // block for two seconds
1500 /// thread::sleep(Duration::from_secs(2));
1501 ///
1502 /// let mut iter = receiver.try_iter();
1503 /// assert_eq!(iter.next(), Some(1));
1504 /// assert_eq!(iter.next(), Some(2));
1505 /// assert_eq!(iter.next(), Some(3));
1506 /// assert_eq!(iter.next(), None);
1507 /// ```
476ff2be 1508 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
5bcae85e
SL
1509 pub fn try_iter(&self) -> TryIter<T> {
1510 TryIter { rx: self }
1511 }
1512
1a4d82fc
JJ
1513}
1514
// Selection hooks for a receiver. Each method forwards to the packet of the
// current flavor; when a oneshot or stream packet reports that the channel
// was upgraded, the replacement flavor is swapped into `self` and the
// operation is retried on it.
impl<T> select::Packet for Receiver<T> {
    // Returns the packet's `can_recv` answer, following upgrades first.
    fn can_recv(&self) -> bool {
        loop {
            let new_port = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    match p.can_recv() {
                        Ok(ret) => return ret,
                        Err(upgrade) => upgrade,
                    }
                }
                Flavor::Stream(ref p) => {
                    match p.can_recv() {
                        Ok(ret) => return ret,
                        Err(upgrade) => upgrade,
                    }
                }
                Flavor::Shared(ref p) => return p.can_recv(),
                Flavor::Sync(ref p) => return p.can_recv(),
            };
            unsafe {
                // After the swap `new_port` holds the previous flavor; it is
                // dropped at the end of this loop iteration.
                mem::swap(self.inner_mut(),
                          new_port.inner_mut());
            }
        }
    }

    // Passes `token` to the current packet's `start_selection`. On an
    // upgrade the packet gives the token back together with the replacement
    // receiver, so the token can be offered again after the swap.
    fn start_selection(&self, mut token: SignalToken) -> StartResult {
        loop {
            let (t, new_port) = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => {
                    match p.start_selection(token) {
                        oneshot::SelSuccess => return Installed,
                        oneshot::SelCanceled => return Abort,
                        oneshot::SelUpgraded(t, rx) => (t, rx),
                    }
                }
                Flavor::Stream(ref p) => {
                    match p.start_selection(token) {
                        stream::SelSuccess => return Installed,
                        stream::SelCanceled => return Abort,
                        stream::SelUpgraded(t, rx) => (t, rx),
                    }
                }
                Flavor::Shared(ref p) => return p.start_selection(token),
                Flavor::Sync(ref p) => return p.start_selection(token),
            };
            // `token` was moved into the call above; restore it for the
            // next pass.
            token = t;
            unsafe {
                mem::swap(self.inner_mut(), new_port.inner_mut());
            }
        }
    }

    // Forwards to the current packet's `abort_selection`. `was_upgrade`
    // starts false and becomes true once an upgrade has been followed; it is
    // passed to the stream and shared packets.
    fn abort_selection(&self) -> bool {
        let mut was_upgrade = false;
        loop {
            let result = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => p.abort_selection(),
                Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
                Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
                Flavor::Sync(ref p) => return p.abort_selection(),
            };
            // `Ok` carries the final answer; `Err` carries the replacement
            // receiver produced by an upgrade.
            let new_port = match result { Ok(b) => return b, Err(p) => p };
            was_upgrade = true;
            unsafe {
                mem::swap(self.inner_mut(),
                          new_port.inner_mut());
            }
        }
    }
}
1586
85aaf69f 1587#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 1588impl<'a, T> Iterator for Iter<'a, T> {
1a4d82fc
JJ
1589 type Item = T;
1590
1591 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1592}
1593
476ff2be 1594#[stable(feature = "receiver_try_iter", since = "1.15.0")]
5bcae85e
SL
1595impl<'a, T> Iterator for TryIter<'a, T> {
1596 type Item = T;
1597
1598 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1599}
1600
d9579d0f
AL
1601#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1602impl<'a, T> IntoIterator for &'a Receiver<T> {
1603 type Item = T;
1604 type IntoIter = Iter<'a, T>;
1605
1606 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1607}
1608
92a42be0 1609#[stable(feature = "receiver_into_iter", since = "1.1.0")]
d9579d0f
AL
1610impl<T> Iterator for IntoIter<T> {
1611 type Item = T;
1612 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1613}
1614
1615#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1616impl <T> IntoIterator for Receiver<T> {
1617 type Item = T;
1618 type IntoIter = IntoIter<T>;
1619
1620 fn into_iter(self) -> IntoIter<T> {
1621 IntoIter { rx: self }
1622 }
1623}
1624
85aaf69f 1625#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 1626impl<T> Drop for Receiver<T> {
1a4d82fc 1627 fn drop(&mut self) {
476ff2be
SL
1628 match *unsafe { self.inner() } {
1629 Flavor::Oneshot(ref p) => p.drop_port(),
1630 Flavor::Stream(ref p) => p.drop_port(),
1631 Flavor::Shared(ref p) => p.drop_port(),
1632 Flavor::Sync(ref p) => p.drop_port(),
1a4d82fc
JJ
1633 }
1634 }
1635}
1636
7cac9316 1637#[stable(feature = "mpsc_debug", since = "1.8.0")]
7453a54e
SL
1638impl<T> fmt::Debug for Receiver<T> {
1639 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
abe05a73 1640 f.debug_struct("Receiver").finish()
7453a54e
SL
1641 }
1642}
1643
85aaf69f
SL
1644#[stable(feature = "rust1", since = "1.0.0")]
1645impl<T> fmt::Debug for SendError<T> {
1646 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1647 "SendError(..)".fmt(f)
1a4d82fc 1648 }
85aaf69f 1649}
1a4d82fc 1650
85aaf69f
SL
1651#[stable(feature = "rust1", since = "1.0.0")]
1652impl<T> fmt::Display for SendError<T> {
1653 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1654 "sending on a closed channel".fmt(f)
1a4d82fc 1655 }
1a4d82fc
JJ
1656}
1657
c34b1796 1658#[stable(feature = "rust1", since = "1.0.0")]
9e0c209e 1659impl<T: Send> error::Error for SendError<T> {
c34b1796
AL
1660 fn description(&self) -> &str {
1661 "sending on a closed channel"
1662 }
1663
8faf50e0 1664 fn cause(&self) -> Option<&dyn error::Error> {
c34b1796
AL
1665 None
1666 }
1667}
1668
85aaf69f
SL
1669#[stable(feature = "rust1", since = "1.0.0")]
1670impl<T> fmt::Debug for TrySendError<T> {
1a4d82fc 1671 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85aaf69f
SL
1672 match *self {
1673 TrySendError::Full(..) => "Full(..)".fmt(f),
1674 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1675 }
1a4d82fc
JJ
1676 }
1677}
1678
85aaf69f
SL
1679#[stable(feature = "rust1", since = "1.0.0")]
1680impl<T> fmt::Display for TrySendError<T> {
1a4d82fc
JJ
1681 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1682 match *self {
1683 TrySendError::Full(..) => {
1684 "sending on a full channel".fmt(f)
1685 }
1686 TrySendError::Disconnected(..) => {
1687 "sending on a closed channel".fmt(f)
1688 }
1689 }
1690 }
1691}
1692
c34b1796 1693#[stable(feature = "rust1", since = "1.0.0")]
9e0c209e 1694impl<T: Send> error::Error for TrySendError<T> {
c34b1796
AL
1695
1696 fn description(&self) -> &str {
1697 match *self {
1698 TrySendError::Full(..) => {
1699 "sending on a full channel"
1700 }
1701 TrySendError::Disconnected(..) => {
1702 "sending on a closed channel"
1703 }
1704 }
1705 }
1706
8faf50e0 1707 fn cause(&self) -> Option<&dyn error::Error> {
c34b1796
AL
1708 None
1709 }
1710}
1711
ff7c6d11
XL
1712#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1713impl<T> From<SendError<T>> for TrySendError<T> {
1714 fn from(err: SendError<T>) -> TrySendError<T> {
1715 match err {
1716 SendError(t) => TrySendError::Disconnected(t),
1717 }
1718 }
1719}
1720
85aaf69f
SL
1721#[stable(feature = "rust1", since = "1.0.0")]
1722impl fmt::Display for RecvError {
1a4d82fc
JJ
1723 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1724 "receiving on a closed channel".fmt(f)
1725 }
1726}
1727
c34b1796
AL
1728#[stable(feature = "rust1", since = "1.0.0")]
1729impl error::Error for RecvError {
1730
1731 fn description(&self) -> &str {
1732 "receiving on a closed channel"
1733 }
1734
8faf50e0 1735 fn cause(&self) -> Option<&dyn error::Error> {
c34b1796
AL
1736 None
1737 }
1738}
1739
85aaf69f
SL
1740#[stable(feature = "rust1", since = "1.0.0")]
1741impl fmt::Display for TryRecvError {
1a4d82fc
JJ
1742 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1743 match *self {
1744 TryRecvError::Empty => {
1745 "receiving on an empty channel".fmt(f)
1746 }
1747 TryRecvError::Disconnected => {
1748 "receiving on a closed channel".fmt(f)
1749 }
1750 }
1751 }
1752}
1753
c34b1796
AL
1754#[stable(feature = "rust1", since = "1.0.0")]
1755impl error::Error for TryRecvError {
1756
1757 fn description(&self) -> &str {
1758 match *self {
1759 TryRecvError::Empty => {
1760 "receiving on an empty channel"
1761 }
1762 TryRecvError::Disconnected => {
1763 "receiving on a closed channel"
1764 }
1765 }
1766 }
1767
8faf50e0 1768 fn cause(&self) -> Option<&dyn error::Error> {
c34b1796
AL
1769 None
1770 }
1771}
1772
ff7c6d11
XL
1773#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1774impl From<RecvError> for TryRecvError {
1775 fn from(err: RecvError) -> TryRecvError {
1776 match err {
1777 RecvError => TryRecvError::Disconnected,
1778 }
1779 }
1780}
1781
7cac9316 1782#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
476ff2be
SL
1783impl fmt::Display for RecvTimeoutError {
1784 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1785 match *self {
1786 RecvTimeoutError::Timeout => {
1787 "timed out waiting on channel".fmt(f)
1788 }
1789 RecvTimeoutError::Disconnected => {
1790 "channel is empty and sending half is closed".fmt(f)
1791 }
1792 }
1793 }
1794}
1795
7cac9316 1796#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
476ff2be
SL
1797impl error::Error for RecvTimeoutError {
1798 fn description(&self) -> &str {
1799 match *self {
1800 RecvTimeoutError::Timeout => {
1801 "timed out waiting on channel"
1802 }
1803 RecvTimeoutError::Disconnected => {
1804 "channel is empty and sending half is closed"
1805 }
1806 }
1807 }
1808
8faf50e0 1809 fn cause(&self) -> Option<&dyn error::Error> {
476ff2be
SL
1810 None
1811 }
1812}
1813
ff7c6d11
XL
1814#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1815impl From<RecvError> for RecvTimeoutError {
1816 fn from(err: RecvError) -> RecvTimeoutError {
1817 match err {
1818 RecvError => RecvTimeoutError::Disconnected,
1819 }
1820 }
1821}
1822
/// Tests for the asynchronous (unbounded) `channel` flavor.
///
/// Heap payloads are built with `Box::new` rather than the unstable `box`
/// expression syntax, and value comparisons use `assert_eq!` so that a failure
/// reports both sides.
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
    use env;
    use super::*;
    use thread;
    use time::{Duration, Instant};

    /// Iteration multiplier for the stress tests.
    ///
    /// Reads `RUST_TEST_STRESS`; a value that is set but does not parse as a
    /// `usize` panics, and an unset (or unreadable) variable yields 1.
    pub fn stress_factor() -> usize {
        match env::var("RUST_TEST_STRESS") {
            Ok(val) => val.parse().unwrap(),
            Err(..) => 1,
        }
    }

    /// One value sent, one value received, all on the same thread.
    #[test]
    fn smoke() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }

    /// Dropping a channel that still holds a heap-allocated message must not fail.
    #[test]
    fn drop_full() {
        let (tx, _rx) = channel::<Box<isize>>();
        tx.send(Box::new(1)).unwrap();
    }

    /// Same as `drop_full`, after the sender has been cloned (and the clones dropped).
    #[test]
    fn drop_full_shared() {
        let (tx, _rx) = channel::<Box<isize>>();
        drop(tx.clone());
        drop(tx.clone());
        tx.send(Box::new(1)).unwrap();
    }

    /// Send/receive works both before and after the sender is cloned.
    #[test]
    fn smoke_shared() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        let tx = tx.clone();
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }

    /// A value sent from a spawned thread is received on the test thread.
    #[test]
    fn smoke_threads() {
        let (tx, rx) = channel::<i32>();
        let _t = thread::spawn(move || {
            tx.send(1).unwrap();
        });
        assert_eq!(rx.recv().unwrap(), 1);
    }

    /// Sending after the receiver is dropped reports an error.
    #[test]
    fn smoke_port_gone() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(1).is_err());
    }

    #[test]
    fn smoke_shared_port_gone() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(1).is_err())
    }

    /// A clone made after the receiver is gone also fails to send.
    #[test]
    fn smoke_shared_port_gone2() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        let tx2 = tx.clone();
        drop(tx);
        assert!(tx2.send(1).is_err());
    }

    /// Keep sending until the receiving thread has exited and sends start failing.
    #[test]
    fn port_gone_concurrent() {
        let (tx, rx) = channel::<i32>();
        let _t = thread::spawn(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() {}
    }

    #[test]
    fn port_gone_concurrent_shared() {
        let (tx, rx) = channel::<i32>();
        let tx2 = tx.clone();
        let _t = thread::spawn(move || {
            rx.recv().unwrap();
        });
        while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
    }

    /// Receiving after the only sender is dropped reports an error.
    #[test]
    fn smoke_chan_gone() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert!(rx.recv().is_err());
    }

    #[test]
    fn smoke_chan_gone_shared() {
        let (tx, rx) = channel::<()>();
        let tx2 = tx.clone();
        drop(tx);
        drop(tx2);
        assert!(rx.recv().is_err());
    }

    /// Keep receiving until the sending thread has finished and dropped its sender.
    #[test]
    fn chan_gone_concurrent() {
        let (tx, rx) = channel::<i32>();
        let _t = thread::spawn(move || {
            tx.send(1).unwrap();
            tx.send(1).unwrap();
        });
        while rx.recv().is_ok() {}
    }

    #[test]
    fn stress() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            for _ in 0..10000 {
                tx.send(1).unwrap();
            }
        });
        for _ in 0..10000 {
            assert_eq!(rx.recv().unwrap(), 1);
        }
        t.join().unwrap();
    }

    #[test]
    fn stress_shared() {
        const AMT: u32 = 10000;
        const NTHREADS: u32 = 8;
        let (tx, rx) = channel::<i32>();

        let t = thread::spawn(move || {
            for _ in 0..AMT * NTHREADS {
                assert_eq!(rx.recv().unwrap(), 1);
            }
            // Exactly AMT * NTHREADS messages were sent; nothing may be left over.
            assert!(rx.try_recv().is_err());
        });

        for _ in 0..NTHREADS {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..AMT {
                    tx.send(1).unwrap();
                }
            });
        }
        drop(tx);
        t.join().unwrap();
    }

    #[test]
    fn send_from_outside_runtime() {
        let (tx1, rx1) = channel::<()>();
        let (tx2, rx2) = channel::<i32>();
        let t1 = thread::spawn(move || {
            tx1.send(()).unwrap();
            for _ in 0..40 {
                assert_eq!(rx2.recv().unwrap(), 1);
            }
        });
        // Wait until the receiving thread is running before starting the sender.
        rx1.recv().unwrap();
        let t2 = thread::spawn(move || {
            for _ in 0..40 {
                tx2.send(1).unwrap();
            }
        });
        t1.join().unwrap();
        t2.join().unwrap();
    }

    #[test]
    fn recv_from_outside_runtime() {
        let (tx, rx) = channel::<i32>();
        let t = thread::spawn(move || {
            for _ in 0..40 {
                assert_eq!(rx.recv().unwrap(), 1);
            }
        });
        for _ in 0..40 {
            tx.send(1).unwrap();
        }
        t.join().unwrap();
    }

    /// Two threads exchange one message in each direction over two channels.
    #[test]
    fn no_runtime() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<i32>();
        let t1 = thread::spawn(move || {
            assert_eq!(rx1.recv().unwrap(), 1);
            tx2.send(2).unwrap();
        });
        let t2 = thread::spawn(move || {
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
        });
        t1.join().unwrap();
        t2.join().unwrap();
    }

    #[test]
    fn oneshot_single_thread_close_port_first() {
        // Simple test of closing without sending
        let (_tx, rx) = channel::<i32>();
        drop(rx);
    }

    #[test]
    fn oneshot_single_thread_close_chan_first() {
        // Simple test of closing without sending
        let (tx, _rx) = channel::<i32>();
        drop(tx);
    }

    #[test]
    fn oneshot_single_thread_send_port_close() {
        // Testing that the sender cleans up the payload if receiver is closed
        let (tx, rx) = channel::<Box<i32>>();
        drop(rx);
        assert!(tx.send(Box::new(0)).is_err());
    }

    #[test]
    fn oneshot_single_thread_recv_chan_close() {
        // `recv` on a channel with no sender returns `Err`; the `unwrap` turns
        // that into a panic inside the child thread.
        let res = thread::spawn(move || {
            let (tx, rx) = channel::<i32>();
            drop(tx);
            rx.recv().unwrap();
        }).join();
        // The child's panic surfaces here as an `Err` from `join`.
        assert!(res.is_err());
    }

    #[test]
    fn oneshot_single_thread_send_then_recv() {
        let (tx, rx) = channel::<Box<i32>>();
        tx.send(Box::new(10)).unwrap();
        assert_eq!(*rx.recv().unwrap(), 10);
    }

    #[test]
    fn oneshot_single_thread_try_send_open() {
        let (tx, rx) = channel::<i32>();
        assert!(tx.send(10).is_ok());
        assert_eq!(rx.recv().unwrap(), 10);
    }

    #[test]
    fn oneshot_single_thread_try_send_closed() {
        let (tx, rx) = channel::<i32>();
        drop(rx);
        assert!(tx.send(10).is_err());
    }

    #[test]
    fn oneshot_single_thread_try_recv_open() {
        let (tx, rx) = channel::<i32>();
        tx.send(10).unwrap();
        assert_eq!(rx.recv(), Ok(10));
    }

    #[test]
    fn oneshot_single_thread_try_recv_closed() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert!(rx.recv().is_err());
    }

    /// `try_recv` reports `Empty` before a send and yields the value after it.
    #[test]
    fn oneshot_single_thread_peek_data() {
        let (tx, rx) = channel::<i32>();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        tx.send(10).unwrap();
        assert_eq!(rx.try_recv(), Ok(10));
    }

    /// `try_recv` keeps reporting `Disconnected` once the sender is gone.
    #[test]
    fn oneshot_single_thread_peek_close() {
        let (tx, rx) = channel::<i32>();
        drop(tx);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }

    #[test]
    fn oneshot_single_thread_peek_open() {
        let (_tx, rx) = channel::<i32>();
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }

    #[test]
    fn oneshot_multi_task_recv_then_send() {
        let (tx, rx) = channel::<Box<i32>>();
        let _t = thread::spawn(move || {
            assert_eq!(*rx.recv().unwrap(), 10);
        });

        tx.send(Box::new(10)).unwrap();
    }

    #[test]
    fn oneshot_multi_task_recv_then_close() {
        let (tx, rx) = channel::<Box<i32>>();
        let _t = thread::spawn(move || {
            drop(tx);
        });
        // Nothing is ever sent, so `recv` fails and the `unwrap` panics.
        let res = thread::spawn(move || {
            assert_eq!(*rx.recv().unwrap(), 10);
        }).join();
        assert!(res.is_err());
    }

    #[test]
    fn oneshot_multi_thread_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel::<i32>();
            let _t = thread::spawn(move || {
                drop(rx);
            });
            drop(tx);
        }
    }

    #[test]
    fn oneshot_multi_thread_send_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel::<i32>();
            let _t = thread::spawn(move || {
                drop(rx);
            });
            // The send races with the drop above, so the sender thread may
            // panic; its join result is deliberately ignored.
            let _ = thread::spawn(move || {
                tx.send(1).unwrap();
            }).join();
        }
    }

    #[test]
    fn oneshot_multi_thread_recv_close_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel::<i32>();
            thread::spawn(move || {
                let res = thread::spawn(move || {
                    rx.recv().unwrap();
                }).join();
                assert!(res.is_err());
            });
            let _t = thread::spawn(move || {
                thread::spawn(move || {
                    drop(tx);
                });
            });
        }
    }

    #[test]
    fn oneshot_multi_thread_send_recv_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel::<Box<isize>>();
            let _t = thread::spawn(move || {
                tx.send(Box::new(10)).unwrap();
            });
            assert_eq!(*rx.recv().unwrap(), 10);
        }
    }

    /// Each message is sent from a freshly spawned thread and received on another
    /// freshly spawned thread, handing the endpoint along the chain.
    #[test]
    fn stream_send_recv_stress() {
        for _ in 0..stress_factor() {
            let (tx, rx) = channel();

            send(tx, 0);
            recv(rx, 0);

            fn send(tx: Sender<Box<i32>>, i: i32) {
                if i == 10 { return }

                thread::spawn(move || {
                    tx.send(Box::new(i)).unwrap();
                    send(tx, i + 1);
                });
            }

            fn recv(rx: Receiver<Box<i32>>, i: i32) {
                if i == 10 { return }

                thread::spawn(move || {
                    assert_eq!(*rx.recv().unwrap(), i);
                    recv(rx, i + 1);
                });
            }
        }
    }

    #[test]
    fn oneshot_single_thread_recv_timeout() {
        let (tx, rx) = channel();
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
    }

    #[test]
    fn stress_recv_timeout_two_threads() {
        let (tx, rx) = channel();
        let stress = stress_factor() + 100;
        let timeout = Duration::from_millis(100);

        thread::spawn(move || {
            for i in 0..stress {
                // Every other send is delayed past the receiver's timeout so
                // that both the `Ok` and the `Timeout` paths are exercised.
                if i % 2 == 0 {
                    thread::sleep(timeout * 2);
                }
                tx.send(1usize).unwrap();
            }
        });

        let mut recv_count = 0;
        loop {
            match rx.recv_timeout(timeout) {
                Ok(n) => {
                    assert_eq!(n, 1usize);
                    recv_count += 1;
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        assert_eq!(recv_count, stress);
    }

    /// A timed receive on a channel whose sender has been cloned must wait at
    /// least the requested duration before reporting `Timeout`.
    #[test]
    fn recv_timeout_upgrade() {
        let (tx, rx) = channel::<()>();
        let timeout = Duration::from_millis(1);
        let _tx_clone = tx.clone();

        let start = Instant::now();
        assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
        assert!(Instant::now() >= start + timeout);
    }

    #[test]
    fn stress_recv_timeout_shared() {
        let (tx, rx) = channel();
        let stress = stress_factor() + 100;

        for i in 0..stress {
            let tx = tx.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(i as u64 * 10));
                tx.send(1usize).unwrap();
            });
        }

        // Only the clones remain, so the loop below ends once they are all done.
        drop(tx);

        let mut recv_count = 0;
        loop {
            match rx.recv_timeout(Duration::from_millis(10)) {
                Ok(n) => {
                    assert_eq!(n, 1usize);
                    recv_count += 1;
                }
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }

        assert_eq!(recv_count, stress);
    }

    #[test]
    fn very_long_recv_timeout_wont_panic() {
        let (tx, rx) = channel::<()>();
        let join_handle = thread::spawn(move || {
            rx.recv_timeout(Duration::from_secs(u64::max_value()))
        });
        thread::sleep(Duration::from_secs(1));
        assert!(tx.send(()).is_ok());
        assert_eq!(join_handle.join().unwrap(), Ok(()));
    }

    #[test]
    fn recv_a_lot() {
        // Regression test that we don't run out of stack in scheduler context
        let (tx, rx) = channel();
        for _ in 0..10000 { tx.send(()).unwrap(); }
        for _ in 0..10000 { rx.recv().unwrap(); }
    }

    #[test]
    fn shared_recv_timeout() {
        let (tx, rx) = channel();
        let total = 5;
        for _ in 0..total {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(()).unwrap();
            });
        }

        for _ in 0..total { rx.recv().unwrap(); }

        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
        tx.send(()).unwrap();
        assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
    }

    #[test]
    fn shared_chan_stress() {
        let (tx, rx) = channel();
        let total = stress_factor() + 100;
        for _ in 0..total {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(()).unwrap();
            });
        }

        for _ in 0..total {
            rx.recv().unwrap();
        }
    }

    /// `Receiver::iter` yields every sent value and ends when the sender is dropped.
    #[test]
    fn test_nested_recv_iter() {
        let (tx, rx) = channel::<i32>();
        let (total_tx, total_rx) = channel::<i32>();

        let _t = thread::spawn(move || {
            let mut acc = 0;
            for x in rx.iter() {
                acc += x;
            }
            total_tx.send(acc).unwrap();
        });

        tx.send(3).unwrap();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        assert_eq!(total_rx.recv().unwrap(), 6);
    }

    #[test]
    fn test_recv_iter_break() {
        let (tx, rx) = channel::<i32>();
        let (count_tx, count_rx) = channel();

        let _t = thread::spawn(move || {
            let mut count = 0;
            for x in rx.iter() {
                if count >= 3 {
                    break;
                } else {
                    count += x;
                }
            }
            count_tx.send(count).unwrap();
        });

        tx.send(2).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        // The consumer may already have broken out and dropped `rx`, so this
        // last send is allowed to fail.
        let _ = tx.send(2);
        drop(tx);
        assert_eq!(count_rx.recv().unwrap(), 4);
    }

    #[test]
    fn test_recv_try_iter() {
        let (request_tx, request_rx) = channel();
        let (response_tx, response_rx) = channel();

        // Request `x`s until we have `6`.
        let t = thread::spawn(move || {
            let mut count = 0;
            loop {
                for x in response_rx.try_iter() {
                    count += x;
                    if count == 6 {
                        return count;
                    }
                }
                request_tx.send(()).unwrap();
            }
        });

        for _ in request_rx.iter() {
            if response_tx.send(2).is_err() {
                break;
            }
        }

        assert_eq!(t.join().unwrap(), 6);
    }

    /// The owning iterator drains buffered values, then ends because the sender
    /// was dropped at the end of the inner block.
    #[test]
    fn test_recv_into_iter_owned() {
        let mut iter = {
            let (tx, rx) = channel::<i32>();
            tx.send(1).unwrap();
            tx.send(2).unwrap();

            rx.into_iter()
        };
        assert_eq!(iter.next().unwrap(), 1);
        assert_eq!(iter.next().unwrap(), 2);
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_recv_into_iter_borrowed() {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        let mut iter = (&rx).into_iter();
        assert_eq!(iter.next().unwrap(), 1);
        assert_eq!(iter.next().unwrap(), 2);
        assert!(iter.next().is_none());
    }

    /// Walks `try_recv` through `Empty` -> `Ok` -> `Empty` -> `Disconnected`,
    /// using two auxiliary channels to step the helper thread in lockstep.
    #[test]
    fn try_recv_states() {
        let (tx1, rx1) = channel::<i32>();
        let (tx2, rx2) = channel::<()>();
        let (tx3, rx3) = channel::<()>();
        let _t = thread::spawn(move || {
            rx2.recv().unwrap();
            tx1.send(1).unwrap();
            tx3.send(()).unwrap();
            rx2.recv().unwrap();
            drop(tx1);
            tx3.send(()).unwrap();
        });

        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Ok(1));
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
    }

    // This bug used to end up in a livelock inside of the Receiver destructor
    // because the internal state of the Shared packet was corrupted
    #[test]
    fn destroy_upgraded_shared_port_when_sender_still_active() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        let _t = thread::spawn(move || {
            rx.recv().unwrap(); // wait on a oneshot
            drop(rx);  // destroy a shared
            tx2.send(()).unwrap();
        });
        // make sure the other thread has gone to sleep
        for _ in 0..5000 { thread::yield_now(); }

        // upgrade to a shared chan and send a message
        let t = tx.clone();
        drop(tx);
        t.send(()).unwrap();

        // wait for the child thread to exit before we exit
        rx2.recv().unwrap();
    }

    /// Once a send has failed because the receiver is gone, later sends keep
    /// failing and hand the value back.
    #[test]
    fn issue_32114() {
        let (tx, _) = channel();
        let _ = tx.send(123);
        assert_eq!(tx.send(123), Err(SendError(123)));
    }
}
2514
c30ab7b3 2515#[cfg(all(test, not(target_os = "emscripten")))]
1a4d82fc 2516mod sync_tests {
c1a9b12d 2517 use env;
85aaf69f 2518 use thread;
1a4d82fc 2519 use super::*;
3157f602 2520 use time::Duration;
1a4d82fc 2521
c34b1796 2522 pub fn stress_factor() -> usize {
85aaf69f
SL
2523 match env::var("RUST_TEST_STRESS") {
2524 Ok(val) => val.parse().unwrap(),
2525 Err(..) => 1,
1a4d82fc
JJ
2526 }
2527 }
2528
2529 #[test]
2530 fn smoke() {
c34b1796 2531 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2532 tx.send(1).unwrap();
2533 assert_eq!(rx.recv().unwrap(), 1);
2534 }
2535
2536 #[test]
2537 fn drop_full() {
c34b1796 2538 let (tx, _rx) = sync_channel::<Box<isize>>(1);
85aaf69f 2539 tx.send(box 1).unwrap();
1a4d82fc
JJ
2540 }
2541
2542 #[test]
2543 fn smoke_shared() {
c34b1796 2544 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2545 tx.send(1).unwrap();
2546 assert_eq!(rx.recv().unwrap(), 1);
2547 let tx = tx.clone();
2548 tx.send(1).unwrap();
2549 assert_eq!(rx.recv().unwrap(), 1);
2550 }
2551
3157f602
XL
2552 #[test]
2553 fn recv_timeout() {
2554 let (tx, rx) = sync_channel::<i32>(1);
2555 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
2556 tx.send(1).unwrap();
2557 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
2558 }
2559
1a4d82fc
JJ
2560 #[test]
2561 fn smoke_threads() {
c34b1796 2562 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2563 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2564 tx.send(1).unwrap();
2565 });
2566 assert_eq!(rx.recv().unwrap(), 1);
2567 }
2568
2569 #[test]
2570 fn smoke_port_gone() {
c34b1796 2571 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2572 drop(rx);
2573 assert!(tx.send(1).is_err());
2574 }
2575
2576 #[test]
2577 fn smoke_shared_port_gone2() {
c34b1796 2578 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2579 drop(rx);
2580 let tx2 = tx.clone();
2581 drop(tx);
2582 assert!(tx2.send(1).is_err());
2583 }
2584
2585 #[test]
2586 fn port_gone_concurrent() {
c34b1796 2587 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2588 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2589 rx.recv().unwrap();
2590 });
2591 while tx.send(1).is_ok() {}
2592 }
2593
2594 #[test]
2595 fn port_gone_concurrent_shared() {
c34b1796 2596 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc 2597 let tx2 = tx.clone();
85aaf69f 2598 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2599 rx.recv().unwrap();
2600 });
2601 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2602 }
2603
2604 #[test]
2605 fn smoke_chan_gone() {
c34b1796 2606 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2607 drop(tx);
2608 assert!(rx.recv().is_err());
2609 }
2610
2611 #[test]
2612 fn smoke_chan_gone_shared() {
2613 let (tx, rx) = sync_channel::<()>(0);
2614 let tx2 = tx.clone();
2615 drop(tx);
2616 drop(tx2);
2617 assert!(rx.recv().is_err());
2618 }
2619
2620 #[test]
2621 fn chan_gone_concurrent() {
c34b1796 2622 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2623 thread::spawn(move|| {
1a4d82fc
JJ
2624 tx.send(1).unwrap();
2625 tx.send(1).unwrap();
2626 });
2627 while rx.recv().is_ok() {}
2628 }
2629
2630 #[test]
2631 fn stress() {
c34b1796 2632 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f
SL
2633 thread::spawn(move|| {
2634 for _ in 0..10000 { tx.send(1).unwrap(); }
1a4d82fc 2635 });
85aaf69f 2636 for _ in 0..10000 {
1a4d82fc
JJ
2637 assert_eq!(rx.recv().unwrap(), 1);
2638 }
2639 }
2640
2641 #[test]
3157f602
XL
2642 fn stress_recv_timeout_two_threads() {
2643 let (tx, rx) = sync_channel::<i32>(0);
2644
2645 thread::spawn(move|| {
2646 for _ in 0..10000 { tx.send(1).unwrap(); }
2647 });
2648
2649 let mut recv_count = 0;
2650 loop {
2651 match rx.recv_timeout(Duration::from_millis(1)) {
2652 Ok(v) => {
2653 assert_eq!(v, 1);
2654 recv_count += 1;
2655 },
2656 Err(RecvTimeoutError::Timeout) => continue,
2657 Err(RecvTimeoutError::Disconnected) => break,
2658 }
2659 }
2660
2661 assert_eq!(recv_count, 10000);
2662 }
2663
2664 #[test]
2665 fn stress_recv_timeout_shared() {
2666 const AMT: u32 = 1000;
2667 const NTHREADS: u32 = 8;
2668 let (tx, rx) = sync_channel::<i32>(0);
2669 let (dtx, drx) = sync_channel::<()>(0);
2670
2671 thread::spawn(move|| {
2672 let mut recv_count = 0;
2673 loop {
2674 match rx.recv_timeout(Duration::from_millis(10)) {
2675 Ok(v) => {
2676 assert_eq!(v, 1);
2677 recv_count += 1;
2678 },
2679 Err(RecvTimeoutError::Timeout) => continue,
2680 Err(RecvTimeoutError::Disconnected) => break,
2681 }
2682 }
2683
2684 assert_eq!(recv_count, AMT * NTHREADS);
2685 assert!(rx.try_recv().is_err());
2686
2687 dtx.send(()).unwrap();
2688 });
2689
2690 for _ in 0..NTHREADS {
2691 let tx = tx.clone();
2692 thread::spawn(move|| {
2693 for _ in 0..AMT { tx.send(1).unwrap(); }
2694 });
2695 }
2696
2697 drop(tx);
2698
2699 drx.recv().unwrap();
2700 }
2701
2702 #[test]
1a4d82fc 2703 fn stress_shared() {
c34b1796
AL
2704 const AMT: u32 = 1000;
2705 const NTHREADS: u32 = 8;
2706 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2707 let (dtx, drx) = sync_channel::<()>(0);
2708
85aaf69f
SL
2709 thread::spawn(move|| {
2710 for _ in 0..AMT * NTHREADS {
1a4d82fc
JJ
2711 assert_eq!(rx.recv().unwrap(), 1);
2712 }
2713 match rx.try_recv() {
2714 Ok(..) => panic!(),
2715 _ => {}
2716 }
2717 dtx.send(()).unwrap();
2718 });
2719
85aaf69f 2720 for _ in 0..NTHREADS {
1a4d82fc 2721 let tx = tx.clone();
85aaf69f
SL
2722 thread::spawn(move|| {
2723 for _ in 0..AMT { tx.send(1).unwrap(); }
1a4d82fc
JJ
2724 });
2725 }
2726 drop(tx);
2727 drx.recv().unwrap();
2728 }
2729
2730 #[test]
2731 fn oneshot_single_thread_close_port_first() {
2732 // Simple test of closing without sending
c34b1796 2733 let (_tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2734 drop(rx);
2735 }
2736
2737 #[test]
2738 fn oneshot_single_thread_close_chan_first() {
2739 // Simple test of closing without sending
c34b1796 2740 let (tx, _rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2741 drop(tx);
2742 }
2743
2744 #[test]
2745 fn oneshot_single_thread_send_port_close() {
2746 // Testing that the sender cleans up the payload if receiver is closed
c34b1796 2747 let (tx, rx) = sync_channel::<Box<i32>>(0);
1a4d82fc
JJ
2748 drop(rx);
2749 assert!(tx.send(box 0).is_err());
2750 }
2751
2752 #[test]
2753 fn oneshot_single_thread_recv_chan_close() {
2754 // Receiving on a closed chan will panic
85aaf69f 2755 let res = thread::spawn(move|| {
c34b1796 2756 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2757 drop(tx);
2758 rx.recv().unwrap();
2759 }).join();
2760 // What is our res?
2761 assert!(res.is_err());
2762 }
2763
2764 #[test]
2765 fn oneshot_single_thread_send_then_recv() {
c34b1796 2766 let (tx, rx) = sync_channel::<Box<i32>>(1);
1a4d82fc 2767 tx.send(box 10).unwrap();
7cac9316 2768 assert!(*rx.recv().unwrap() == 10);
1a4d82fc
JJ
2769 }
2770
2771 #[test]
2772 fn oneshot_single_thread_try_send_open() {
c34b1796 2773 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2774 assert_eq!(tx.try_send(10), Ok(()));
2775 assert!(rx.recv().unwrap() == 10);
2776 }
2777
2778 #[test]
2779 fn oneshot_single_thread_try_send_closed() {
c34b1796 2780 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2781 drop(rx);
2782 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2783 }
2784
2785 #[test]
2786 fn oneshot_single_thread_try_send_closed2() {
c34b1796 2787 let (tx, _rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2788 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2789 }
2790
2791 #[test]
2792 fn oneshot_single_thread_try_recv_open() {
c34b1796 2793 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2794 tx.send(10).unwrap();
2795 assert!(rx.recv() == Ok(10));
2796 }
2797
2798 #[test]
2799 fn oneshot_single_thread_try_recv_closed() {
c34b1796 2800 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2801 drop(tx);
2802 assert!(rx.recv().is_err());
2803 }
2804
5bcae85e
SL
2805 #[test]
2806 fn oneshot_single_thread_try_recv_closed_with_data() {
2807 let (tx, rx) = sync_channel::<i32>(1);
2808 tx.send(10).unwrap();
2809 drop(tx);
2810 assert_eq!(rx.try_recv(), Ok(10));
2811 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2812 }
2813
1a4d82fc
JJ
2814 #[test]
2815 fn oneshot_single_thread_peek_data() {
c34b1796 2816 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2817 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2818 tx.send(10).unwrap();
2819 assert_eq!(rx.try_recv(), Ok(10));
2820 }
2821
2822 #[test]
2823 fn oneshot_single_thread_peek_close() {
c34b1796 2824 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2825 drop(tx);
2826 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2827 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2828 }
2829
2830 #[test]
2831 fn oneshot_single_thread_peek_open() {
c34b1796 2832 let (_tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2833 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2834 }
2835
2836 #[test]
2837 fn oneshot_multi_task_recv_then_send() {
c34b1796 2838 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f 2839 let _t = thread::spawn(move|| {
7cac9316 2840 assert!(*rx.recv().unwrap() == 10);
1a4d82fc
JJ
2841 });
2842
2843 tx.send(box 10).unwrap();
2844 }
2845
2846 #[test]
2847 fn oneshot_multi_task_recv_then_close() {
c34b1796 2848 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f 2849 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2850 drop(tx);
2851 });
85aaf69f 2852 let res = thread::spawn(move|| {
7cac9316 2853 assert!(*rx.recv().unwrap() == 10);
1a4d82fc
JJ
2854 }).join();
2855 assert!(res.is_err());
2856 }
2857
2858 #[test]
2859 fn oneshot_multi_thread_close_stress() {
85aaf69f 2860 for _ in 0..stress_factor() {
c34b1796 2861 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2862 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2863 drop(rx);
2864 });
2865 drop(tx);
2866 }
2867 }
2868
2869 #[test]
2870 fn oneshot_multi_thread_send_close_stress() {
85aaf69f 2871 for _ in 0..stress_factor() {
c34b1796 2872 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2873 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2874 drop(rx);
2875 });
85aaf69f 2876 let _ = thread::spawn(move || {
1a4d82fc
JJ
2877 tx.send(1).unwrap();
2878 }).join();
2879 }
2880 }
2881
// Repeatedly races a blocking `recv` against the sender being dropped, with
// both sides running on nested threads.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<i32>(0);

        // Receiving side: the inner thread's `unwrap` panics because nothing
        // is ever sent; the outer thread checks that the panic was reported
        // through `join`.
        let _t = thread::spawn(move || {
            let res = thread::spawn(move || {
                rx.recv().unwrap();
            })
            .join();
            assert!(res.is_err());
        });

        // Sending side: the sender is dropped from a thread that is itself
        // spawned by another thread.
        let _t = thread::spawn(move || {
            thread::spawn(move || {
                drop(tx);
            });
        });
    }
}
2899
// Repeatedly sends one boxed value from a helper thread and receives it on
// the current thread.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        let _t = thread::spawn(move || {
            // `Box::new` is used instead of the unstable `box` syntax.
            tx.send(Box::new(10)).unwrap();
        });
        assert_eq!(*rx.recv().unwrap(), 10);
    }
}
2910
// Streams the values 0..10 through a rendezvous channel, where every single
// send and every single receive happens on its own freshly spawned thread.
#[test]
fn stream_send_recv_stress() {
    // Spawns a thread that sends `i` and then recurses with `i + 1`, handing
    // the sender on to the next thread. Stops once `i` reaches 10.
    fn send(tx: SyncSender<Box<i32>>, i: i32) {
        if i == 10 {
            return;
        }

        thread::spawn(move || {
            // `Box::new` is used instead of the unstable `box` syntax.
            tx.send(Box::new(i)).unwrap();
            send(tx, i + 1);
        });
    }

    // Spawns a thread that expects to receive exactly `i` and then recurses
    // with `i + 1`, handing the receiver on. Stops once `i` reaches 10.
    fn recv(rx: Receiver<Box<i32>>, i: i32) {
        if i == 10 {
            return;
        }

        thread::spawn(move || {
            assert_eq!(*rx.recv().unwrap(), i);
            recv(rx, i + 1);
        });
    }

    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        send(tx, 0);
        recv(rx, 0);
    }
}
2938
#[test]
fn recv_a_lot() {
    // Regression test that we don't run out of stack in scheduler context
    //
    // The buffer is sized to hold every message, so all sends complete
    // before the first receive without blocking.
    let (tx, rx) = sync_channel(10000);
    for _ in 0..10000 {
        tx.send(()).unwrap();
    }
    for _ in 0..10000 {
        rx.recv().unwrap();
    }
}
2946
// Many threads, each holding its own clone of the sender, send one unit
// message apiece; the current thread must receive exactly that many.
#[test]
fn shared_chan_stress() {
    let (tx, rx) = sync_channel(0);
    let total = stress_factor() + 100;

    for _ in 0..total {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(()).unwrap();
        });
    }

    // The original `tx` is still alive here, so the channel cannot
    // disconnect while the messages are being collected.
    for _ in 0..total {
        rx.recv().unwrap();
    }
}
2962
// A worker thread sums everything yielded by `rx.iter()` and reports the
// total over a second channel once the iterator ends.
#[test]
fn test_nested_recv_iter() {
    let (tx, rx) = sync_channel::<i32>(0);
    let (total_tx, total_rx) = sync_channel::<i32>(0);

    let _t = thread::spawn(move || {
        let mut acc = 0;
        for x in rx.iter() {
            acc += x;
        }
        total_tx.send(acc).unwrap();
    });

    tx.send(3).unwrap();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    // Dropping the sender is what terminates the worker's `iter()` loop.
    drop(tx);
    assert_eq!(total_rx.recv().unwrap(), 6);
}
2982
// A worker thread breaks out of an `rx.iter()` loop early; the sender side
// must cope with the receiver going away mid-stream.
#[test]
fn test_recv_iter_break() {
    let (tx, rx) = sync_channel::<i32>(0);
    let (count_tx, count_rx) = sync_channel(0);

    let _t = thread::spawn(move || {
        let mut count = 0;
        for x in rx.iter() {
            // Two values of 2 bring `count` to 4; the third received value
            // triggers the break without being added.
            if count >= 3 {
                break;
            } else {
                count += x;
            }
        }
        count_tx.send(count).unwrap();
    });

    tx.send(2).unwrap();
    tx.send(2).unwrap();
    tx.send(2).unwrap();
    // By now the worker has taken its third value and is leaving the loop,
    // so the result of this extra non-blocking send is ignored.
    let _ = tx.try_send(2);
    drop(tx);
    assert_eq!(count_rx.recv().unwrap(), 4);
}
3007
// Walks `try_recv` through its three outcomes (`Empty`, `Ok`, `Disconnected`)
// by stepping a helper thread with two signalling channels.
#[test]
fn try_recv_states() {
    // `tx1`/`rx1` is the channel under test; `tx2`/`rx2` tells the helper to
    // take its next step and `tx3`/`rx3` reports that the step is done.
    let (tx1, rx1) = sync_channel::<i32>(1);
    let (tx2, rx2) = sync_channel::<()>(1);
    let (tx3, rx3) = sync_channel::<()>(1);
    let _t = thread::spawn(move || {
        // Step 1: put one value into the channel.
        rx2.recv().unwrap();
        tx1.send(1).unwrap();
        tx3.send(()).unwrap();
        // Step 2: disconnect the channel.
        rx2.recv().unwrap();
        drop(tx1);
        tx3.send(()).unwrap();
    });

    // Nothing sent yet.
    assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
    tx2.send(()).unwrap();
    rx3.recv().unwrap();
    // Exactly one value is available, then the channel is empty again.
    assert_eq!(rx1.try_recv(), Ok(1));
    assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
    tx2.send(()).unwrap();
    rx3.recv().unwrap();
    // The sender has been dropped.
    assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
}
3031
// This bug used to end up in a livelock inside of the Receiver destructor
// because the internal state of the Shared packet was corrupted
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (tx, rx) = sync_channel::<()>(0);
    let (tx2, rx2) = sync_channel::<()>(0);
    let _t = thread::spawn(move || {
        rx.recv().unwrap(); // wait on a oneshot
        drop(rx); // destroy a shared
        tx2.send(()).unwrap();
    });
    // make sure the other thread has gone to sleep
    for _ in 0..5000 {
        thread::yield_now();
    }

    // upgrade to a shared chan and send a message
    let t = tx.clone();
    drop(tx);
    t.send(()).unwrap();

    // wait for the child thread to exit before we exit
    rx2.recv().unwrap();
}
3054
// A blocking `send` on a rendezvous channel succeeds when another thread
// receives the value.
#[test]
fn send1() {
    let (tx, rx) = sync_channel::<i32>(0);
    let _t = thread::spawn(move || {
        rx.recv().unwrap();
    });
    assert_eq!(tx.send(1), Ok(()));
}
3061
// A blocking `send` on a rendezvous channel fails when the receiver is
// dropped instead of receiving.
#[test]
fn send2() {
    let (tx, rx) = sync_channel::<i32>(0);
    let _t = thread::spawn(move || {
        drop(rx);
    });
    assert!(tx.send(1).is_err());
}
3068
// With a one-slot buffer the first `send` succeeds immediately; the second
// one finds the buffer full and fails once the receiver is dropped.
#[test]
fn send3() {
    let (tx, rx) = sync_channel::<i32>(1);
    assert_eq!(tx.send(1), Ok(()));
    let _t = thread::spawn(move || {
        drop(rx);
    });
    assert!(tx.send(1).is_err());
}
3076
// Two threads each attempt a blocking `send` through their own clone of the
// sender; dropping the receiver must make both sends fail.
#[test]
fn send4() {
    let (tx, rx) = sync_channel::<i32>(0);
    let tx2 = tx.clone();
    // Asynchronous channel used only to learn when each sender thread has
    // finished its assertion.
    let (done, donerx) = channel();
    let done2 = done.clone();
    let _t = thread::spawn(move || {
        assert!(tx.send(1).is_err());
        done.send(()).unwrap();
    });
    let _t = thread::spawn(move || {
        assert!(tx2.send(2).is_err());
        done2.send(()).unwrap();
    });
    drop(rx);
    // One completion message per sender thread.
    donerx.recv().unwrap();
    donerx.recv().unwrap();
}
3095
// On a rendezvous channel with no receiver waiting, `try_send` reports
// `Full` and hands the value back.
#[test]
fn try_send1() {
    // `_rx` stays bound so the channel remains connected.
    let (tx, _rx) = sync_channel::<i32>(0);
    assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
}
3101
// With a one-slot buffer the first `try_send` succeeds and the second one
// reports `Full`, handing the value back.
#[test]
fn try_send2() {
    // `_rx` stays bound so the channel remains connected.
    let (tx, _rx) = sync_channel::<i32>(1);
    assert_eq!(tx.try_send(1), Ok(()));
    assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
}
3108
// After the receiver is dropped, `try_send` reports `Disconnected` (not
// `Full`) and hands the value back, even though the buffer is occupied.
#[test]
fn try_send3() {
    let (tx, rx) = sync_channel::<i32>(1);
    assert_eq!(tx.try_send(1), Ok(()));
    drop(rx);
    assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
}
3116
// Regression test named after issue 15761: a ping-pong between two buffered
// channels using `try_send`, repeated 100 times.
#[test]
fn issue_15761() {
    // One round: the current thread pings over channel 1, the helper thread
    // answers over channel 2. Both channels have spare capacity (bound 3),
    // so neither `try_send` is expected to find its buffer full.
    fn repro() {
        let (tx1, rx1) = sync_channel::<()>(3);
        let (tx2, rx2) = sync_channel::<()>(3);

        let _t = thread::spawn(move || {
            rx1.recv().unwrap();
            tx2.try_send(()).unwrap();
        });

        tx1.try_send(()).unwrap();
        rx2.recv().unwrap();
    }

    for _ in 0..100 {
        repro()
    }
}
3136}