]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/mod.rs
New upstream version 1.13.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / mod.rs
CommitLineData
1a4d82fc
JJ
1// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
85aaf69f 11//! Multi-producer, single-consumer FIFO queue communication primitives.
1a4d82fc
JJ
12//!
13//! This module provides message-based communication over channels, concretely
14//! defined among three types:
15//!
16//! * `Sender`
17//! * `SyncSender`
18//! * `Receiver`
19//!
20//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21//! senders are clone-able (multi-producer) such that many threads can send
22//! simultaneously to one receiver (single-consumer).
23//!
24//! These channels come in two flavors:
25//!
26//! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27//! will return a `(Sender, Receiver)` tuple where all sends will be
28//! **asynchronous** (they never block). The channel conceptually has an
29//! infinite buffer.
30//!
31//! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32//! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33//! is a pre-allocated buffer of a fixed size. All sends will be
34//! **synchronous** by blocking until there is buffer space available. Note
35//! that a bound of 0 is allowed, causing the channel to become a
36//! "rendezvous" channel where each sender atomically hands off a message to
37//! a receiver.
38//!
39//! ## Disconnection
40//!
41//! The send and receive operations on channels will all return a `Result`
42//! indicating whether the operation succeeded or not. An unsuccessful operation
43//! is normally indicative of the other half of a channel having "hung up" by
44//! being dropped in its corresponding thread.
45//!
46//! Once half of a channel has been deallocated, most operations can no longer
47//! continue to make progress, so `Err` will be returned. Many applications will
48//! continue to `unwrap()` the results returned from this module, instigating a
49//! propagation of failure among threads if one unexpectedly dies.
50//!
51//! # Examples
52//!
53//! Simple usage:
54//!
55//! ```
85aaf69f 56//! use std::thread;
1a4d82fc
JJ
57//! use std::sync::mpsc::channel;
58//!
59//! // Create a simple streaming channel
60//! let (tx, rx) = channel();
85aaf69f
SL
61//! thread::spawn(move|| {
62//! tx.send(10).unwrap();
1a4d82fc 63//! });
85aaf69f 64//! assert_eq!(rx.recv().unwrap(), 10);
1a4d82fc
JJ
65//! ```
66//!
67//! Shared usage:
68//!
69//! ```
85aaf69f 70//! use std::thread;
1a4d82fc
JJ
71//! use std::sync::mpsc::channel;
72//!
73//! // Create a shared channel that can be sent along from many threads
74//! // where tx is the sending half (tx for transmission), and rx is the receiving
75//! // half (rx for receiving).
76//! let (tx, rx) = channel();
85aaf69f 77//! for i in 0..10 {
1a4d82fc 78//! let tx = tx.clone();
85aaf69f 79//! thread::spawn(move|| {
1a4d82fc
JJ
80//! tx.send(i).unwrap();
81//! });
82//! }
83//!
85aaf69f 84//! for _ in 0..10 {
1a4d82fc
JJ
85//! let j = rx.recv().unwrap();
86//! assert!(0 <= j && j < 10);
87//! }
88//! ```
89//!
90//! Propagating panics:
91//!
92//! ```
93//! use std::sync::mpsc::channel;
94//!
95//! // The call to recv() will return an error because the channel has already
96//! // hung up (or been deallocated)
c34b1796 97//! let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
98//! drop(tx);
99//! assert!(rx.recv().is_err());
100//! ```
101//!
102//! Synchronous channels:
103//!
104//! ```
85aaf69f 105//! use std::thread;
1a4d82fc
JJ
106//! use std::sync::mpsc::sync_channel;
107//!
c34b1796 108//! let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 109//! thread::spawn(move|| {
bd371182 110//! // This will wait for the parent thread to start receiving
1a4d82fc
JJ
111//! tx.send(53).unwrap();
112//! });
113//! rx.recv().unwrap();
114//! ```
1a4d82fc 115
85aaf69f 116#![stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
117
118// A description of how Rust's channel implementation works
119//
120// Channels are supposed to be the basic building block for all other
121// concurrent primitives that are used in Rust. As a result, the channel type
122// needs to be highly optimized, flexible, and broad enough for use everywhere.
123//
124// The choice of implementation of all channels is to be built on lock-free data
125// structures. The channels themselves are then consequently also lock-free data
126// structures. As always with lock-free code, this is a very "here be dragons"
127// territory, especially because I'm unaware of any academic papers that have
128// gone into great length about channels of these flavors.
129//
130// ## Flavors of channels
131//
132// From the perspective of a consumer of this library, there is only one flavor
133// of channel. This channel can be used as a stream and cloned to allow multiple
134// senders. Under the hood, however, there are actually three flavors of
135// channels in play.
136//
3157f602
XL
137// * Flavor::Oneshots - these channels are highly optimized for the one-send use
138// case. They contain as few atomics as possible and
139// involve one and exactly one allocation.
1a4d82fc
JJ
140// * Streams - these channels are optimized for the non-shared use case. They
141// use a different concurrent queue that is more tailored for this
142// use case. The initial allocation of this flavor of channel is not
143// optimized.
144// * Shared - this is the most general form of channel that this module offers,
145// a channel with multiple senders. This type is as optimized as it
146// can be, but the previous two types mentioned are much faster for
147// their use-cases.
148//
149// ## Concurrent queues
150//
3157f602
XL
151// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
152// but recv() obviously blocks. This means that under the hood there must be
153// some shared and concurrent queue holding all of the actual data.
1a4d82fc
JJ
154//
155// With two flavors of channels, two flavors of queues are also used. We have
156// chosen to use queues from a well-known author that are abbreviated as SPSC
157// and MPSC (single producer, single consumer and multiple producer, single
158// consumer). SPSC queues are used for streams while MPSC queues are used for
159// shared channels.
160//
161// ### SPSC optimizations
162//
163// The SPSC queue found online is essentially a linked list of nodes where one
164// half of the nodes are the "queue of data" and the other half of nodes are a
165// cache of unused nodes. The unused nodes are used such that an allocation is
166// not required on every push() and a free doesn't need to happen on every
167// pop().
168//
169// As found online, however, the cache of nodes is of an infinite size. This
170// means that if a channel at one point in its life had 50k items in the queue,
171// then the queue will always have the capacity for 50k items. I believed that
172// this was an unnecessary limitation of the implementation, so I have altered
173// the queue to optionally have a bound on the cache size.
174//
175// By default, streams will have an unbounded SPSC queue with a small-ish cache
176// size. The hope is that the cache is still large enough to have very fast
177// send() operations while not too large such that millions of channels can
178// coexist at once.
179//
180// ### MPSC optimizations
181//
182// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183// a linked list under the hood to earn its unboundedness, but I have not put
184// forth much effort into having a cache of nodes similar to the SPSC queue.
185//
186// For now, I believe that this is "ok" because shared channels are not the most
187// common type, but soon we may wish to revisit this queue choice and determine
188// another candidate for backend storage of shared channels.
189//
190// ## Overview of the Implementation
191//
192// Now that there's a little background on the concurrent queues used, it's
193// worth going into much more detail about the channels themselves. The basic
194// pseudocode for a send/recv are:
195//
196//
197// send(t) recv()
198// queue.push(t) return if queue.pop()
199// if increment() == -1 deschedule {
200// wakeup() if decrement() > 0
201// cancel_deschedule()
202// }
203// queue.pop()
204//
205// As mentioned before, there are no locks in this implementation, only atomic
206// instructions are used.
207//
208// ### The internal atomic counter
209//
210// Every channel has a shared counter with each half to keep track of the size
211// of the queue. This counter is used to abort descheduling by the receiver and
212// to know when to wake up on the sending side.
213//
214// As seen in the pseudocode, senders will increment this count and receivers
215// will decrement the count. The theory behind this is that if a sender sees a
216// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217// then it doesn't need to block.
218//
219// The recv() method has a beginning call to pop(), and if successful, it needs
220// to decrement the count. It is a crucial implementation detail that this
221// decrement does *not* happen to the shared counter. If this were the case,
222// then it would be possible for the counter to be very negative when there were
223// no receivers waiting, in which case the senders would have to determine when
224// it was actually appropriate to wake up a receiver.
225//
226// Instead, the "steal count" is kept track of separately (not atomically
227// because it's only used by receivers), and then the decrement() call when
228// descheduling will lump in all of the recent steals into one large decrement.
229//
230// The implication of this is that if a sender sees a -1 count, then there's
231// guaranteed to be a waiter waiting!
232//
233// ## Native Implementation
234//
235// A major goal of these channels is to work seamlessly on and off the runtime.
236// All of the previous race conditions have been worded in terms of
237// scheduler-isms (which is obviously not available without the runtime).
238//
239// For now, native usage of channels (off the runtime) will fall back onto
240// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241// is still entirely lock-free, the "deschedule" blocks above are surrounded by
242// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243// condition variable.
244//
245// ## Select
246//
247// Being able to support selection over channels has greatly influenced this
248// design, and not only does selection need to work inside the runtime, but also
249// outside the runtime.
250//
251// The implementation is fairly straightforward. The goal of select() is not to
252// return some data, but only to return which channel can receive data without
253// blocking. The implementation is essentially the entire blocking procedure
254// followed by an increment as soon as its woken up. The cancellation procedure
255// involves an increment and swapping out of to_wake to acquire ownership of the
bd371182 256// thread to unblock.
1a4d82fc
JJ
257//
258// Sadly this current implementation requires multiple allocations, so I have
259// seen the throughput of select() be much worse than it should be. I do not
260// believe that there is anything fundamental that needs to change about these
261// channels, however, in order to support a more efficient select().
262//
263// # Conclusion
264//
265// And now that you've seen all the races that I found and attempted to fix,
266// here's the code for you to find some more!
267
1a4d82fc 268use sync::Arc;
c34b1796 269use error;
1a4d82fc 270use fmt;
1a4d82fc
JJ
271use mem;
272use cell::UnsafeCell;
3157f602 273use time::{Duration, Instant};
1a4d82fc 274
92a42be0 275#[unstable(feature = "mpsc_select", issue = "27800")]
1a4d82fc
JJ
276pub use self::select::{Select, Handle};
277use self::select::StartResult;
278use self::select::StartResult::*;
279use self::blocking::SignalToken;
280
281mod blocking;
282mod oneshot;
283mod select;
284mod shared;
285mod stream;
286mod sync;
287mod mpsc_queue;
288mod spsc_queue;
289
290/// The receiving-half of Rust's channel type. This half can only be owned by
bd371182 291/// one thread
85aaf69f 292#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
293pub struct Receiver<T> {
294 inner: UnsafeCell<Flavor<T>>,
295}
296
297// The receiver port can be sent from place to place, so long as it
298// is not used to receive non-sendable things.
92a42be0 299#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 300unsafe impl<T: Send> Send for Receiver<T> { }
1a4d82fc 301
54a0048b
SL
302#[stable(feature = "rust1", since = "1.0.0")]
303impl<T> !Sync for Receiver<T> { }
304
1a4d82fc
JJ
305/// An iterator over messages on a receiver, this iterator will block
306/// whenever `next` is called, waiting for a new message, and `None` will be
307/// returned when the corresponding channel has hung up.
85aaf69f 308#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 309pub struct Iter<'a, T: 'a> {
1a4d82fc
JJ
310 rx: &'a Receiver<T>
311}
312
5bcae85e
SL
313/// An iterator that attempts to yield all pending values for a receiver.
314/// `None` will be returned when there are no pending values remaining or
315/// if the corresponding channel has hung up.
316///
317/// This Iterator will never block the caller in order to wait for data to
318/// become available. Instead, it will return `None`.
319#[unstable(feature = "receiver_try_iter", issue = "34931")]
320pub struct TryIter<'a, T: 'a> {
321 rx: &'a Receiver<T>
322}
323
d9579d0f
AL
324/// An owning iterator over messages on a receiver, this iterator will block
325/// whenever `next` is called, waiting for a new message, and `None` will be
326/// returned when the corresponding channel has hung up.
327#[stable(feature = "receiver_into_iter", since = "1.1.0")]
328pub struct IntoIter<T> {
329 rx: Receiver<T>
330}
331
1a4d82fc 332/// The sending-half of Rust's asynchronous channel type. This half can only be
bd371182 333/// owned by one thread, but it can be cloned to send to other threads.
85aaf69f 334#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
335pub struct Sender<T> {
336 inner: UnsafeCell<Flavor<T>>,
337}
338
339// The send port can be sent from place to place, so long as it
340// is not used to send non-sendable things.
92a42be0 341#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 342unsafe impl<T: Send> Send for Sender<T> { }
1a4d82fc 343
54a0048b
SL
344#[stable(feature = "rust1", since = "1.0.0")]
345impl<T> !Sync for Sender<T> { }
346
1a4d82fc 347/// The sending-half of Rust's synchronous channel type. This half can only be
bd371182 348/// owned by one thread, but it can be cloned to send to other threads.
85aaf69f 349#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 350pub struct SyncSender<T> {
85aaf69f 351 inner: Arc<UnsafeCell<sync::Packet<T>>>,
1a4d82fc
JJ
352}
353
92a42be0 354#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 355unsafe impl<T: Send> Send for SyncSender<T> {}
85aaf69f 356
92a42be0 357#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
358impl<T> !Sync for SyncSender<T> {}
359
1a4d82fc
JJ
360/// An error returned from the `send` function on channels.
361///
362/// A `send` operation can only fail if the receiving end of a channel is
363/// disconnected, implying that the data could never be received. The error
364/// contains the data being sent as a payload so it can be recovered.
85aaf69f
SL
365#[stable(feature = "rust1", since = "1.0.0")]
366#[derive(PartialEq, Eq, Clone, Copy)]
c34b1796 367pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
1a4d82fc
JJ
368
369/// An error returned from the `recv` function on a `Receiver`.
370///
371/// The `recv` operation can only fail if the sending half of a channel is
372/// disconnected, implying that no further messages will ever be received.
85aaf69f
SL
373#[derive(PartialEq, Eq, Clone, Copy, Debug)]
374#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
375pub struct RecvError;
376
9346a6ac
AL
377/// This enumeration is the list of the possible reasons that `try_recv` could
378/// not return data when called.
85aaf69f
SL
379#[derive(PartialEq, Eq, Clone, Copy, Debug)]
380#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
381pub enum TryRecvError {
382 /// This channel is currently empty, but the sender(s) have not yet
383 /// disconnected, so data may yet become available.
85aaf69f 384 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
385 Empty,
386
387 /// This channel's sending half has become disconnected, and there will
388 /// never be any more data received on this channel
85aaf69f 389 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
390 Disconnected,
391}
392
3157f602
XL
393/// This enumeration is the list of possible errors that `recv_timeout` could
394/// not return data when called.
395#[derive(PartialEq, Eq, Clone, Copy, Debug)]
5bcae85e 396#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
3157f602
XL
397pub enum RecvTimeoutError {
398 /// This channel is currently empty, but the sender(s) have not yet
399 /// disconnected, so data may yet become available.
5bcae85e 400 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
3157f602
XL
401 Timeout,
402 /// This channel's sending half has become disconnected, and there will
403 /// never be any more data received on this channel
5bcae85e 404 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
3157f602
XL
405 Disconnected,
406}
407
1a4d82fc
JJ
408/// This enumeration is the list of the possible error outcomes for the
409/// `SyncSender::try_send` method.
85aaf69f
SL
410#[stable(feature = "rust1", since = "1.0.0")]
411#[derive(PartialEq, Eq, Clone, Copy)]
1a4d82fc
JJ
412pub enum TrySendError<T> {
413 /// The data could not be sent on the channel because it would require that
414 /// the callee block to send the data.
415 ///
416 /// If this is a buffered channel, then the buffer is full at this time. If
417 /// this is not a buffered channel, then there is no receiver available to
418 /// acquire the data.
85aaf69f 419 #[stable(feature = "rust1", since = "1.0.0")]
7453a54e 420 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
1a4d82fc
JJ
421
422 /// This channel's receiving half has disconnected, so the data could not be
423 /// sent. The data is returned back to the callee in this case.
85aaf69f 424 #[stable(feature = "rust1", since = "1.0.0")]
7453a54e 425 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
1a4d82fc
JJ
426}
427
428enum Flavor<T> {
85aaf69f
SL
429 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
430 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
431 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
432 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
1a4d82fc
JJ
433}
434
435#[doc(hidden)]
436trait UnsafeFlavor<T> {
e9174d1e 437 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
7453a54e 438 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
1a4d82fc
JJ
439 &mut *self.inner_unsafe().get()
440 }
7453a54e 441 unsafe fn inner(&self) -> &Flavor<T> {
1a4d82fc
JJ
442 &*self.inner_unsafe().get()
443 }
444}
445impl<T> UnsafeFlavor<T> for Sender<T> {
e9174d1e 446 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
1a4d82fc
JJ
447 &self.inner
448 }
449}
450impl<T> UnsafeFlavor<T> for Receiver<T> {
e9174d1e 451 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
1a4d82fc
JJ
452 &self.inner
453 }
454}
455
456/// Creates a new asynchronous channel, returning the sender/receiver halves.
457///
458/// All data sent on the sender will become available on the receiver, and no
bd371182 459/// send will block the calling thread (this channel has an "infinite buffer").
1a4d82fc 460///
c34b1796 461/// # Examples
1a4d82fc
JJ
462///
463/// ```
464/// use std::sync::mpsc::channel;
85aaf69f 465/// use std::thread;
1a4d82fc 466///
b039eaaf 467/// // tx is the sending half (tx for transmission), and rx is the receiving
1a4d82fc
JJ
468/// // half (rx for receiving).
469/// let (tx, rx) = channel();
470///
471/// // Spawn off an expensive computation
85aaf69f 472/// thread::spawn(move|| {
1a4d82fc
JJ
473/// # fn expensive_computation() {}
474/// tx.send(expensive_computation()).unwrap();
475/// });
476///
477/// // Do some useful work for awhile
478///
479/// // Let's see what that answer was
480/// println!("{:?}", rx.recv().unwrap());
481/// ```
85aaf69f 482#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 483pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
85aaf69f 484 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
1a4d82fc
JJ
485 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
486}
487
488/// Creates a new synchronous, bounded channel.
489///
490/// Like asynchronous channels, the `Receiver` will block until a message
491/// becomes available. These channels differ greatly in the semantics of the
492/// sender from asynchronous channels, however.
493///
494/// This channel has an internal buffer on which messages will be queued. When
495/// the internal buffer becomes full, future sends will *block* waiting for the
496/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
497/// becomes "rendezvous channel" where each send will not return until a recv
498/// is paired with it.
499///
500/// As with asynchronous channels, all senders will panic in `send` if the
501/// `Receiver` has been destroyed.
502///
c34b1796 503/// # Examples
1a4d82fc
JJ
504///
505/// ```
506/// use std::sync::mpsc::sync_channel;
85aaf69f 507/// use std::thread;
1a4d82fc
JJ
508///
509/// let (tx, rx) = sync_channel(1);
510///
511/// // this returns immediately
85aaf69f 512/// tx.send(1).unwrap();
1a4d82fc 513///
85aaf69f 514/// thread::spawn(move|| {
1a4d82fc 515/// // this will block until the previous message has been received
85aaf69f 516/// tx.send(2).unwrap();
1a4d82fc
JJ
517/// });
518///
85aaf69f
SL
519/// assert_eq!(rx.recv().unwrap(), 1);
520/// assert_eq!(rx.recv().unwrap(), 2);
1a4d82fc 521/// ```
85aaf69f 522#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 523pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
85aaf69f 524 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
1a4d82fc
JJ
525 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
526}
527
528////////////////////////////////////////////////////////////////////////////////
529// Sender
530////////////////////////////////////////////////////////////////////////////////
531
c34b1796 532impl<T> Sender<T> {
1a4d82fc
JJ
533 fn new(inner: Flavor<T>) -> Sender<T> {
534 Sender {
535 inner: UnsafeCell::new(inner),
536 }
537 }
538
539 /// Attempts to send a value on this channel, returning it back if it could
540 /// not be sent.
541 ///
542 /// A successful send occurs when it is determined that the other end of
543 /// the channel has not hung up already. An unsuccessful send would be one
544 /// where the corresponding receiver has already been deallocated. Note
545 /// that a return value of `Err` means that the data will never be
546 /// received, but a return value of `Ok` does *not* mean that the data
547 /// will be received. It is possible for the corresponding receiver to
548 /// hang up immediately after this function returns `Ok`.
549 ///
550 /// This method will never block the current thread.
551 ///
c34b1796 552 /// # Examples
1a4d82fc
JJ
553 ///
554 /// ```
555 /// use std::sync::mpsc::channel;
556 ///
557 /// let (tx, rx) = channel();
558 ///
559 /// // This send is always successful
85aaf69f 560 /// tx.send(1).unwrap();
1a4d82fc
JJ
561 ///
562 /// // This send will fail because the receiver is gone
563 /// drop(rx);
a7813a04 564 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
1a4d82fc 565 /// ```
85aaf69f 566 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
567 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
568 let (new_inner, ret) = match *unsafe { self.inner() } {
569 Flavor::Oneshot(ref p) => {
570 unsafe {
571 let p = p.get();
572 if !(*p).sent() {
573 return (*p).send(t).map_err(SendError);
574 } else {
575 let a =
85aaf69f 576 Arc::new(UnsafeCell::new(stream::Packet::new()));
1a4d82fc
JJ
577 let rx = Receiver::new(Flavor::Stream(a.clone()));
578 match (*p).upgrade(rx) {
579 oneshot::UpSuccess => {
580 let ret = (*a.get()).send(t);
581 (a, ret)
582 }
583 oneshot::UpDisconnected => (a, Err(t)),
584 oneshot::UpWoke(token) => {
585 // This send cannot panic because the thread is
586 // asleep (we're looking at it), so the receiver
587 // can't go away.
588 (*a.get()).send(t).ok().unwrap();
c34b1796 589 token.signal();
1a4d82fc
JJ
590 (a, Ok(()))
591 }
592 }
593 }
594 }
595 }
596 Flavor::Stream(ref p) => return unsafe {
597 (*p.get()).send(t).map_err(SendError)
598 },
599 Flavor::Shared(ref p) => return unsafe {
600 (*p.get()).send(t).map_err(SendError)
601 },
602 Flavor::Sync(..) => unreachable!(),
603 };
604
605 unsafe {
606 let tmp = Sender::new(Flavor::Stream(new_inner));
607 mem::swap(self.inner_mut(), tmp.inner_mut());
608 }
609 ret.map_err(SendError)
610 }
611}
612
85aaf69f 613#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 614impl<T> Clone for Sender<T> {
1a4d82fc
JJ
615 fn clone(&self) -> Sender<T> {
616 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
617 Flavor::Oneshot(ref p) => {
85aaf69f 618 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
1a4d82fc
JJ
619 unsafe {
620 let guard = (*a.get()).postinit_lock();
621 let rx = Receiver::new(Flavor::Shared(a.clone()));
622 match (*p.get()).upgrade(rx) {
623 oneshot::UpSuccess |
624 oneshot::UpDisconnected => (a, None, guard),
625 oneshot::UpWoke(task) => (a, Some(task), guard)
626 }
627 }
628 }
629 Flavor::Stream(ref p) => {
85aaf69f 630 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
1a4d82fc
JJ
631 unsafe {
632 let guard = (*a.get()).postinit_lock();
633 let rx = Receiver::new(Flavor::Shared(a.clone()));
634 match (*p.get()).upgrade(rx) {
635 stream::UpSuccess |
636 stream::UpDisconnected => (a, None, guard),
637 stream::UpWoke(task) => (a, Some(task), guard),
638 }
639 }
640 }
641 Flavor::Shared(ref p) => {
642 unsafe { (*p.get()).clone_chan(); }
643 return Sender::new(Flavor::Shared(p.clone()));
644 }
645 Flavor::Sync(..) => unreachable!(),
646 };
647
648 unsafe {
649 (*packet.get()).inherit_blocker(sleeper, guard);
650
651 let tmp = Sender::new(Flavor::Shared(packet.clone()));
652 mem::swap(self.inner_mut(), tmp.inner_mut());
653 }
654 Sender::new(Flavor::Shared(packet))
655 }
656}
657
85aaf69f 658#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 659impl<T> Drop for Sender<T> {
1a4d82fc
JJ
660 fn drop(&mut self) {
661 match *unsafe { self.inner_mut() } {
662 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
663 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
664 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
665 Flavor::Sync(..) => unreachable!(),
666 }
667 }
668}
669
7453a54e
SL
670#[stable(feature = "mpsc_debug", since = "1.7.0")]
671impl<T> fmt::Debug for Sender<T> {
672 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
673 write!(f, "Sender {{ .. }}")
674 }
675}
676
1a4d82fc
JJ
677////////////////////////////////////////////////////////////////////////////////
678// SyncSender
679////////////////////////////////////////////////////////////////////////////////
680
c34b1796 681impl<T> SyncSender<T> {
85aaf69f
SL
682 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
683 SyncSender { inner: inner }
1a4d82fc
JJ
684 }
685
686 /// Sends a value on this synchronous channel.
687 ///
688 /// This function will *block* until space in the internal buffer becomes
689 /// available or a receiver is available to hand off the message to.
690 ///
691 /// Note that a successful send does *not* guarantee that the receiver will
692 /// ever see the data if there is a buffer on this channel. Items may be
693 /// enqueued in the internal buffer for the receiver to receive at a later
694 /// time. If the buffer size is 0, however, it can be guaranteed that the
695 /// receiver has indeed received the data if this function returns success.
696 ///
697 /// This function will never panic, but it may return `Err` if the
698 /// `Receiver` has disconnected and is no longer able to receive
699 /// information.
85aaf69f 700 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
701 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
702 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
703 }
704
705 /// Attempts to send a value on this channel without blocking.
706 ///
707 /// This method differs from `send` by returning immediately if the
708 /// channel's buffer is full or no receiver is waiting to acquire some
709 /// data. Compared with `send`, this function has two failure cases
710 /// instead of one (one for disconnection, one for a full buffer).
711 ///
712 /// See `SyncSender::send` for notes about guarantees of whether the
713 /// receiver has received the data or not if this function is successful.
85aaf69f 714 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
715 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
716 unsafe { (*self.inner.get()).try_send(t) }
717 }
718}
719
85aaf69f 720#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 721impl<T> Clone for SyncSender<T> {
1a4d82fc
JJ
722 fn clone(&self) -> SyncSender<T> {
723 unsafe { (*self.inner.get()).clone_chan(); }
e9174d1e 724 SyncSender::new(self.inner.clone())
1a4d82fc
JJ
725 }
726}
727
85aaf69f 728#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 729impl<T> Drop for SyncSender<T> {
1a4d82fc
JJ
730 fn drop(&mut self) {
731 unsafe { (*self.inner.get()).drop_chan(); }
732 }
733}
734
7453a54e
SL
735#[stable(feature = "mpsc_debug", since = "1.7.0")]
736impl<T> fmt::Debug for SyncSender<T> {
737 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
738 write!(f, "SyncSender {{ .. }}")
739 }
740}
741
1a4d82fc
JJ
742////////////////////////////////////////////////////////////////////////////////
743// Receiver
744////////////////////////////////////////////////////////////////////////////////
745
c34b1796 746impl<T> Receiver<T> {
1a4d82fc
JJ
747 fn new(inner: Flavor<T>) -> Receiver<T> {
748 Receiver { inner: UnsafeCell::new(inner) }
749 }
750
751 /// Attempts to return a pending value on this receiver without blocking
752 ///
753 /// This method will never block the caller in order to wait for data to
754 /// become available. Instead, this will always return immediately with a
755 /// possible option of pending data on the channel.
756 ///
757 /// This is useful for a flavor of "optimistic check" before deciding to
758 /// block on a receiver.
85aaf69f 759 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
760 pub fn try_recv(&self) -> Result<T, TryRecvError> {
761 loop {
762 let new_port = match *unsafe { self.inner() } {
763 Flavor::Oneshot(ref p) => {
764 match unsafe { (*p.get()).try_recv() } {
765 Ok(t) => return Ok(t),
766 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
767 Err(oneshot::Disconnected) => {
768 return Err(TryRecvError::Disconnected)
769 }
770 Err(oneshot::Upgraded(rx)) => rx,
771 }
772 }
773 Flavor::Stream(ref p) => {
774 match unsafe { (*p.get()).try_recv() } {
775 Ok(t) => return Ok(t),
776 Err(stream::Empty) => return Err(TryRecvError::Empty),
777 Err(stream::Disconnected) => {
778 return Err(TryRecvError::Disconnected)
779 }
780 Err(stream::Upgraded(rx)) => rx,
781 }
782 }
783 Flavor::Shared(ref p) => {
784 match unsafe { (*p.get()).try_recv() } {
785 Ok(t) => return Ok(t),
786 Err(shared::Empty) => return Err(TryRecvError::Empty),
787 Err(shared::Disconnected) => {
788 return Err(TryRecvError::Disconnected)
789 }
790 }
791 }
792 Flavor::Sync(ref p) => {
793 match unsafe { (*p.get()).try_recv() } {
794 Ok(t) => return Ok(t),
795 Err(sync::Empty) => return Err(TryRecvError::Empty),
796 Err(sync::Disconnected) => {
797 return Err(TryRecvError::Disconnected)
798 }
799 }
800 }
801 };
802 unsafe {
803 mem::swap(self.inner_mut(),
804 new_port.inner_mut());
805 }
806 }
807 }
808
9346a6ac 809 /// Attempts to wait for a value on this receiver, returning an error if the
1a4d82fc
JJ
810 /// corresponding channel has hung up.
811 ///
812 /// This function will always block the current thread if there is no data
813 /// available and it's possible for more data to be sent. Once a message is
814 /// sent to the corresponding `Sender`, then this receiver will wake up and
815 /// return that message.
816 ///
817 /// If the corresponding `Sender` has disconnected, or it disconnects while
818 /// this call is blocking, this call will wake up and return `Err` to
819 /// indicate that no more messages can ever be received on this channel.
c1a9b12d
SL
820 /// However, since channels are buffered, messages sent before the disconnect
821 /// will still be properly received.
822 ///
823 /// # Examples
824 ///
825 /// ```
826 /// use std::sync::mpsc;
827 /// use std::thread;
828 ///
829 /// let (send, recv) = mpsc::channel();
830 /// let handle = thread::spawn(move || {
831 /// send.send(1u8).unwrap();
832 /// });
833 ///
834 /// handle.join().unwrap();
835 ///
836 /// assert_eq!(Ok(1), recv.recv());
837 /// ```
838 ///
839 /// Buffering behavior:
840 ///
841 /// ```
842 /// use std::sync::mpsc;
843 /// use std::thread;
844 /// use std::sync::mpsc::RecvError;
845 ///
846 /// let (send, recv) = mpsc::channel();
847 /// let handle = thread::spawn(move || {
848 /// send.send(1u8).unwrap();
849 /// send.send(2).unwrap();
850 /// send.send(3).unwrap();
851 /// drop(send);
852 /// });
853 ///
854 /// // wait for the thread to join so we ensure the sender is dropped
855 /// handle.join().unwrap();
856 ///
857 /// assert_eq!(Ok(1), recv.recv());
858 /// assert_eq!(Ok(2), recv.recv());
859 /// assert_eq!(Ok(3), recv.recv());
860 /// assert_eq!(Err(RecvError), recv.recv());
861 /// ```
85aaf69f 862 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
863 pub fn recv(&self) -> Result<T, RecvError> {
864 loop {
865 let new_port = match *unsafe { self.inner() } {
866 Flavor::Oneshot(ref p) => {
3157f602 867 match unsafe { (*p.get()).recv(None) } {
1a4d82fc 868 Ok(t) => return Ok(t),
1a4d82fc
JJ
869 Err(oneshot::Disconnected) => return Err(RecvError),
870 Err(oneshot::Upgraded(rx)) => rx,
3157f602 871 Err(oneshot::Empty) => unreachable!(),
1a4d82fc
JJ
872 }
873 }
874 Flavor::Stream(ref p) => {
3157f602 875 match unsafe { (*p.get()).recv(None) } {
1a4d82fc 876 Ok(t) => return Ok(t),
1a4d82fc
JJ
877 Err(stream::Disconnected) => return Err(RecvError),
878 Err(stream::Upgraded(rx)) => rx,
3157f602 879 Err(stream::Empty) => unreachable!(),
1a4d82fc
JJ
880 }
881 }
882 Flavor::Shared(ref p) => {
3157f602 883 match unsafe { (*p.get()).recv(None) } {
1a4d82fc 884 Ok(t) => return Ok(t),
1a4d82fc 885 Err(shared::Disconnected) => return Err(RecvError),
3157f602 886 Err(shared::Empty) => unreachable!(),
1a4d82fc
JJ
887 }
888 }
889 Flavor::Sync(ref p) => return unsafe {
3157f602 890 (*p.get()).recv(None).map_err(|_| RecvError)
1a4d82fc
JJ
891 }
892 };
893 unsafe {
894 mem::swap(self.inner_mut(), new_port.inner_mut());
895 }
896 }
897 }
898
3157f602
XL
899 /// Attempts to wait for a value on this receiver, returning an error if the
900 /// corresponding channel has hung up, or if it waits more than `timeout`.
901 ///
902 /// This function will always block the current thread if there is no data
903 /// available and it's possible for more data to be sent. Once a message is
904 /// sent to the corresponding `Sender`, then this receiver will wake up and
905 /// return that message.
906 ///
907 /// If the corresponding `Sender` has disconnected, or it disconnects while
908 /// this call is blocking, this call will wake up and return `Err` to
909 /// indicate that no more messages can ever be received on this channel.
910 /// However, since channels are buffered, messages sent before the disconnect
911 /// will still be properly received.
912 ///
913 /// # Examples
914 ///
915 /// ```no_run
3157f602
XL
916 /// use std::sync::mpsc::{self, RecvTimeoutError};
917 /// use std::time::Duration;
918 ///
919 /// let (send, recv) = mpsc::channel::<()>();
920 ///
921 /// let timeout = Duration::from_millis(100);
922 /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
923 /// ```
5bcae85e 924 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
3157f602
XL
925 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
926 // Do an optimistic try_recv to avoid the performance impact of
927 // Instant::now() in the full-channel case.
928 match self.try_recv() {
929 Ok(result)
930 => Ok(result),
931 Err(TryRecvError::Disconnected)
932 => Err(RecvTimeoutError::Disconnected),
933 Err(TryRecvError::Empty)
934 => self.recv_max_until(Instant::now() + timeout)
935 }
936 }
937
938 fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
939 use self::RecvTimeoutError::*;
940
941 loop {
942 let port_or_empty = match *unsafe { self.inner() } {
943 Flavor::Oneshot(ref p) => {
944 match unsafe { (*p.get()).recv(Some(deadline)) } {
945 Ok(t) => return Ok(t),
946 Err(oneshot::Disconnected) => return Err(Disconnected),
947 Err(oneshot::Upgraded(rx)) => Some(rx),
948 Err(oneshot::Empty) => None,
949 }
950 }
951 Flavor::Stream(ref p) => {
952 match unsafe { (*p.get()).recv(Some(deadline)) } {
953 Ok(t) => return Ok(t),
954 Err(stream::Disconnected) => return Err(Disconnected),
955 Err(stream::Upgraded(rx)) => Some(rx),
956 Err(stream::Empty) => None,
957 }
958 }
959 Flavor::Shared(ref p) => {
960 match unsafe { (*p.get()).recv(Some(deadline)) } {
961 Ok(t) => return Ok(t),
962 Err(shared::Disconnected) => return Err(Disconnected),
963 Err(shared::Empty) => None,
964 }
965 }
966 Flavor::Sync(ref p) => {
967 match unsafe { (*p.get()).recv(Some(deadline)) } {
968 Ok(t) => return Ok(t),
969 Err(sync::Disconnected) => return Err(Disconnected),
970 Err(sync::Empty) => None,
971 }
972 }
973 };
974
975 if let Some(new_port) = port_or_empty {
976 unsafe {
977 mem::swap(self.inner_mut(), new_port.inner_mut());
978 }
979 }
980
981 // If we're already passed the deadline, and we're here without
982 // data, return a timeout, else try again.
983 if Instant::now() >= deadline {
984 return Err(Timeout);
985 }
986 }
987 }
988
1a4d82fc
JJ
989 /// Returns an iterator that will block waiting for messages, but never
990 /// `panic!`. It will return `None` when the channel has hung up.
85aaf69f 991 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
992 pub fn iter(&self) -> Iter<T> {
993 Iter { rx: self }
994 }
5bcae85e
SL
995
996 /// Returns an iterator that will attempt to yield all pending values.
997 /// It will return `None` if there are no more pending values or if the
998 /// channel has hung up. The iterator will never `panic!` or block the
999 /// user by waiting for values.
1000 #[unstable(feature = "receiver_try_iter", issue = "34931")]
1001 pub fn try_iter(&self) -> TryIter<T> {
1002 TryIter { rx: self }
1003 }
1004
1a4d82fc
JJ
1005}
1006
c34b1796 1007impl<T> select::Packet for Receiver<T> {
1a4d82fc
JJ
1008 fn can_recv(&self) -> bool {
1009 loop {
1010 let new_port = match *unsafe { self.inner() } {
1011 Flavor::Oneshot(ref p) => {
1012 match unsafe { (*p.get()).can_recv() } {
1013 Ok(ret) => return ret,
1014 Err(upgrade) => upgrade,
1015 }
1016 }
1017 Flavor::Stream(ref p) => {
1018 match unsafe { (*p.get()).can_recv() } {
1019 Ok(ret) => return ret,
1020 Err(upgrade) => upgrade,
1021 }
1022 }
1023 Flavor::Shared(ref p) => {
1024 return unsafe { (*p.get()).can_recv() };
1025 }
1026 Flavor::Sync(ref p) => {
1027 return unsafe { (*p.get()).can_recv() };
1028 }
1029 };
1030 unsafe {
1031 mem::swap(self.inner_mut(),
1032 new_port.inner_mut());
1033 }
1034 }
1035 }
1036
1037 fn start_selection(&self, mut token: SignalToken) -> StartResult {
1038 loop {
1039 let (t, new_port) = match *unsafe { self.inner() } {
1040 Flavor::Oneshot(ref p) => {
1041 match unsafe { (*p.get()).start_selection(token) } {
1042 oneshot::SelSuccess => return Installed,
1043 oneshot::SelCanceled => return Abort,
1044 oneshot::SelUpgraded(t, rx) => (t, rx),
1045 }
1046 }
1047 Flavor::Stream(ref p) => {
1048 match unsafe { (*p.get()).start_selection(token) } {
1049 stream::SelSuccess => return Installed,
1050 stream::SelCanceled => return Abort,
1051 stream::SelUpgraded(t, rx) => (t, rx),
1052 }
1053 }
1054 Flavor::Shared(ref p) => {
1055 return unsafe { (*p.get()).start_selection(token) };
1056 }
1057 Flavor::Sync(ref p) => {
1058 return unsafe { (*p.get()).start_selection(token) };
1059 }
1060 };
1061 token = t;
1062 unsafe {
1063 mem::swap(self.inner_mut(), new_port.inner_mut());
1064 }
1065 }
1066 }
1067
1068 fn abort_selection(&self) -> bool {
1069 let mut was_upgrade = false;
1070 loop {
1071 let result = match *unsafe { self.inner() } {
1072 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
1073 Flavor::Stream(ref p) => unsafe {
1074 (*p.get()).abort_selection(was_upgrade)
1075 },
1076 Flavor::Shared(ref p) => return unsafe {
1077 (*p.get()).abort_selection(was_upgrade)
1078 },
1079 Flavor::Sync(ref p) => return unsafe {
1080 (*p.get()).abort_selection()
1081 },
1082 };
1083 let new_port = match result { Ok(b) => return b, Err(p) => p };
1084 was_upgrade = true;
1085 unsafe {
1086 mem::swap(self.inner_mut(),
1087 new_port.inner_mut());
1088 }
1089 }
1090 }
1091}
1092
85aaf69f 1093#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 1094impl<'a, T> Iterator for Iter<'a, T> {
1a4d82fc
JJ
1095 type Item = T;
1096
1097 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1098}
1099
5bcae85e
SL
1100#[unstable(feature = "receiver_try_iter", issue = "34931")]
1101impl<'a, T> Iterator for TryIter<'a, T> {
1102 type Item = T;
1103
1104 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1105}
1106
d9579d0f
AL
1107#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1108impl<'a, T> IntoIterator for &'a Receiver<T> {
1109 type Item = T;
1110 type IntoIter = Iter<'a, T>;
1111
1112 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1113}
1114
92a42be0 1115#[stable(feature = "receiver_into_iter", since = "1.1.0")]
d9579d0f
AL
1116impl<T> Iterator for IntoIter<T> {
1117 type Item = T;
1118 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1119}
1120
1121#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1122impl <T> IntoIterator for Receiver<T> {
1123 type Item = T;
1124 type IntoIter = IntoIter<T>;
1125
1126 fn into_iter(self) -> IntoIter<T> {
1127 IntoIter { rx: self }
1128 }
1129}
1130
85aaf69f 1131#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 1132impl<T> Drop for Receiver<T> {
1a4d82fc
JJ
1133 fn drop(&mut self) {
1134 match *unsafe { self.inner_mut() } {
1135 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
1136 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
1137 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
1138 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1139 }
1140 }
1141}
1142
7453a54e
SL
1143#[stable(feature = "mpsc_debug", since = "1.7.0")]
1144impl<T> fmt::Debug for Receiver<T> {
1145 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1146 write!(f, "Receiver {{ .. }}")
1147 }
1148}
1149
85aaf69f
SL
1150#[stable(feature = "rust1", since = "1.0.0")]
1151impl<T> fmt::Debug for SendError<T> {
1152 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1153 "SendError(..)".fmt(f)
1a4d82fc 1154 }
85aaf69f 1155}
1a4d82fc 1156
85aaf69f
SL
1157#[stable(feature = "rust1", since = "1.0.0")]
1158impl<T> fmt::Display for SendError<T> {
1159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1160 "sending on a closed channel".fmt(f)
1a4d82fc 1161 }
1a4d82fc
JJ
1162}
1163
c34b1796 1164#[stable(feature = "rust1", since = "1.0.0")]
9e0c209e 1165impl<T: Send> error::Error for SendError<T> {
c34b1796
AL
1166 fn description(&self) -> &str {
1167 "sending on a closed channel"
1168 }
1169
1170 fn cause(&self) -> Option<&error::Error> {
1171 None
1172 }
1173}
1174
85aaf69f
SL
1175#[stable(feature = "rust1", since = "1.0.0")]
1176impl<T> fmt::Debug for TrySendError<T> {
1a4d82fc 1177 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85aaf69f
SL
1178 match *self {
1179 TrySendError::Full(..) => "Full(..)".fmt(f),
1180 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1181 }
1a4d82fc
JJ
1182 }
1183}
1184
85aaf69f
SL
1185#[stable(feature = "rust1", since = "1.0.0")]
1186impl<T> fmt::Display for TrySendError<T> {
1a4d82fc
JJ
1187 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1188 match *self {
1189 TrySendError::Full(..) => {
1190 "sending on a full channel".fmt(f)
1191 }
1192 TrySendError::Disconnected(..) => {
1193 "sending on a closed channel".fmt(f)
1194 }
1195 }
1196 }
1197}
1198
c34b1796 1199#[stable(feature = "rust1", since = "1.0.0")]
9e0c209e 1200impl<T: Send> error::Error for TrySendError<T> {
c34b1796
AL
1201
1202 fn description(&self) -> &str {
1203 match *self {
1204 TrySendError::Full(..) => {
1205 "sending on a full channel"
1206 }
1207 TrySendError::Disconnected(..) => {
1208 "sending on a closed channel"
1209 }
1210 }
1211 }
1212
1213 fn cause(&self) -> Option<&error::Error> {
1214 None
1215 }
1216}
1217
85aaf69f
SL
1218#[stable(feature = "rust1", since = "1.0.0")]
1219impl fmt::Display for RecvError {
1a4d82fc
JJ
1220 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1221 "receiving on a closed channel".fmt(f)
1222 }
1223}
1224
c34b1796
AL
1225#[stable(feature = "rust1", since = "1.0.0")]
1226impl error::Error for RecvError {
1227
1228 fn description(&self) -> &str {
1229 "receiving on a closed channel"
1230 }
1231
1232 fn cause(&self) -> Option<&error::Error> {
1233 None
1234 }
1235}
1236
85aaf69f
SL
1237#[stable(feature = "rust1", since = "1.0.0")]
1238impl fmt::Display for TryRecvError {
1a4d82fc
JJ
1239 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1240 match *self {
1241 TryRecvError::Empty => {
1242 "receiving on an empty channel".fmt(f)
1243 }
1244 TryRecvError::Disconnected => {
1245 "receiving on a closed channel".fmt(f)
1246 }
1247 }
1248 }
1249}
1250
c34b1796
AL
1251#[stable(feature = "rust1", since = "1.0.0")]
1252impl error::Error for TryRecvError {
1253
1254 fn description(&self) -> &str {
1255 match *self {
1256 TryRecvError::Empty => {
1257 "receiving on an empty channel"
1258 }
1259 TryRecvError::Disconnected => {
1260 "receiving on a closed channel"
1261 }
1262 }
1263 }
1264
1265 fn cause(&self) -> Option<&error::Error> {
1266 None
1267 }
1268}
1269
1a4d82fc 1270#[cfg(test)]
d9579d0f 1271mod tests {
c1a9b12d 1272 use env;
1a4d82fc 1273 use super::*;
85aaf69f 1274 use thread;
3157f602 1275 use time::{Duration, Instant};
1a4d82fc 1276
c34b1796 1277 pub fn stress_factor() -> usize {
85aaf69f
SL
1278 match env::var("RUST_TEST_STRESS") {
1279 Ok(val) => val.parse().unwrap(),
1280 Err(..) => 1,
1a4d82fc
JJ
1281 }
1282 }
1283
1284 #[test]
1285 fn smoke() {
c34b1796 1286 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1287 tx.send(1).unwrap();
1288 assert_eq!(rx.recv().unwrap(), 1);
1289 }
1290
1291 #[test]
1292 fn drop_full() {
c34b1796 1293 let (tx, _rx) = channel::<Box<isize>>();
85aaf69f 1294 tx.send(box 1).unwrap();
1a4d82fc
JJ
1295 }
1296
1297 #[test]
1298 fn drop_full_shared() {
c34b1796 1299 let (tx, _rx) = channel::<Box<isize>>();
1a4d82fc
JJ
1300 drop(tx.clone());
1301 drop(tx.clone());
85aaf69f 1302 tx.send(box 1).unwrap();
1a4d82fc
JJ
1303 }
1304
1305 #[test]
1306 fn smoke_shared() {
c34b1796 1307 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1308 tx.send(1).unwrap();
1309 assert_eq!(rx.recv().unwrap(), 1);
1310 let tx = tx.clone();
1311 tx.send(1).unwrap();
1312 assert_eq!(rx.recv().unwrap(), 1);
1313 }
1314
1315 #[test]
1316 fn smoke_threads() {
c34b1796 1317 let (tx, rx) = channel::<i32>();
85aaf69f 1318 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1319 tx.send(1).unwrap();
1320 });
1321 assert_eq!(rx.recv().unwrap(), 1);
1322 }
1323
1324 #[test]
1325 fn smoke_port_gone() {
c34b1796 1326 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1327 drop(rx);
1328 assert!(tx.send(1).is_err());
1329 }
1330
1331 #[test]
1332 fn smoke_shared_port_gone() {
c34b1796 1333 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1334 drop(rx);
1335 assert!(tx.send(1).is_err())
1336 }
1337
1338 #[test]
1339 fn smoke_shared_port_gone2() {
c34b1796 1340 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1341 drop(rx);
1342 let tx2 = tx.clone();
1343 drop(tx);
1344 assert!(tx2.send(1).is_err());
1345 }
1346
1347 #[test]
1348 fn port_gone_concurrent() {
c34b1796 1349 let (tx, rx) = channel::<i32>();
85aaf69f 1350 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1351 rx.recv().unwrap();
1352 });
1353 while tx.send(1).is_ok() {}
1354 }
1355
1356 #[test]
1357 fn port_gone_concurrent_shared() {
c34b1796 1358 let (tx, rx) = channel::<i32>();
1a4d82fc 1359 let tx2 = tx.clone();
85aaf69f 1360 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1361 rx.recv().unwrap();
1362 });
1363 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1364 }
1365
1366 #[test]
1367 fn smoke_chan_gone() {
c34b1796 1368 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1369 drop(tx);
1370 assert!(rx.recv().is_err());
1371 }
1372
1373 #[test]
1374 fn smoke_chan_gone_shared() {
1375 let (tx, rx) = channel::<()>();
1376 let tx2 = tx.clone();
1377 drop(tx);
1378 drop(tx2);
1379 assert!(rx.recv().is_err());
1380 }
1381
1382 #[test]
1383 fn chan_gone_concurrent() {
c34b1796 1384 let (tx, rx) = channel::<i32>();
85aaf69f 1385 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1386 tx.send(1).unwrap();
1387 tx.send(1).unwrap();
1388 });
1389 while rx.recv().is_ok() {}
1390 }
1391
1392 #[test]
1393 fn stress() {
c34b1796 1394 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1395 let t = thread::spawn(move|| {
1396 for _ in 0..10000 { tx.send(1).unwrap(); }
1a4d82fc 1397 });
85aaf69f 1398 for _ in 0..10000 {
1a4d82fc
JJ
1399 assert_eq!(rx.recv().unwrap(), 1);
1400 }
1401 t.join().ok().unwrap();
1402 }
1403
1404 #[test]
1405 fn stress_shared() {
c34b1796
AL
1406 const AMT: u32 = 10000;
1407 const NTHREADS: u32 = 8;
1408 let (tx, rx) = channel::<i32>();
1a4d82fc 1409
85aaf69f
SL
1410 let t = thread::spawn(move|| {
1411 for _ in 0..AMT * NTHREADS {
1a4d82fc
JJ
1412 assert_eq!(rx.recv().unwrap(), 1);
1413 }
1414 match rx.try_recv() {
1415 Ok(..) => panic!(),
1416 _ => {}
1417 }
1418 });
1419
85aaf69f 1420 for _ in 0..NTHREADS {
1a4d82fc 1421 let tx = tx.clone();
85aaf69f
SL
1422 thread::spawn(move|| {
1423 for _ in 0..AMT { tx.send(1).unwrap(); }
1a4d82fc
JJ
1424 });
1425 }
1426 drop(tx);
1427 t.join().ok().unwrap();
1428 }
1429
1430 #[test]
1431 fn send_from_outside_runtime() {
1432 let (tx1, rx1) = channel::<()>();
c34b1796 1433 let (tx2, rx2) = channel::<i32>();
85aaf69f 1434 let t1 = thread::spawn(move|| {
1a4d82fc 1435 tx1.send(()).unwrap();
85aaf69f 1436 for _ in 0..40 {
1a4d82fc
JJ
1437 assert_eq!(rx2.recv().unwrap(), 1);
1438 }
1439 });
1440 rx1.recv().unwrap();
85aaf69f
SL
1441 let t2 = thread::spawn(move|| {
1442 for _ in 0..40 {
1a4d82fc
JJ
1443 tx2.send(1).unwrap();
1444 }
1445 });
1446 t1.join().ok().unwrap();
1447 t2.join().ok().unwrap();
1448 }
1449
1450 #[test]
1451 fn recv_from_outside_runtime() {
c34b1796 1452 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1453 let t = thread::spawn(move|| {
1454 for _ in 0..40 {
1a4d82fc
JJ
1455 assert_eq!(rx.recv().unwrap(), 1);
1456 }
1457 });
85aaf69f 1458 for _ in 0..40 {
1a4d82fc
JJ
1459 tx.send(1).unwrap();
1460 }
1461 t.join().ok().unwrap();
1462 }
1463
1464 #[test]
1465 fn no_runtime() {
c34b1796
AL
1466 let (tx1, rx1) = channel::<i32>();
1467 let (tx2, rx2) = channel::<i32>();
85aaf69f 1468 let t1 = thread::spawn(move|| {
1a4d82fc
JJ
1469 assert_eq!(rx1.recv().unwrap(), 1);
1470 tx2.send(2).unwrap();
1471 });
85aaf69f 1472 let t2 = thread::spawn(move|| {
1a4d82fc
JJ
1473 tx1.send(1).unwrap();
1474 assert_eq!(rx2.recv().unwrap(), 2);
1475 });
1476 t1.join().ok().unwrap();
1477 t2.join().ok().unwrap();
1478 }
1479
1480 #[test]
1481 fn oneshot_single_thread_close_port_first() {
1482 // Simple test of closing without sending
c34b1796 1483 let (_tx, rx) = channel::<i32>();
1a4d82fc
JJ
1484 drop(rx);
1485 }
1486
1487 #[test]
1488 fn oneshot_single_thread_close_chan_first() {
1489 // Simple test of closing without sending
c34b1796 1490 let (tx, _rx) = channel::<i32>();
1a4d82fc
JJ
1491 drop(tx);
1492 }
1493
1494 #[test]
1495 fn oneshot_single_thread_send_port_close() {
1496 // Testing that the sender cleans up the payload if receiver is closed
c34b1796 1497 let (tx, rx) = channel::<Box<i32>>();
1a4d82fc
JJ
1498 drop(rx);
1499 assert!(tx.send(box 0).is_err());
1500 }
1501
1502 #[test]
1503 fn oneshot_single_thread_recv_chan_close() {
1504 // Receiving on a closed chan will panic
85aaf69f 1505 let res = thread::spawn(move|| {
c34b1796 1506 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1507 drop(tx);
1508 rx.recv().unwrap();
1509 }).join();
1510 // What is our res?
1511 assert!(res.is_err());
1512 }
1513
1514 #[test]
1515 fn oneshot_single_thread_send_then_recv() {
c34b1796 1516 let (tx, rx) = channel::<Box<i32>>();
1a4d82fc
JJ
1517 tx.send(box 10).unwrap();
1518 assert!(rx.recv().unwrap() == box 10);
1519 }
1520
1521 #[test]
1522 fn oneshot_single_thread_try_send_open() {
c34b1796 1523 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1524 assert!(tx.send(10).is_ok());
1525 assert!(rx.recv().unwrap() == 10);
1526 }
1527
1528 #[test]
1529 fn oneshot_single_thread_try_send_closed() {
c34b1796 1530 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1531 drop(rx);
1532 assert!(tx.send(10).is_err());
1533 }
1534
1535 #[test]
1536 fn oneshot_single_thread_try_recv_open() {
c34b1796 1537 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1538 tx.send(10).unwrap();
1539 assert!(rx.recv() == Ok(10));
1540 }
1541
1542 #[test]
1543 fn oneshot_single_thread_try_recv_closed() {
c34b1796 1544 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1545 drop(tx);
1546 assert!(rx.recv().is_err());
1547 }
1548
1549 #[test]
1550 fn oneshot_single_thread_peek_data() {
c34b1796 1551 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1552 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1553 tx.send(10).unwrap();
1554 assert_eq!(rx.try_recv(), Ok(10));
1555 }
1556
1557 #[test]
1558 fn oneshot_single_thread_peek_close() {
c34b1796 1559 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1560 drop(tx);
1561 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1562 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1563 }
1564
1565 #[test]
1566 fn oneshot_single_thread_peek_open() {
c34b1796 1567 let (_tx, rx) = channel::<i32>();
1a4d82fc
JJ
1568 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1569 }
1570
1571 #[test]
1572 fn oneshot_multi_task_recv_then_send() {
c34b1796 1573 let (tx, rx) = channel::<Box<i32>>();
85aaf69f 1574 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1575 assert!(rx.recv().unwrap() == box 10);
1576 });
1577
1578 tx.send(box 10).unwrap();
1579 }
1580
1581 #[test]
1582 fn oneshot_multi_task_recv_then_close() {
c34b1796 1583 let (tx, rx) = channel::<Box<i32>>();
85aaf69f 1584 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1585 drop(tx);
1586 });
85aaf69f 1587 let res = thread::spawn(move|| {
1a4d82fc
JJ
1588 assert!(rx.recv().unwrap() == box 10);
1589 }).join();
1590 assert!(res.is_err());
1591 }
1592
1593 #[test]
1594 fn oneshot_multi_thread_close_stress() {
85aaf69f 1595 for _ in 0..stress_factor() {
c34b1796 1596 let (tx, rx) = channel::<i32>();
85aaf69f 1597 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1598 drop(rx);
1599 });
1600 drop(tx);
1601 }
1602 }
1603
1604 #[test]
1605 fn oneshot_multi_thread_send_close_stress() {
85aaf69f 1606 for _ in 0..stress_factor() {
c34b1796 1607 let (tx, rx) = channel::<i32>();
85aaf69f 1608 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1609 drop(rx);
1610 });
85aaf69f 1611 let _ = thread::spawn(move|| {
1a4d82fc
JJ
1612 tx.send(1).unwrap();
1613 }).join();
1614 }
1615 }
1616
1617 #[test]
1618 fn oneshot_multi_thread_recv_close_stress() {
85aaf69f 1619 for _ in 0..stress_factor() {
c34b1796 1620 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1621 thread::spawn(move|| {
1622 let res = thread::spawn(move|| {
1a4d82fc
JJ
1623 rx.recv().unwrap();
1624 }).join();
1625 assert!(res.is_err());
1626 });
85aaf69f
SL
1627 let _t = thread::spawn(move|| {
1628 thread::spawn(move|| {
1a4d82fc
JJ
1629 drop(tx);
1630 });
1631 });
1632 }
1633 }
1634
1635 #[test]
1636 fn oneshot_multi_thread_send_recv_stress() {
85aaf69f 1637 for _ in 0..stress_factor() {
c34b1796 1638 let (tx, rx) = channel::<Box<isize>>();
85aaf69f
SL
1639 let _t = thread::spawn(move|| {
1640 tx.send(box 10).unwrap();
1a4d82fc 1641 });
85aaf69f 1642 assert!(rx.recv().unwrap() == box 10);
1a4d82fc
JJ
1643 }
1644 }
1645
1646 #[test]
1647 fn stream_send_recv_stress() {
85aaf69f 1648 for _ in 0..stress_factor() {
1a4d82fc
JJ
1649 let (tx, rx) = channel();
1650
1651 send(tx, 0);
1652 recv(rx, 0);
1653
c34b1796 1654 fn send(tx: Sender<Box<i32>>, i: i32) {
1a4d82fc
JJ
1655 if i == 10 { return }
1656
85aaf69f 1657 thread::spawn(move|| {
1a4d82fc
JJ
1658 tx.send(box i).unwrap();
1659 send(tx, i + 1);
1660 });
1661 }
1662
c34b1796 1663 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1a4d82fc
JJ
1664 if i == 10 { return }
1665
85aaf69f 1666 thread::spawn(move|| {
1a4d82fc
JJ
1667 assert!(rx.recv().unwrap() == box i);
1668 recv(rx, i + 1);
1669 });
1670 }
1671 }
1672 }
1673
3157f602
XL
1674 #[test]
1675 fn oneshot_single_thread_recv_timeout() {
1676 let (tx, rx) = channel();
1677 tx.send(()).unwrap();
1678 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1679 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1680 tx.send(()).unwrap();
1681 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1682 }
1683
1684 #[test]
1685 fn stress_recv_timeout_two_threads() {
1686 let (tx, rx) = channel();
1687 let stress = stress_factor() + 100;
1688 let timeout = Duration::from_millis(100);
1689
1690 thread::spawn(move || {
1691 for i in 0..stress {
1692 if i % 2 == 0 {
1693 thread::sleep(timeout * 2);
1694 }
1695 tx.send(1usize).unwrap();
1696 }
1697 });
1698
1699 let mut recv_count = 0;
1700 loop {
1701 match rx.recv_timeout(timeout) {
1702 Ok(n) => {
1703 assert_eq!(n, 1usize);
1704 recv_count += 1;
1705 }
1706 Err(RecvTimeoutError::Timeout) => continue,
1707 Err(RecvTimeoutError::Disconnected) => break,
1708 }
1709 }
1710
1711 assert_eq!(recv_count, stress);
1712 }
1713
1714 #[test]
1715 fn recv_timeout_upgrade() {
1716 let (tx, rx) = channel::<()>();
1717 let timeout = Duration::from_millis(1);
1718 let _tx_clone = tx.clone();
1719
1720 let start = Instant::now();
1721 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1722 assert!(Instant::now() >= start + timeout);
1723 }
1724
1725 #[test]
1726 fn stress_recv_timeout_shared() {
1727 let (tx, rx) = channel();
1728 let stress = stress_factor() + 100;
1729
1730 for i in 0..stress {
1731 let tx = tx.clone();
1732 thread::spawn(move || {
1733 thread::sleep(Duration::from_millis(i as u64 * 10));
1734 tx.send(1usize).unwrap();
1735 });
1736 }
1737
1738 drop(tx);
1739
1740 let mut recv_count = 0;
1741 loop {
1742 match rx.recv_timeout(Duration::from_millis(10)) {
1743 Ok(n) => {
1744 assert_eq!(n, 1usize);
1745 recv_count += 1;
1746 }
1747 Err(RecvTimeoutError::Timeout) => continue,
1748 Err(RecvTimeoutError::Disconnected) => break,
1749 }
1750 }
1751
1752 assert_eq!(recv_count, stress);
1753 }
1754
1a4d82fc
JJ
1755 #[test]
1756 fn recv_a_lot() {
1757 // Regression test that we don't run out of stack in scheduler context
1758 let (tx, rx) = channel();
85aaf69f
SL
1759 for _ in 0..10000 { tx.send(()).unwrap(); }
1760 for _ in 0..10000 { rx.recv().unwrap(); }
1a4d82fc
JJ
1761 }
1762
3157f602
XL
1763 #[test]
1764 fn shared_recv_timeout() {
1765 let (tx, rx) = channel();
1766 let total = 5;
1767 for _ in 0..total {
1768 let tx = tx.clone();
1769 thread::spawn(move|| {
1770 tx.send(()).unwrap();
1771 });
1772 }
1773
1774 for _ in 0..total { rx.recv().unwrap(); }
1775
1776 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1777 tx.send(()).unwrap();
1778 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
1779 }
1780
1a4d82fc
JJ
1781 #[test]
1782 fn shared_chan_stress() {
1783 let (tx, rx) = channel();
1784 let total = stress_factor() + 100;
85aaf69f 1785 for _ in 0..total {
1a4d82fc 1786 let tx = tx.clone();
85aaf69f 1787 thread::spawn(move|| {
1a4d82fc
JJ
1788 tx.send(()).unwrap();
1789 });
1790 }
1791
85aaf69f 1792 for _ in 0..total {
1a4d82fc
JJ
1793 rx.recv().unwrap();
1794 }
1795 }
1796
1797 #[test]
1798 fn test_nested_recv_iter() {
c34b1796
AL
1799 let (tx, rx) = channel::<i32>();
1800 let (total_tx, total_rx) = channel::<i32>();
1a4d82fc 1801
85aaf69f 1802 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1803 let mut acc = 0;
1804 for x in rx.iter() {
1805 acc += x;
1806 }
1807 total_tx.send(acc).unwrap();
1808 });
1809
1810 tx.send(3).unwrap();
1811 tx.send(1).unwrap();
1812 tx.send(2).unwrap();
1813 drop(tx);
1814 assert_eq!(total_rx.recv().unwrap(), 6);
1815 }
1816
1817 #[test]
1818 fn test_recv_iter_break() {
c34b1796 1819 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1820 let (count_tx, count_rx) = channel();
1821
85aaf69f 1822 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1823 let mut count = 0;
1824 for x in rx.iter() {
1825 if count >= 3 {
1826 break;
1827 } else {
1828 count += x;
1829 }
1830 }
1831 count_tx.send(count).unwrap();
1832 });
1833
1834 tx.send(2).unwrap();
1835 tx.send(2).unwrap();
1836 tx.send(2).unwrap();
1837 let _ = tx.send(2);
1838 drop(tx);
1839 assert_eq!(count_rx.recv().unwrap(), 4);
1840 }
1841
5bcae85e
SL
1842 #[test]
1843 fn test_recv_try_iter() {
1844 let (request_tx, request_rx) = channel();
1845 let (response_tx, response_rx) = channel();
1846
1847 // Request `x`s until we have `6`.
1848 let t = thread::spawn(move|| {
1849 let mut count = 0;
1850 loop {
1851 for x in response_rx.try_iter() {
1852 count += x;
1853 if count == 6 {
1854 return count;
1855 }
1856 }
1857 request_tx.send(()).unwrap();
1858 }
1859 });
1860
1861 for _ in request_rx.iter() {
1862 if response_tx.send(2).is_err() {
1863 break;
1864 }
1865 }
1866
1867 assert_eq!(t.join().unwrap(), 6);
1868 }
1869
d9579d0f
AL
1870 #[test]
1871 fn test_recv_into_iter_owned() {
1872 let mut iter = {
1873 let (tx, rx) = channel::<i32>();
1874 tx.send(1).unwrap();
1875 tx.send(2).unwrap();
1876
1877 rx.into_iter()
1878 };
1879 assert_eq!(iter.next().unwrap(), 1);
1880 assert_eq!(iter.next().unwrap(), 2);
1881 assert_eq!(iter.next().is_none(), true);
1882 }
1883
1884 #[test]
1885 fn test_recv_into_iter_borrowed() {
1886 let (tx, rx) = channel::<i32>();
1887 tx.send(1).unwrap();
1888 tx.send(2).unwrap();
1889 drop(tx);
1890 let mut iter = (&rx).into_iter();
1891 assert_eq!(iter.next().unwrap(), 1);
1892 assert_eq!(iter.next().unwrap(), 2);
1893 assert_eq!(iter.next().is_none(), true);
1894 }
1895
1a4d82fc
JJ
1896 #[test]
1897 fn try_recv_states() {
c34b1796 1898 let (tx1, rx1) = channel::<i32>();
1a4d82fc
JJ
1899 let (tx2, rx2) = channel::<()>();
1900 let (tx3, rx3) = channel::<()>();
85aaf69f 1901 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1902 rx2.recv().unwrap();
1903 tx1.send(1).unwrap();
1904 tx3.send(()).unwrap();
1905 rx2.recv().unwrap();
1906 drop(tx1);
1907 tx3.send(()).unwrap();
1908 });
1909
1910 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1911 tx2.send(()).unwrap();
1912 rx3.recv().unwrap();
1913 assert_eq!(rx1.try_recv(), Ok(1));
1914 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1915 tx2.send(()).unwrap();
1916 rx3.recv().unwrap();
1917 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1918 }
1919
1920 // This bug used to end up in a livelock inside of the Receiver destructor
1921 // because the internal state of the Shared packet was corrupted
1922 #[test]
1923 fn destroy_upgraded_shared_port_when_sender_still_active() {
1924 let (tx, rx) = channel();
1925 let (tx2, rx2) = channel();
85aaf69f 1926 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1927 rx.recv().unwrap(); // wait on a oneshot
1928 drop(rx); // destroy a shared
1929 tx2.send(()).unwrap();
1930 });
bd371182 1931 // make sure the other thread has gone to sleep
85aaf69f 1932 for _ in 0..5000 { thread::yield_now(); }
1a4d82fc
JJ
1933
1934 // upgrade to a shared chan and send a message
1935 let t = tx.clone();
1936 drop(tx);
1937 t.send(()).unwrap();
1938
bd371182 1939 // wait for the child thread to exit before we exit
1a4d82fc
JJ
1940 rx2.recv().unwrap();
1941 }
1942}
1943
1944#[cfg(test)]
1945mod sync_tests {
c1a9b12d 1946 use env;
85aaf69f 1947 use thread;
1a4d82fc 1948 use super::*;
3157f602 1949 use time::Duration;
1a4d82fc 1950
c34b1796 1951 pub fn stress_factor() -> usize {
85aaf69f
SL
1952 match env::var("RUST_TEST_STRESS") {
1953 Ok(val) => val.parse().unwrap(),
1954 Err(..) => 1,
1a4d82fc
JJ
1955 }
1956 }
1957
1958 #[test]
1959 fn smoke() {
c34b1796 1960 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
1961 tx.send(1).unwrap();
1962 assert_eq!(rx.recv().unwrap(), 1);
1963 }
1964
1965 #[test]
1966 fn drop_full() {
c34b1796 1967 let (tx, _rx) = sync_channel::<Box<isize>>(1);
85aaf69f 1968 tx.send(box 1).unwrap();
1a4d82fc
JJ
1969 }
1970
1971 #[test]
1972 fn smoke_shared() {
c34b1796 1973 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
1974 tx.send(1).unwrap();
1975 assert_eq!(rx.recv().unwrap(), 1);
1976 let tx = tx.clone();
1977 tx.send(1).unwrap();
1978 assert_eq!(rx.recv().unwrap(), 1);
1979 }
1980
3157f602
XL
1981 #[test]
1982 fn recv_timeout() {
1983 let (tx, rx) = sync_channel::<i32>(1);
1984 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
1985 tx.send(1).unwrap();
1986 assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
1987 }
1988
1a4d82fc
JJ
1989 #[test]
1990 fn smoke_threads() {
c34b1796 1991 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 1992 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1993 tx.send(1).unwrap();
1994 });
1995 assert_eq!(rx.recv().unwrap(), 1);
1996 }
1997
1998 #[test]
1999 fn smoke_port_gone() {
c34b1796 2000 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2001 drop(rx);
2002 assert!(tx.send(1).is_err());
2003 }
2004
2005 #[test]
2006 fn smoke_shared_port_gone2() {
c34b1796 2007 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2008 drop(rx);
2009 let tx2 = tx.clone();
2010 drop(tx);
2011 assert!(tx2.send(1).is_err());
2012 }
2013
2014 #[test]
2015 fn port_gone_concurrent() {
c34b1796 2016 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2017 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2018 rx.recv().unwrap();
2019 });
2020 while tx.send(1).is_ok() {}
2021 }
2022
2023 #[test]
2024 fn port_gone_concurrent_shared() {
c34b1796 2025 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc 2026 let tx2 = tx.clone();
85aaf69f 2027 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2028 rx.recv().unwrap();
2029 });
2030 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
2031 }
2032
2033 #[test]
2034 fn smoke_chan_gone() {
c34b1796 2035 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2036 drop(tx);
2037 assert!(rx.recv().is_err());
2038 }
2039
2040 #[test]
2041 fn smoke_chan_gone_shared() {
2042 let (tx, rx) = sync_channel::<()>(0);
2043 let tx2 = tx.clone();
2044 drop(tx);
2045 drop(tx2);
2046 assert!(rx.recv().is_err());
2047 }
2048
2049 #[test]
2050 fn chan_gone_concurrent() {
c34b1796 2051 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2052 thread::spawn(move|| {
1a4d82fc
JJ
2053 tx.send(1).unwrap();
2054 tx.send(1).unwrap();
2055 });
2056 while rx.recv().is_ok() {}
2057 }
2058
2059 #[test]
2060 fn stress() {
c34b1796 2061 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f
SL
2062 thread::spawn(move|| {
2063 for _ in 0..10000 { tx.send(1).unwrap(); }
1a4d82fc 2064 });
85aaf69f 2065 for _ in 0..10000 {
1a4d82fc
JJ
2066 assert_eq!(rx.recv().unwrap(), 1);
2067 }
2068 }
2069
2070 #[test]
3157f602
XL
2071 fn stress_recv_timeout_two_threads() {
2072 let (tx, rx) = sync_channel::<i32>(0);
2073
2074 thread::spawn(move|| {
2075 for _ in 0..10000 { tx.send(1).unwrap(); }
2076 });
2077
2078 let mut recv_count = 0;
2079 loop {
2080 match rx.recv_timeout(Duration::from_millis(1)) {
2081 Ok(v) => {
2082 assert_eq!(v, 1);
2083 recv_count += 1;
2084 },
2085 Err(RecvTimeoutError::Timeout) => continue,
2086 Err(RecvTimeoutError::Disconnected) => break,
2087 }
2088 }
2089
2090 assert_eq!(recv_count, 10000);
2091 }
2092
2093 #[test]
2094 fn stress_recv_timeout_shared() {
2095 const AMT: u32 = 1000;
2096 const NTHREADS: u32 = 8;
2097 let (tx, rx) = sync_channel::<i32>(0);
2098 let (dtx, drx) = sync_channel::<()>(0);
2099
2100 thread::spawn(move|| {
2101 let mut recv_count = 0;
2102 loop {
2103 match rx.recv_timeout(Duration::from_millis(10)) {
2104 Ok(v) => {
2105 assert_eq!(v, 1);
2106 recv_count += 1;
2107 },
2108 Err(RecvTimeoutError::Timeout) => continue,
2109 Err(RecvTimeoutError::Disconnected) => break,
2110 }
2111 }
2112
2113 assert_eq!(recv_count, AMT * NTHREADS);
2114 assert!(rx.try_recv().is_err());
2115
2116 dtx.send(()).unwrap();
2117 });
2118
2119 for _ in 0..NTHREADS {
2120 let tx = tx.clone();
2121 thread::spawn(move|| {
2122 for _ in 0..AMT { tx.send(1).unwrap(); }
2123 });
2124 }
2125
2126 drop(tx);
2127
2128 drx.recv().unwrap();
2129 }
2130
2131 #[test]
1a4d82fc 2132 fn stress_shared() {
c34b1796
AL
2133 const AMT: u32 = 1000;
2134 const NTHREADS: u32 = 8;
2135 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2136 let (dtx, drx) = sync_channel::<()>(0);
2137
85aaf69f
SL
2138 thread::spawn(move|| {
2139 for _ in 0..AMT * NTHREADS {
1a4d82fc
JJ
2140 assert_eq!(rx.recv().unwrap(), 1);
2141 }
2142 match rx.try_recv() {
2143 Ok(..) => panic!(),
2144 _ => {}
2145 }
2146 dtx.send(()).unwrap();
2147 });
2148
85aaf69f 2149 for _ in 0..NTHREADS {
1a4d82fc 2150 let tx = tx.clone();
85aaf69f
SL
2151 thread::spawn(move|| {
2152 for _ in 0..AMT { tx.send(1).unwrap(); }
1a4d82fc
JJ
2153 });
2154 }
2155 drop(tx);
2156 drx.recv().unwrap();
2157 }
2158
2159 #[test]
2160 fn oneshot_single_thread_close_port_first() {
2161 // Simple test of closing without sending
c34b1796 2162 let (_tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2163 drop(rx);
2164 }
2165
2166 #[test]
2167 fn oneshot_single_thread_close_chan_first() {
2168 // Simple test of closing without sending
c34b1796 2169 let (tx, _rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2170 drop(tx);
2171 }
2172
2173 #[test]
2174 fn oneshot_single_thread_send_port_close() {
2175 // Testing that the sender cleans up the payload if receiver is closed
c34b1796 2176 let (tx, rx) = sync_channel::<Box<i32>>(0);
1a4d82fc
JJ
2177 drop(rx);
2178 assert!(tx.send(box 0).is_err());
2179 }
2180
2181 #[test]
2182 fn oneshot_single_thread_recv_chan_close() {
2183 // Receiving on a closed chan will panic
85aaf69f 2184 let res = thread::spawn(move|| {
c34b1796 2185 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2186 drop(tx);
2187 rx.recv().unwrap();
2188 }).join();
2189 // What is our res?
2190 assert!(res.is_err());
2191 }
2192
2193 #[test]
2194 fn oneshot_single_thread_send_then_recv() {
c34b1796 2195 let (tx, rx) = sync_channel::<Box<i32>>(1);
1a4d82fc
JJ
2196 tx.send(box 10).unwrap();
2197 assert!(rx.recv().unwrap() == box 10);
2198 }
2199
2200 #[test]
2201 fn oneshot_single_thread_try_send_open() {
c34b1796 2202 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2203 assert_eq!(tx.try_send(10), Ok(()));
2204 assert!(rx.recv().unwrap() == 10);
2205 }
2206
2207 #[test]
2208 fn oneshot_single_thread_try_send_closed() {
c34b1796 2209 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2210 drop(rx);
2211 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
2212 }
2213
2214 #[test]
2215 fn oneshot_single_thread_try_send_closed2() {
c34b1796 2216 let (tx, _rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2217 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
2218 }
2219
2220 #[test]
2221 fn oneshot_single_thread_try_recv_open() {
c34b1796 2222 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2223 tx.send(10).unwrap();
2224 assert!(rx.recv() == Ok(10));
2225 }
2226
2227 #[test]
2228 fn oneshot_single_thread_try_recv_closed() {
c34b1796 2229 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2230 drop(tx);
2231 assert!(rx.recv().is_err());
2232 }
2233
5bcae85e
SL
2234 #[test]
2235 fn oneshot_single_thread_try_recv_closed_with_data() {
2236 let (tx, rx) = sync_channel::<i32>(1);
2237 tx.send(10).unwrap();
2238 drop(tx);
2239 assert_eq!(rx.try_recv(), Ok(10));
2240 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2241 }
2242
1a4d82fc
JJ
2243 #[test]
2244 fn oneshot_single_thread_peek_data() {
c34b1796 2245 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2246 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2247 tx.send(10).unwrap();
2248 assert_eq!(rx.try_recv(), Ok(10));
2249 }
2250
2251 #[test]
2252 fn oneshot_single_thread_peek_close() {
c34b1796 2253 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2254 drop(tx);
2255 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2256 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
2257 }
2258
2259 #[test]
2260 fn oneshot_single_thread_peek_open() {
c34b1796 2261 let (_tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2262 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
2263 }
2264
2265 #[test]
2266 fn oneshot_multi_task_recv_then_send() {
c34b1796 2267 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f 2268 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2269 assert!(rx.recv().unwrap() == box 10);
2270 });
2271
2272 tx.send(box 10).unwrap();
2273 }
2274
2275 #[test]
2276 fn oneshot_multi_task_recv_then_close() {
c34b1796 2277 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f 2278 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2279 drop(tx);
2280 });
85aaf69f 2281 let res = thread::spawn(move|| {
1a4d82fc
JJ
2282 assert!(rx.recv().unwrap() == box 10);
2283 }).join();
2284 assert!(res.is_err());
2285 }
2286
2287 #[test]
2288 fn oneshot_multi_thread_close_stress() {
85aaf69f 2289 for _ in 0..stress_factor() {
c34b1796 2290 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2291 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2292 drop(rx);
2293 });
2294 drop(tx);
2295 }
2296 }
2297
2298 #[test]
2299 fn oneshot_multi_thread_send_close_stress() {
85aaf69f 2300 for _ in 0..stress_factor() {
c34b1796 2301 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2302 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2303 drop(rx);
2304 });
85aaf69f 2305 let _ = thread::spawn(move || {
1a4d82fc
JJ
2306 tx.send(1).unwrap();
2307 }).join();
2308 }
2309 }
2310
2311 #[test]
2312 fn oneshot_multi_thread_recv_close_stress() {
85aaf69f 2313 for _ in 0..stress_factor() {
c34b1796 2314 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f
SL
2315 let _t = thread::spawn(move|| {
2316 let res = thread::spawn(move|| {
1a4d82fc
JJ
2317 rx.recv().unwrap();
2318 }).join();
2319 assert!(res.is_err());
2320 });
85aaf69f
SL
2321 let _t = thread::spawn(move|| {
2322 thread::spawn(move|| {
1a4d82fc
JJ
2323 drop(tx);
2324 });
2325 });
2326 }
2327 }
2328
2329 #[test]
2330 fn oneshot_multi_thread_send_recv_stress() {
85aaf69f 2331 for _ in 0..stress_factor() {
c34b1796 2332 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f
SL
2333 let _t = thread::spawn(move|| {
2334 tx.send(box 10).unwrap();
1a4d82fc 2335 });
85aaf69f 2336 assert!(rx.recv().unwrap() == box 10);
1a4d82fc
JJ
2337 }
2338 }
2339
2340 #[test]
2341 fn stream_send_recv_stress() {
85aaf69f 2342 for _ in 0..stress_factor() {
c34b1796 2343 let (tx, rx) = sync_channel::<Box<i32>>(0);
1a4d82fc
JJ
2344
2345 send(tx, 0);
2346 recv(rx, 0);
2347
c34b1796 2348 fn send(tx: SyncSender<Box<i32>>, i: i32) {
1a4d82fc
JJ
2349 if i == 10 { return }
2350
85aaf69f 2351 thread::spawn(move|| {
1a4d82fc
JJ
2352 tx.send(box i).unwrap();
2353 send(tx, i + 1);
2354 });
2355 }
2356
c34b1796 2357 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1a4d82fc
JJ
2358 if i == 10 { return }
2359
85aaf69f 2360 thread::spawn(move|| {
1a4d82fc
JJ
2361 assert!(rx.recv().unwrap() == box i);
2362 recv(rx, i + 1);
2363 });
2364 }
2365 }
2366 }
2367
2368 #[test]
2369 fn recv_a_lot() {
2370 // Regression test that we don't run out of stack in scheduler context
2371 let (tx, rx) = sync_channel(10000);
85aaf69f
SL
2372 for _ in 0..10000 { tx.send(()).unwrap(); }
2373 for _ in 0..10000 { rx.recv().unwrap(); }
1a4d82fc
JJ
2374 }
2375
2376 #[test]
2377 fn shared_chan_stress() {
2378 let (tx, rx) = sync_channel(0);
2379 let total = stress_factor() + 100;
85aaf69f 2380 for _ in 0..total {
1a4d82fc 2381 let tx = tx.clone();
85aaf69f 2382 thread::spawn(move|| {
1a4d82fc
JJ
2383 tx.send(()).unwrap();
2384 });
2385 }
2386
85aaf69f 2387 for _ in 0..total {
1a4d82fc
JJ
2388 rx.recv().unwrap();
2389 }
2390 }
2391
2392 #[test]
2393 fn test_nested_recv_iter() {
c34b1796
AL
2394 let (tx, rx) = sync_channel::<i32>(0);
2395 let (total_tx, total_rx) = sync_channel::<i32>(0);
1a4d82fc 2396
85aaf69f 2397 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2398 let mut acc = 0;
2399 for x in rx.iter() {
2400 acc += x;
2401 }
2402 total_tx.send(acc).unwrap();
2403 });
2404
2405 tx.send(3).unwrap();
2406 tx.send(1).unwrap();
2407 tx.send(2).unwrap();
2408 drop(tx);
2409 assert_eq!(total_rx.recv().unwrap(), 6);
2410 }
2411
2412 #[test]
2413 fn test_recv_iter_break() {
c34b1796 2414 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2415 let (count_tx, count_rx) = sync_channel(0);
2416
85aaf69f 2417 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2418 let mut count = 0;
2419 for x in rx.iter() {
2420 if count >= 3 {
2421 break;
2422 } else {
2423 count += x;
2424 }
2425 }
2426 count_tx.send(count).unwrap();
2427 });
2428
2429 tx.send(2).unwrap();
2430 tx.send(2).unwrap();
2431 tx.send(2).unwrap();
2432 let _ = tx.try_send(2);
2433 drop(tx);
2434 assert_eq!(count_rx.recv().unwrap(), 4);
2435 }
2436
2437 #[test]
2438 fn try_recv_states() {
c34b1796 2439 let (tx1, rx1) = sync_channel::<i32>(1);
1a4d82fc
JJ
2440 let (tx2, rx2) = sync_channel::<()>(1);
2441 let (tx3, rx3) = sync_channel::<()>(1);
85aaf69f 2442 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2443 rx2.recv().unwrap();
2444 tx1.send(1).unwrap();
2445 tx3.send(()).unwrap();
2446 rx2.recv().unwrap();
2447 drop(tx1);
2448 tx3.send(()).unwrap();
2449 });
2450
2451 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2452 tx2.send(()).unwrap();
2453 rx3.recv().unwrap();
2454 assert_eq!(rx1.try_recv(), Ok(1));
2455 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2456 tx2.send(()).unwrap();
2457 rx3.recv().unwrap();
2458 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2459 }
2460
2461 // This bug used to end up in a livelock inside of the Receiver destructor
2462 // because the internal state of the Shared packet was corrupted
2463 #[test]
2464 fn destroy_upgraded_shared_port_when_sender_still_active() {
2465 let (tx, rx) = sync_channel::<()>(0);
2466 let (tx2, rx2) = sync_channel::<()>(0);
85aaf69f 2467 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2468 rx.recv().unwrap(); // wait on a oneshot
2469 drop(rx); // destroy a shared
2470 tx2.send(()).unwrap();
2471 });
bd371182 2472 // make sure the other thread has gone to sleep
85aaf69f 2473 for _ in 0..5000 { thread::yield_now(); }
1a4d82fc
JJ
2474
2475 // upgrade to a shared chan and send a message
2476 let t = tx.clone();
2477 drop(tx);
2478 t.send(()).unwrap();
2479
bd371182 2480 // wait for the child thread to exit before we exit
1a4d82fc
JJ
2481 rx2.recv().unwrap();
2482 }
2483
2484 #[test]
2485 fn send1() {
c34b1796 2486 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2487 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
1a4d82fc
JJ
2488 assert_eq!(tx.send(1), Ok(()));
2489 }
2490
2491 #[test]
2492 fn send2() {
c34b1796 2493 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2494 let _t = thread::spawn(move|| { drop(rx); });
1a4d82fc
JJ
2495 assert!(tx.send(1).is_err());
2496 }
2497
2498 #[test]
2499 fn send3() {
c34b1796 2500 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc 2501 assert_eq!(tx.send(1), Ok(()));
85aaf69f 2502 let _t =thread::spawn(move|| { drop(rx); });
1a4d82fc
JJ
2503 assert!(tx.send(1).is_err());
2504 }
2505
2506 #[test]
2507 fn send4() {
c34b1796 2508 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2509 let tx2 = tx.clone();
2510 let (done, donerx) = channel();
2511 let done2 = done.clone();
85aaf69f 2512 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2513 assert!(tx.send(1).is_err());
2514 done.send(()).unwrap();
2515 });
85aaf69f 2516 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2517 assert!(tx2.send(2).is_err());
2518 done2.send(()).unwrap();
2519 });
2520 drop(rx);
2521 donerx.recv().unwrap();
2522 donerx.recv().unwrap();
2523 }
2524
2525 #[test]
2526 fn try_send1() {
c34b1796 2527 let (tx, _rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2528 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2529 }
2530
2531 #[test]
2532 fn try_send2() {
c34b1796 2533 let (tx, _rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2534 assert_eq!(tx.try_send(1), Ok(()));
2535 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2536 }
2537
2538 #[test]
2539 fn try_send3() {
c34b1796 2540 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2541 assert_eq!(tx.try_send(1), Ok(()));
2542 drop(rx);
2543 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2544 }
2545
2546 #[test]
2547 fn issue_15761() {
2548 fn repro() {
2549 let (tx1, rx1) = sync_channel::<()>(3);
2550 let (tx2, rx2) = sync_channel::<()>(3);
2551
85aaf69f 2552 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2553 rx1.recv().unwrap();
2554 tx2.try_send(()).unwrap();
2555 });
2556
2557 tx1.try_send(()).unwrap();
2558 rx2.recv().unwrap();
2559 }
2560
85aaf69f 2561 for _ in 0..100 {
1a4d82fc
JJ
2562 repro()
2563 }
2564 }
7453a54e
SL
2565
2566 #[test]
2567 fn fmt_debug_sender() {
2568 let (tx, _) = channel::<i32>();
2569 assert_eq!(format!("{:?}", tx), "Sender { .. }");
2570 }
2571
2572 #[test]
2573 fn fmt_debug_recv() {
2574 let (_, rx) = channel::<i32>();
2575 assert_eq!(format!("{:?}", rx), "Receiver { .. }");
2576 }
2577
2578 #[test]
2579 fn fmt_debug_sync_sender() {
2580 let (tx, _) = sync_channel::<i32>(1);
2581 assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
2582 }
1a4d82fc 2583}