]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/mod.rs
Imported Upstream version 1.6.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / mod.rs
CommitLineData
1a4d82fc
JJ
1// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
85aaf69f 11//! Multi-producer, single-consumer FIFO queue communication primitives.
1a4d82fc
JJ
12//!
13//! This module provides message-based communication over channels, concretely
14//! defined among three types:
15//!
16//! * `Sender`
17//! * `SyncSender`
18//! * `Receiver`
19//!
20//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21//! senders are clone-able (multi-producer) such that many threads can send
22//! simultaneously to one receiver (single-consumer).
23//!
24//! These channels come in two flavors:
25//!
26//! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27//! will return a `(Sender, Receiver)` tuple where all sends will be
28//! **asynchronous** (they never block). The channel conceptually has an
29//! infinite buffer.
30//!
31//! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32//! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33//! is a pre-allocated buffer of a fixed size. All sends will be
34//! **synchronous** by blocking until there is buffer space available. Note
35//! that a bound of 0 is allowed, causing the channel to become a
36//! "rendezvous" channel where each sender atomically hands off a message to
37//! a receiver.
38//!
39//! ## Disconnection
40//!
41//! The send and receive operations on channels will all return a `Result`
42//! indicating whether the operation succeeded or not. An unsuccessful operation
43//! is normally indicative of the other half of a channel having "hung up" by
44//! being dropped in its corresponding thread.
45//!
46//! Once half of a channel has been deallocated, most operations can no longer
47//! continue to make progress, so `Err` will be returned. Many applications will
48//! continue to `unwrap()` the results returned from this module, instigating a
49//! propagation of failure among threads if one unexpectedly dies.
50//!
51//! # Examples
52//!
53//! Simple usage:
54//!
55//! ```
85aaf69f 56//! use std::thread;
1a4d82fc
JJ
57//! use std::sync::mpsc::channel;
58//!
59//! // Create a simple streaming channel
60//! let (tx, rx) = channel();
85aaf69f
SL
61//! thread::spawn(move|| {
62//! tx.send(10).unwrap();
1a4d82fc 63//! });
85aaf69f 64//! assert_eq!(rx.recv().unwrap(), 10);
1a4d82fc
JJ
65//! ```
66//!
67//! Shared usage:
68//!
69//! ```
85aaf69f 70//! use std::thread;
1a4d82fc
JJ
71//! use std::sync::mpsc::channel;
72//!
73//! // Create a shared channel that can be sent along from many threads
74//! // where tx is the sending half (tx for transmission), and rx is the receiving
75//! // half (rx for receiving).
76//! let (tx, rx) = channel();
85aaf69f 77//! for i in 0..10 {
1a4d82fc 78//! let tx = tx.clone();
85aaf69f 79//! thread::spawn(move|| {
1a4d82fc
JJ
80//! tx.send(i).unwrap();
81//! });
82//! }
83//!
85aaf69f 84//! for _ in 0..10 {
1a4d82fc
JJ
85//! let j = rx.recv().unwrap();
86//! assert!(0 <= j && j < 10);
87//! }
88//! ```
89//!
90//! Propagating panics:
91//!
92//! ```
93//! use std::sync::mpsc::channel;
94//!
95//! // The call to recv() will return an error because the channel has already
96//! // hung up (or been deallocated)
c34b1796 97//! let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
98//! drop(tx);
99//! assert!(rx.recv().is_err());
100//! ```
101//!
102//! Synchronous channels:
103//!
104//! ```
85aaf69f 105//! use std::thread;
1a4d82fc
JJ
106//! use std::sync::mpsc::sync_channel;
107//!
c34b1796 108//! let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 109//! thread::spawn(move|| {
bd371182 110//! // This will wait for the parent thread to start receiving
1a4d82fc
JJ
111//! tx.send(53).unwrap();
112//! });
113//! rx.recv().unwrap();
114//! ```
1a4d82fc 115
85aaf69f 116#![stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
117
118// A description of how Rust's channel implementation works
119//
120// Channels are supposed to be the basic building block for all other
121// concurrent primitives that are used in Rust. As a result, the channel type
122// needs to be highly optimized, flexible, and broad enough for use everywhere.
123//
124// The choice of implementation of all channels is to be built on lock-free data
125// structures. The channels themselves are then consequently also lock-free data
126// structures. As always with lock-free code, this is a very "here be dragons"
127// territory, especially because I'm unaware of any academic papers that have
128// gone into great length about channels of these flavors.
129//
130// ## Flavors of channels
131//
132// From the perspective of a consumer of this library, there is only one flavor
133// of channel. This channel can be used as a stream and cloned to allow multiple
134// senders. Under the hood, however, there are actually three flavors of
135// channels in play.
136//
137// * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
138// They contain as few atomics as possible and involve one and
139// exactly one allocation.
140// * Streams - these channels are optimized for the non-shared use case. They
141// use a different concurrent queue that is more tailored for this
142// use case. The initial allocation of this flavor of channel is not
143// optimized.
144// * Shared - this is the most general form of channel that this module offers,
145// a channel with multiple senders. This type is as optimized as it
146// can be, but the previous two types mentioned are much faster for
147// their use-cases.
148//
149// ## Concurrent queues
150//
151// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
152// recv() obviously blocks. This means that under the hood there must be some
153// shared and concurrent queue holding all of the actual data.
154//
155// With two flavors of channels, two flavors of queues are also used. We have
156// chosen to use queues from a well-known author that are abbreviated as SPSC
157// and MPSC (single producer, single consumer and multiple producer, single
158// consumer). SPSC queues are used for streams while MPSC queues are used for
159// shared channels.
160//
161// ### SPSC optimizations
162//
163// The SPSC queue found online is essentially a linked list of nodes where one
164// half of the nodes are the "queue of data" and the other half of nodes are a
165// cache of unused nodes. The unused nodes are used such that an allocation is
166// not required on every push() and a free doesn't need to happen on every
167// pop().
168//
169// As found online, however, the cache of nodes is of an infinite size. This
170// means that if a channel at one point in its life had 50k items in the queue,
171// then the queue will always have the capacity for 50k items. I believed that
172// this was an unnecessary limitation of the implementation, so I have altered
173// the queue to optionally have a bound on the cache size.
174//
175// By default, streams will have an unbounded SPSC queue with a small-ish cache
176// size. The hope is that the cache is still large enough to have very fast
177// send() operations while not too large such that millions of channels can
178// coexist at once.
179//
180// ### MPSC optimizations
181//
182// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183// a linked list under the hood to earn its unboundedness, but I have not put
184// forth much effort into having a cache of nodes similar to the SPSC queue.
185//
186// For now, I believe that this is "ok" because shared channels are not the most
187// common type, but soon we may wish to revisit this queue choice and determine
188// another candidate for backend storage of shared channels.
189//
190// ## Overview of the Implementation
191//
192// Now that there's a little background on the concurrent queues used, it's
193// worth going into much more detail about the channels themselves. The basic
194// pseudocode for a send/recv are:
195//
196//
197// send(t) recv()
198// queue.push(t) return if queue.pop()
199// if increment() == -1 deschedule {
200// wakeup() if decrement() > 0
201// cancel_deschedule()
202// }
203// queue.pop()
204//
205// As mentioned before, there are no locks in this implementation, only atomic
206// instructions are used.
207//
208// ### The internal atomic counter
209//
210// Every channel has a shared counter with each half to keep track of the size
211// of the queue. This counter is used to abort descheduling by the receiver and
212// to know when to wake up on the sending side.
213//
214// As seen in the pseudocode, senders will increment this count and receivers
215// will decrement the count. The theory behind this is that if a sender sees a
216// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217// then it doesn't need to block.
218//
219// The recv() method has a beginning call to pop(), and if successful, it needs
220// to decrement the count. It is a crucial implementation detail that this
221// decrement does *not* happen to the shared counter. If this were the case,
222// then it would be possible for the counter to be very negative when there were
223// no receivers waiting, in which case the senders would have to determine when
224// it was actually appropriate to wake up a receiver.
225//
226// Instead, the "steal count" is kept track of separately (not atomically
227// because it's only used by receivers), and then the decrement() call when
228// descheduling will lump in all of the recent steals into one large decrement.
229//
230// The implication of this is that if a sender sees a -1 count, then there's
231// guaranteed to be a waiter waiting!
232//
233// ## Native Implementation
234//
235// A major goal of these channels is to work seamlessly on and off the runtime.
236// All of the previous race conditions have been worded in terms of
237// scheduler-isms (which is obviously not available without the runtime).
238//
239// For now, native usage of channels (off the runtime) will fall back onto
240// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241// is still entirely lock-free, the "deschedule" blocks above are surrounded by
242// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243// condition variable.
244//
245// ## Select
246//
247// Being able to support selection over channels has greatly influenced this
248// design, and not only does selection need to work inside the runtime, but also
249// outside the runtime.
250//
251// The implementation is fairly straightforward. The goal of select() is not to
252// return some data, but only to return which channel can receive data without
253// blocking. The implementation is essentially the entire blocking procedure
254// followed by an increment as soon as its woken up. The cancellation procedure
255// involves an increment and swapping out of to_wake to acquire ownership of the
bd371182 256// thread to unblock.
1a4d82fc
JJ
257//
258// Sadly this current implementation requires multiple allocations, so I have
259// seen the throughput of select() be much worse than it should be. I do not
260// believe that there is anything fundamental that needs to change about these
261// channels, however, in order to support a more efficient select().
262//
263// # Conclusion
264//
265// And now that you've seen all the races that I found and attempted to fix,
266// here's the code for you to find some more!
267
1a4d82fc 268use sync::Arc;
c34b1796 269use error;
1a4d82fc 270use fmt;
1a4d82fc
JJ
271use mem;
272use cell::UnsafeCell;
bd371182 273use marker::Reflect;
1a4d82fc 274
92a42be0 275#[unstable(feature = "mpsc_select", issue = "27800")]
1a4d82fc
JJ
276pub use self::select::{Select, Handle};
277use self::select::StartResult;
278use self::select::StartResult::*;
279use self::blocking::SignalToken;
280
281mod blocking;
282mod oneshot;
283mod select;
284mod shared;
285mod stream;
286mod sync;
287mod mpsc_queue;
288mod spsc_queue;
289
290/// The receiving-half of Rust's channel type. This half can only be owned by
bd371182 291/// one thread
85aaf69f 292#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
293pub struct Receiver<T> {
294 inner: UnsafeCell<Flavor<T>>,
295}
296
297// The receiver port can be sent from place to place, so long as it
298// is not used to receive non-sendable things.
92a42be0 299#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 300unsafe impl<T: Send> Send for Receiver<T> { }
1a4d82fc
JJ
301
302/// An iterator over messages on a receiver, this iterator will block
303/// whenever `next` is called, waiting for a new message, and `None` will be
304/// returned when the corresponding channel has hung up.
85aaf69f 305#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 306pub struct Iter<'a, T: 'a> {
1a4d82fc
JJ
307 rx: &'a Receiver<T>
308}
309
d9579d0f
AL
310/// An owning iterator over messages on a receiver, this iterator will block
311/// whenever `next` is called, waiting for a new message, and `None` will be
312/// returned when the corresponding channel has hung up.
313#[stable(feature = "receiver_into_iter", since = "1.1.0")]
314pub struct IntoIter<T> {
315 rx: Receiver<T>
316}
317
1a4d82fc 318/// The sending-half of Rust's asynchronous channel type. This half can only be
bd371182 319/// owned by one thread, but it can be cloned to send to other threads.
85aaf69f 320#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
321pub struct Sender<T> {
322 inner: UnsafeCell<Flavor<T>>,
323}
324
325// The send port can be sent from place to place, so long as it
326// is not used to send non-sendable things.
92a42be0 327#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 328unsafe impl<T: Send> Send for Sender<T> { }
1a4d82fc
JJ
329
330/// The sending-half of Rust's synchronous channel type. This half can only be
bd371182 331/// owned by one thread, but it can be cloned to send to other threads.
85aaf69f 332#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 333pub struct SyncSender<T> {
85aaf69f 334 inner: Arc<UnsafeCell<sync::Packet<T>>>,
1a4d82fc
JJ
335}
336
92a42be0 337#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 338unsafe impl<T: Send> Send for SyncSender<T> {}
85aaf69f 339
92a42be0 340#[stable(feature = "rust1", since = "1.0.0")]
85aaf69f
SL
341impl<T> !Sync for SyncSender<T> {}
342
1a4d82fc
JJ
343/// An error returned from the `send` function on channels.
344///
345/// A `send` operation can only fail if the receiving end of a channel is
346/// disconnected, implying that the data could never be received. The error
347/// contains the data being sent as a payload so it can be recovered.
85aaf69f
SL
348#[stable(feature = "rust1", since = "1.0.0")]
349#[derive(PartialEq, Eq, Clone, Copy)]
c34b1796 350pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
1a4d82fc
JJ
351
352/// An error returned from the `recv` function on a `Receiver`.
353///
354/// The `recv` operation can only fail if the sending half of a channel is
355/// disconnected, implying that no further messages will ever be received.
85aaf69f
SL
356#[derive(PartialEq, Eq, Clone, Copy, Debug)]
357#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
358pub struct RecvError;
359
9346a6ac
AL
360/// This enumeration is the list of the possible reasons that `try_recv` could
361/// not return data when called.
85aaf69f
SL
362#[derive(PartialEq, Eq, Clone, Copy, Debug)]
363#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
364pub enum TryRecvError {
365 /// This channel is currently empty, but the sender(s) have not yet
366 /// disconnected, so data may yet become available.
85aaf69f 367 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
368 Empty,
369
370 /// This channel's sending half has become disconnected, and there will
371 /// never be any more data received on this channel
85aaf69f 372 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
373 Disconnected,
374}
375
376/// This enumeration is the list of the possible error outcomes for the
377/// `SyncSender::try_send` method.
85aaf69f
SL
378#[stable(feature = "rust1", since = "1.0.0")]
379#[derive(PartialEq, Eq, Clone, Copy)]
1a4d82fc
JJ
380pub enum TrySendError<T> {
381 /// The data could not be sent on the channel because it would require that
382 /// the callee block to send the data.
383 ///
384 /// If this is a buffered channel, then the buffer is full at this time. If
385 /// this is not a buffered channel, then there is no receiver available to
386 /// acquire the data.
85aaf69f 387 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
388 Full(T),
389
390 /// This channel's receiving half has disconnected, so the data could not be
391 /// sent. The data is returned back to the callee in this case.
85aaf69f 392 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
393 Disconnected(T),
394}
395
396enum Flavor<T> {
85aaf69f
SL
397 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
398 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
399 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
400 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
1a4d82fc
JJ
401}
402
403#[doc(hidden)]
404trait UnsafeFlavor<T> {
e9174d1e 405 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
1a4d82fc
JJ
406 unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
407 &mut *self.inner_unsafe().get()
408 }
409 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
410 &*self.inner_unsafe().get()
411 }
412}
413impl<T> UnsafeFlavor<T> for Sender<T> {
e9174d1e 414 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
1a4d82fc
JJ
415 &self.inner
416 }
417}
418impl<T> UnsafeFlavor<T> for Receiver<T> {
e9174d1e 419 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
1a4d82fc
JJ
420 &self.inner
421 }
422}
423
424/// Creates a new asynchronous channel, returning the sender/receiver halves.
425///
426/// All data sent on the sender will become available on the receiver, and no
bd371182 427/// send will block the calling thread (this channel has an "infinite buffer").
1a4d82fc 428///
c34b1796 429/// # Examples
1a4d82fc
JJ
430///
431/// ```
432/// use std::sync::mpsc::channel;
85aaf69f 433/// use std::thread;
1a4d82fc 434///
b039eaaf 435/// // tx is the sending half (tx for transmission), and rx is the receiving
1a4d82fc
JJ
436/// // half (rx for receiving).
437/// let (tx, rx) = channel();
438///
439/// // Spawn off an expensive computation
85aaf69f 440/// thread::spawn(move|| {
1a4d82fc
JJ
441/// # fn expensive_computation() {}
442/// tx.send(expensive_computation()).unwrap();
443/// });
444///
445/// // Do some useful work for awhile
446///
447/// // Let's see what that answer was
448/// println!("{:?}", rx.recv().unwrap());
449/// ```
85aaf69f 450#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 451pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
85aaf69f 452 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
1a4d82fc
JJ
453 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
454}
455
456/// Creates a new synchronous, bounded channel.
457///
458/// Like asynchronous channels, the `Receiver` will block until a message
459/// becomes available. These channels differ greatly in the semantics of the
460/// sender from asynchronous channels, however.
461///
462/// This channel has an internal buffer on which messages will be queued. When
463/// the internal buffer becomes full, future sends will *block* waiting for the
464/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
465/// becomes "rendezvous channel" where each send will not return until a recv
466/// is paired with it.
467///
468/// As with asynchronous channels, all senders will panic in `send` if the
469/// `Receiver` has been destroyed.
470///
c34b1796 471/// # Examples
1a4d82fc
JJ
472///
473/// ```
474/// use std::sync::mpsc::sync_channel;
85aaf69f 475/// use std::thread;
1a4d82fc
JJ
476///
477/// let (tx, rx) = sync_channel(1);
478///
479/// // this returns immediately
85aaf69f 480/// tx.send(1).unwrap();
1a4d82fc 481///
85aaf69f 482/// thread::spawn(move|| {
1a4d82fc 483/// // this will block until the previous message has been received
85aaf69f 484/// tx.send(2).unwrap();
1a4d82fc
JJ
485/// });
486///
85aaf69f
SL
487/// assert_eq!(rx.recv().unwrap(), 1);
488/// assert_eq!(rx.recv().unwrap(), 2);
1a4d82fc 489/// ```
85aaf69f 490#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 491pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
85aaf69f 492 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
1a4d82fc
JJ
493 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
494}
495
496////////////////////////////////////////////////////////////////////////////////
497// Sender
498////////////////////////////////////////////////////////////////////////////////
499
c34b1796 500impl<T> Sender<T> {
1a4d82fc
JJ
501 fn new(inner: Flavor<T>) -> Sender<T> {
502 Sender {
503 inner: UnsafeCell::new(inner),
504 }
505 }
506
507 /// Attempts to send a value on this channel, returning it back if it could
508 /// not be sent.
509 ///
510 /// A successful send occurs when it is determined that the other end of
511 /// the channel has not hung up already. An unsuccessful send would be one
512 /// where the corresponding receiver has already been deallocated. Note
513 /// that a return value of `Err` means that the data will never be
514 /// received, but a return value of `Ok` does *not* mean that the data
515 /// will be received. It is possible for the corresponding receiver to
516 /// hang up immediately after this function returns `Ok`.
517 ///
518 /// This method will never block the current thread.
519 ///
c34b1796 520 /// # Examples
1a4d82fc
JJ
521 ///
522 /// ```
523 /// use std::sync::mpsc::channel;
524 ///
525 /// let (tx, rx) = channel();
526 ///
527 /// // This send is always successful
85aaf69f 528 /// tx.send(1).unwrap();
1a4d82fc
JJ
529 ///
530 /// // This send will fail because the receiver is gone
531 /// drop(rx);
85aaf69f 532 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
1a4d82fc 533 /// ```
85aaf69f 534 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
535 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
536 let (new_inner, ret) = match *unsafe { self.inner() } {
537 Flavor::Oneshot(ref p) => {
538 unsafe {
539 let p = p.get();
540 if !(*p).sent() {
541 return (*p).send(t).map_err(SendError);
542 } else {
543 let a =
85aaf69f 544 Arc::new(UnsafeCell::new(stream::Packet::new()));
1a4d82fc
JJ
545 let rx = Receiver::new(Flavor::Stream(a.clone()));
546 match (*p).upgrade(rx) {
547 oneshot::UpSuccess => {
548 let ret = (*a.get()).send(t);
549 (a, ret)
550 }
551 oneshot::UpDisconnected => (a, Err(t)),
552 oneshot::UpWoke(token) => {
553 // This send cannot panic because the thread is
554 // asleep (we're looking at it), so the receiver
555 // can't go away.
556 (*a.get()).send(t).ok().unwrap();
c34b1796 557 token.signal();
1a4d82fc
JJ
558 (a, Ok(()))
559 }
560 }
561 }
562 }
563 }
564 Flavor::Stream(ref p) => return unsafe {
565 (*p.get()).send(t).map_err(SendError)
566 },
567 Flavor::Shared(ref p) => return unsafe {
568 (*p.get()).send(t).map_err(SendError)
569 },
570 Flavor::Sync(..) => unreachable!(),
571 };
572
573 unsafe {
574 let tmp = Sender::new(Flavor::Stream(new_inner));
575 mem::swap(self.inner_mut(), tmp.inner_mut());
576 }
577 ret.map_err(SendError)
578 }
579}
580
85aaf69f 581#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 582impl<T> Clone for Sender<T> {
1a4d82fc
JJ
583 fn clone(&self) -> Sender<T> {
584 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
585 Flavor::Oneshot(ref p) => {
85aaf69f 586 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
1a4d82fc
JJ
587 unsafe {
588 let guard = (*a.get()).postinit_lock();
589 let rx = Receiver::new(Flavor::Shared(a.clone()));
590 match (*p.get()).upgrade(rx) {
591 oneshot::UpSuccess |
592 oneshot::UpDisconnected => (a, None, guard),
593 oneshot::UpWoke(task) => (a, Some(task), guard)
594 }
595 }
596 }
597 Flavor::Stream(ref p) => {
85aaf69f 598 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
1a4d82fc
JJ
599 unsafe {
600 let guard = (*a.get()).postinit_lock();
601 let rx = Receiver::new(Flavor::Shared(a.clone()));
602 match (*p.get()).upgrade(rx) {
603 stream::UpSuccess |
604 stream::UpDisconnected => (a, None, guard),
605 stream::UpWoke(task) => (a, Some(task), guard),
606 }
607 }
608 }
609 Flavor::Shared(ref p) => {
610 unsafe { (*p.get()).clone_chan(); }
611 return Sender::new(Flavor::Shared(p.clone()));
612 }
613 Flavor::Sync(..) => unreachable!(),
614 };
615
616 unsafe {
617 (*packet.get()).inherit_blocker(sleeper, guard);
618
619 let tmp = Sender::new(Flavor::Shared(packet.clone()));
620 mem::swap(self.inner_mut(), tmp.inner_mut());
621 }
622 Sender::new(Flavor::Shared(packet))
623 }
624}
625
85aaf69f 626#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 627impl<T> Drop for Sender<T> {
1a4d82fc
JJ
628 fn drop(&mut self) {
629 match *unsafe { self.inner_mut() } {
630 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
631 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
632 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
633 Flavor::Sync(..) => unreachable!(),
634 }
635 }
636}
637
638////////////////////////////////////////////////////////////////////////////////
639// SyncSender
640////////////////////////////////////////////////////////////////////////////////
641
c34b1796 642impl<T> SyncSender<T> {
85aaf69f
SL
643 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
644 SyncSender { inner: inner }
1a4d82fc
JJ
645 }
646
647 /// Sends a value on this synchronous channel.
648 ///
649 /// This function will *block* until space in the internal buffer becomes
650 /// available or a receiver is available to hand off the message to.
651 ///
652 /// Note that a successful send does *not* guarantee that the receiver will
653 /// ever see the data if there is a buffer on this channel. Items may be
654 /// enqueued in the internal buffer for the receiver to receive at a later
655 /// time. If the buffer size is 0, however, it can be guaranteed that the
656 /// receiver has indeed received the data if this function returns success.
657 ///
658 /// This function will never panic, but it may return `Err` if the
659 /// `Receiver` has disconnected and is no longer able to receive
660 /// information.
85aaf69f 661 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
662 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
663 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
664 }
665
666 /// Attempts to send a value on this channel without blocking.
667 ///
668 /// This method differs from `send` by returning immediately if the
669 /// channel's buffer is full or no receiver is waiting to acquire some
670 /// data. Compared with `send`, this function has two failure cases
671 /// instead of one (one for disconnection, one for a full buffer).
672 ///
673 /// See `SyncSender::send` for notes about guarantees of whether the
674 /// receiver has received the data or not if this function is successful.
85aaf69f 675 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
676 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
677 unsafe { (*self.inner.get()).try_send(t) }
678 }
679}
680
85aaf69f 681#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 682impl<T> Clone for SyncSender<T> {
1a4d82fc
JJ
683 fn clone(&self) -> SyncSender<T> {
684 unsafe { (*self.inner.get()).clone_chan(); }
e9174d1e 685 SyncSender::new(self.inner.clone())
1a4d82fc
JJ
686 }
687}
688
85aaf69f 689#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 690impl<T> Drop for SyncSender<T> {
1a4d82fc
JJ
691 fn drop(&mut self) {
692 unsafe { (*self.inner.get()).drop_chan(); }
693 }
694}
695
696////////////////////////////////////////////////////////////////////////////////
697// Receiver
698////////////////////////////////////////////////////////////////////////////////
699
c34b1796 700impl<T> Receiver<T> {
1a4d82fc
JJ
701 fn new(inner: Flavor<T>) -> Receiver<T> {
702 Receiver { inner: UnsafeCell::new(inner) }
703 }
704
705 /// Attempts to return a pending value on this receiver without blocking
706 ///
707 /// This method will never block the caller in order to wait for data to
708 /// become available. Instead, this will always return immediately with a
709 /// possible option of pending data on the channel.
710 ///
711 /// This is useful for a flavor of "optimistic check" before deciding to
712 /// block on a receiver.
85aaf69f 713 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
714 pub fn try_recv(&self) -> Result<T, TryRecvError> {
715 loop {
716 let new_port = match *unsafe { self.inner() } {
717 Flavor::Oneshot(ref p) => {
718 match unsafe { (*p.get()).try_recv() } {
719 Ok(t) => return Ok(t),
720 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
721 Err(oneshot::Disconnected) => {
722 return Err(TryRecvError::Disconnected)
723 }
724 Err(oneshot::Upgraded(rx)) => rx,
725 }
726 }
727 Flavor::Stream(ref p) => {
728 match unsafe { (*p.get()).try_recv() } {
729 Ok(t) => return Ok(t),
730 Err(stream::Empty) => return Err(TryRecvError::Empty),
731 Err(stream::Disconnected) => {
732 return Err(TryRecvError::Disconnected)
733 }
734 Err(stream::Upgraded(rx)) => rx,
735 }
736 }
737 Flavor::Shared(ref p) => {
738 match unsafe { (*p.get()).try_recv() } {
739 Ok(t) => return Ok(t),
740 Err(shared::Empty) => return Err(TryRecvError::Empty),
741 Err(shared::Disconnected) => {
742 return Err(TryRecvError::Disconnected)
743 }
744 }
745 }
746 Flavor::Sync(ref p) => {
747 match unsafe { (*p.get()).try_recv() } {
748 Ok(t) => return Ok(t),
749 Err(sync::Empty) => return Err(TryRecvError::Empty),
750 Err(sync::Disconnected) => {
751 return Err(TryRecvError::Disconnected)
752 }
753 }
754 }
755 };
756 unsafe {
757 mem::swap(self.inner_mut(),
758 new_port.inner_mut());
759 }
760 }
761 }
762
9346a6ac 763 /// Attempts to wait for a value on this receiver, returning an error if the
1a4d82fc
JJ
764 /// corresponding channel has hung up.
765 ///
766 /// This function will always block the current thread if there is no data
767 /// available and it's possible for more data to be sent. Once a message is
768 /// sent to the corresponding `Sender`, then this receiver will wake up and
769 /// return that message.
770 ///
771 /// If the corresponding `Sender` has disconnected, or it disconnects while
772 /// this call is blocking, this call will wake up and return `Err` to
773 /// indicate that no more messages can ever be received on this channel.
c1a9b12d
SL
774 /// However, since channels are buffered, messages sent before the disconnect
775 /// will still be properly received.
776 ///
777 /// # Examples
778 ///
779 /// ```
780 /// use std::sync::mpsc;
781 /// use std::thread;
782 ///
783 /// let (send, recv) = mpsc::channel();
784 /// let handle = thread::spawn(move || {
785 /// send.send(1u8).unwrap();
786 /// });
787 ///
788 /// handle.join().unwrap();
789 ///
790 /// assert_eq!(Ok(1), recv.recv());
791 /// ```
792 ///
793 /// Buffering behavior:
794 ///
795 /// ```
796 /// use std::sync::mpsc;
797 /// use std::thread;
798 /// use std::sync::mpsc::RecvError;
799 ///
800 /// let (send, recv) = mpsc::channel();
801 /// let handle = thread::spawn(move || {
802 /// send.send(1u8).unwrap();
803 /// send.send(2).unwrap();
804 /// send.send(3).unwrap();
805 /// drop(send);
806 /// });
807 ///
808 /// // wait for the thread to join so we ensure the sender is dropped
809 /// handle.join().unwrap();
810 ///
811 /// assert_eq!(Ok(1), recv.recv());
812 /// assert_eq!(Ok(2), recv.recv());
813 /// assert_eq!(Ok(3), recv.recv());
814 /// assert_eq!(Err(RecvError), recv.recv());
815 /// ```
85aaf69f 816 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
817 pub fn recv(&self) -> Result<T, RecvError> {
818 loop {
819 let new_port = match *unsafe { self.inner() } {
820 Flavor::Oneshot(ref p) => {
821 match unsafe { (*p.get()).recv() } {
822 Ok(t) => return Ok(t),
823 Err(oneshot::Empty) => return unreachable!(),
824 Err(oneshot::Disconnected) => return Err(RecvError),
825 Err(oneshot::Upgraded(rx)) => rx,
826 }
827 }
828 Flavor::Stream(ref p) => {
829 match unsafe { (*p.get()).recv() } {
830 Ok(t) => return Ok(t),
831 Err(stream::Empty) => return unreachable!(),
832 Err(stream::Disconnected) => return Err(RecvError),
833 Err(stream::Upgraded(rx)) => rx,
834 }
835 }
836 Flavor::Shared(ref p) => {
837 match unsafe { (*p.get()).recv() } {
838 Ok(t) => return Ok(t),
839 Err(shared::Empty) => return unreachable!(),
840 Err(shared::Disconnected) => return Err(RecvError),
841 }
842 }
843 Flavor::Sync(ref p) => return unsafe {
844 (*p.get()).recv().map_err(|()| RecvError)
845 }
846 };
847 unsafe {
848 mem::swap(self.inner_mut(), new_port.inner_mut());
849 }
850 }
851 }
852
853 /// Returns an iterator that will block waiting for messages, but never
854 /// `panic!`. It will return `None` when the channel has hung up.
85aaf69f 855 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
856 pub fn iter(&self) -> Iter<T> {
857 Iter { rx: self }
858 }
859}
860
c34b1796 861impl<T> select::Packet for Receiver<T> {
1a4d82fc
JJ
862 fn can_recv(&self) -> bool {
863 loop {
864 let new_port = match *unsafe { self.inner() } {
865 Flavor::Oneshot(ref p) => {
866 match unsafe { (*p.get()).can_recv() } {
867 Ok(ret) => return ret,
868 Err(upgrade) => upgrade,
869 }
870 }
871 Flavor::Stream(ref p) => {
872 match unsafe { (*p.get()).can_recv() } {
873 Ok(ret) => return ret,
874 Err(upgrade) => upgrade,
875 }
876 }
877 Flavor::Shared(ref p) => {
878 return unsafe { (*p.get()).can_recv() };
879 }
880 Flavor::Sync(ref p) => {
881 return unsafe { (*p.get()).can_recv() };
882 }
883 };
884 unsafe {
885 mem::swap(self.inner_mut(),
886 new_port.inner_mut());
887 }
888 }
889 }
890
891 fn start_selection(&self, mut token: SignalToken) -> StartResult {
892 loop {
893 let (t, new_port) = match *unsafe { self.inner() } {
894 Flavor::Oneshot(ref p) => {
895 match unsafe { (*p.get()).start_selection(token) } {
896 oneshot::SelSuccess => return Installed,
897 oneshot::SelCanceled => return Abort,
898 oneshot::SelUpgraded(t, rx) => (t, rx),
899 }
900 }
901 Flavor::Stream(ref p) => {
902 match unsafe { (*p.get()).start_selection(token) } {
903 stream::SelSuccess => return Installed,
904 stream::SelCanceled => return Abort,
905 stream::SelUpgraded(t, rx) => (t, rx),
906 }
907 }
908 Flavor::Shared(ref p) => {
909 return unsafe { (*p.get()).start_selection(token) };
910 }
911 Flavor::Sync(ref p) => {
912 return unsafe { (*p.get()).start_selection(token) };
913 }
914 };
915 token = t;
916 unsafe {
917 mem::swap(self.inner_mut(), new_port.inner_mut());
918 }
919 }
920 }
921
922 fn abort_selection(&self) -> bool {
923 let mut was_upgrade = false;
924 loop {
925 let result = match *unsafe { self.inner() } {
926 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
927 Flavor::Stream(ref p) => unsafe {
928 (*p.get()).abort_selection(was_upgrade)
929 },
930 Flavor::Shared(ref p) => return unsafe {
931 (*p.get()).abort_selection(was_upgrade)
932 },
933 Flavor::Sync(ref p) => return unsafe {
934 (*p.get()).abort_selection()
935 },
936 };
937 let new_port = match result { Ok(b) => return b, Err(p) => p };
938 was_upgrade = true;
939 unsafe {
940 mem::swap(self.inner_mut(),
941 new_port.inner_mut());
942 }
943 }
944 }
945}
946
85aaf69f 947#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 948impl<'a, T> Iterator for Iter<'a, T> {
1a4d82fc
JJ
949 type Item = T;
950
951 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
952}
953
d9579d0f
AL
954#[stable(feature = "receiver_into_iter", since = "1.1.0")]
955impl<'a, T> IntoIterator for &'a Receiver<T> {
956 type Item = T;
957 type IntoIter = Iter<'a, T>;
958
959 fn into_iter(self) -> Iter<'a, T> { self.iter() }
960}
961
92a42be0 962#[stable(feature = "receiver_into_iter", since = "1.1.0")]
d9579d0f
AL
963impl<T> Iterator for IntoIter<T> {
964 type Item = T;
965 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
966}
967
968#[stable(feature = "receiver_into_iter", since = "1.1.0")]
969impl <T> IntoIterator for Receiver<T> {
970 type Item = T;
971 type IntoIter = IntoIter<T>;
972
973 fn into_iter(self) -> IntoIter<T> {
974 IntoIter { rx: self }
975 }
976}
977
85aaf69f 978#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 979impl<T> Drop for Receiver<T> {
1a4d82fc
JJ
980 fn drop(&mut self) {
981 match *unsafe { self.inner_mut() } {
982 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
983 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
984 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
985 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
986 }
987 }
988}
989
85aaf69f
SL
990#[stable(feature = "rust1", since = "1.0.0")]
991impl<T> fmt::Debug for SendError<T> {
992 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
993 "SendError(..)".fmt(f)
1a4d82fc 994 }
85aaf69f 995}
1a4d82fc 996
85aaf69f
SL
997#[stable(feature = "rust1", since = "1.0.0")]
998impl<T> fmt::Display for SendError<T> {
999 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1000 "sending on a closed channel".fmt(f)
1a4d82fc 1001 }
1a4d82fc
JJ
1002}
1003
c34b1796 1004#[stable(feature = "rust1", since = "1.0.0")]
bd371182 1005impl<T: Send + Reflect> error::Error for SendError<T> {
c34b1796
AL
1006 fn description(&self) -> &str {
1007 "sending on a closed channel"
1008 }
1009
1010 fn cause(&self) -> Option<&error::Error> {
1011 None
1012 }
1013}
1014
85aaf69f
SL
1015#[stable(feature = "rust1", since = "1.0.0")]
1016impl<T> fmt::Debug for TrySendError<T> {
1a4d82fc 1017 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85aaf69f
SL
1018 match *self {
1019 TrySendError::Full(..) => "Full(..)".fmt(f),
1020 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1021 }
1a4d82fc
JJ
1022 }
1023}
1024
85aaf69f
SL
1025#[stable(feature = "rust1", since = "1.0.0")]
1026impl<T> fmt::Display for TrySendError<T> {
1a4d82fc
JJ
1027 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1028 match *self {
1029 TrySendError::Full(..) => {
1030 "sending on a full channel".fmt(f)
1031 }
1032 TrySendError::Disconnected(..) => {
1033 "sending on a closed channel".fmt(f)
1034 }
1035 }
1036 }
1037}
1038
c34b1796 1039#[stable(feature = "rust1", since = "1.0.0")]
bd371182 1040impl<T: Send + Reflect> error::Error for TrySendError<T> {
c34b1796
AL
1041
1042 fn description(&self) -> &str {
1043 match *self {
1044 TrySendError::Full(..) => {
1045 "sending on a full channel"
1046 }
1047 TrySendError::Disconnected(..) => {
1048 "sending on a closed channel"
1049 }
1050 }
1051 }
1052
1053 fn cause(&self) -> Option<&error::Error> {
1054 None
1055 }
1056}
1057
85aaf69f
SL
1058#[stable(feature = "rust1", since = "1.0.0")]
1059impl fmt::Display for RecvError {
1a4d82fc
JJ
1060 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1061 "receiving on a closed channel".fmt(f)
1062 }
1063}
1064
c34b1796
AL
1065#[stable(feature = "rust1", since = "1.0.0")]
1066impl error::Error for RecvError {
1067
1068 fn description(&self) -> &str {
1069 "receiving on a closed channel"
1070 }
1071
1072 fn cause(&self) -> Option<&error::Error> {
1073 None
1074 }
1075}
1076
85aaf69f
SL
1077#[stable(feature = "rust1", since = "1.0.0")]
1078impl fmt::Display for TryRecvError {
1a4d82fc
JJ
1079 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1080 match *self {
1081 TryRecvError::Empty => {
1082 "receiving on an empty channel".fmt(f)
1083 }
1084 TryRecvError::Disconnected => {
1085 "receiving on a closed channel".fmt(f)
1086 }
1087 }
1088 }
1089}
1090
c34b1796
AL
1091#[stable(feature = "rust1", since = "1.0.0")]
1092impl error::Error for TryRecvError {
1093
1094 fn description(&self) -> &str {
1095 match *self {
1096 TryRecvError::Empty => {
1097 "receiving on an empty channel"
1098 }
1099 TryRecvError::Disconnected => {
1100 "receiving on a closed channel"
1101 }
1102 }
1103 }
1104
1105 fn cause(&self) -> Option<&error::Error> {
1106 None
1107 }
1108}
1109
1a4d82fc 1110#[cfg(test)]
d9579d0f 1111mod tests {
1a4d82fc
JJ
1112 use prelude::v1::*;
1113
c1a9b12d 1114 use env;
1a4d82fc 1115 use super::*;
85aaf69f 1116 use thread;
1a4d82fc 1117
c34b1796 1118 pub fn stress_factor() -> usize {
85aaf69f
SL
1119 match env::var("RUST_TEST_STRESS") {
1120 Ok(val) => val.parse().unwrap(),
1121 Err(..) => 1,
1a4d82fc
JJ
1122 }
1123 }
1124
1125 #[test]
1126 fn smoke() {
c34b1796 1127 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1128 tx.send(1).unwrap();
1129 assert_eq!(rx.recv().unwrap(), 1);
1130 }
1131
1132 #[test]
1133 fn drop_full() {
c34b1796 1134 let (tx, _rx) = channel::<Box<isize>>();
85aaf69f 1135 tx.send(box 1).unwrap();
1a4d82fc
JJ
1136 }
1137
1138 #[test]
1139 fn drop_full_shared() {
c34b1796 1140 let (tx, _rx) = channel::<Box<isize>>();
1a4d82fc
JJ
1141 drop(tx.clone());
1142 drop(tx.clone());
85aaf69f 1143 tx.send(box 1).unwrap();
1a4d82fc
JJ
1144 }
1145
1146 #[test]
1147 fn smoke_shared() {
c34b1796 1148 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1149 tx.send(1).unwrap();
1150 assert_eq!(rx.recv().unwrap(), 1);
1151 let tx = tx.clone();
1152 tx.send(1).unwrap();
1153 assert_eq!(rx.recv().unwrap(), 1);
1154 }
1155
1156 #[test]
1157 fn smoke_threads() {
c34b1796 1158 let (tx, rx) = channel::<i32>();
85aaf69f 1159 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1160 tx.send(1).unwrap();
1161 });
1162 assert_eq!(rx.recv().unwrap(), 1);
1163 }
1164
1165 #[test]
1166 fn smoke_port_gone() {
c34b1796 1167 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1168 drop(rx);
1169 assert!(tx.send(1).is_err());
1170 }
1171
1172 #[test]
1173 fn smoke_shared_port_gone() {
c34b1796 1174 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1175 drop(rx);
1176 assert!(tx.send(1).is_err())
1177 }
1178
1179 #[test]
1180 fn smoke_shared_port_gone2() {
c34b1796 1181 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1182 drop(rx);
1183 let tx2 = tx.clone();
1184 drop(tx);
1185 assert!(tx2.send(1).is_err());
1186 }
1187
1188 #[test]
1189 fn port_gone_concurrent() {
c34b1796 1190 let (tx, rx) = channel::<i32>();
85aaf69f 1191 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1192 rx.recv().unwrap();
1193 });
1194 while tx.send(1).is_ok() {}
1195 }
1196
1197 #[test]
1198 fn port_gone_concurrent_shared() {
c34b1796 1199 let (tx, rx) = channel::<i32>();
1a4d82fc 1200 let tx2 = tx.clone();
85aaf69f 1201 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1202 rx.recv().unwrap();
1203 });
1204 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1205 }
1206
1207 #[test]
1208 fn smoke_chan_gone() {
c34b1796 1209 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1210 drop(tx);
1211 assert!(rx.recv().is_err());
1212 }
1213
1214 #[test]
1215 fn smoke_chan_gone_shared() {
1216 let (tx, rx) = channel::<()>();
1217 let tx2 = tx.clone();
1218 drop(tx);
1219 drop(tx2);
1220 assert!(rx.recv().is_err());
1221 }
1222
1223 #[test]
1224 fn chan_gone_concurrent() {
c34b1796 1225 let (tx, rx) = channel::<i32>();
85aaf69f 1226 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1227 tx.send(1).unwrap();
1228 tx.send(1).unwrap();
1229 });
1230 while rx.recv().is_ok() {}
1231 }
1232
1233 #[test]
1234 fn stress() {
c34b1796 1235 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1236 let t = thread::spawn(move|| {
1237 for _ in 0..10000 { tx.send(1).unwrap(); }
1a4d82fc 1238 });
85aaf69f 1239 for _ in 0..10000 {
1a4d82fc
JJ
1240 assert_eq!(rx.recv().unwrap(), 1);
1241 }
1242 t.join().ok().unwrap();
1243 }
1244
1245 #[test]
1246 fn stress_shared() {
c34b1796
AL
1247 const AMT: u32 = 10000;
1248 const NTHREADS: u32 = 8;
1249 let (tx, rx) = channel::<i32>();
1a4d82fc 1250
85aaf69f
SL
1251 let t = thread::spawn(move|| {
1252 for _ in 0..AMT * NTHREADS {
1a4d82fc
JJ
1253 assert_eq!(rx.recv().unwrap(), 1);
1254 }
1255 match rx.try_recv() {
1256 Ok(..) => panic!(),
1257 _ => {}
1258 }
1259 });
1260
85aaf69f 1261 for _ in 0..NTHREADS {
1a4d82fc 1262 let tx = tx.clone();
85aaf69f
SL
1263 thread::spawn(move|| {
1264 for _ in 0..AMT { tx.send(1).unwrap(); }
1a4d82fc
JJ
1265 });
1266 }
1267 drop(tx);
1268 t.join().ok().unwrap();
1269 }
1270
1271 #[test]
1272 fn send_from_outside_runtime() {
1273 let (tx1, rx1) = channel::<()>();
c34b1796 1274 let (tx2, rx2) = channel::<i32>();
85aaf69f 1275 let t1 = thread::spawn(move|| {
1a4d82fc 1276 tx1.send(()).unwrap();
85aaf69f 1277 for _ in 0..40 {
1a4d82fc
JJ
1278 assert_eq!(rx2.recv().unwrap(), 1);
1279 }
1280 });
1281 rx1.recv().unwrap();
85aaf69f
SL
1282 let t2 = thread::spawn(move|| {
1283 for _ in 0..40 {
1a4d82fc
JJ
1284 tx2.send(1).unwrap();
1285 }
1286 });
1287 t1.join().ok().unwrap();
1288 t2.join().ok().unwrap();
1289 }
1290
1291 #[test]
1292 fn recv_from_outside_runtime() {
c34b1796 1293 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1294 let t = thread::spawn(move|| {
1295 for _ in 0..40 {
1a4d82fc
JJ
1296 assert_eq!(rx.recv().unwrap(), 1);
1297 }
1298 });
85aaf69f 1299 for _ in 0..40 {
1a4d82fc
JJ
1300 tx.send(1).unwrap();
1301 }
1302 t.join().ok().unwrap();
1303 }
1304
1305 #[test]
1306 fn no_runtime() {
c34b1796
AL
1307 let (tx1, rx1) = channel::<i32>();
1308 let (tx2, rx2) = channel::<i32>();
85aaf69f 1309 let t1 = thread::spawn(move|| {
1a4d82fc
JJ
1310 assert_eq!(rx1.recv().unwrap(), 1);
1311 tx2.send(2).unwrap();
1312 });
85aaf69f 1313 let t2 = thread::spawn(move|| {
1a4d82fc
JJ
1314 tx1.send(1).unwrap();
1315 assert_eq!(rx2.recv().unwrap(), 2);
1316 });
1317 t1.join().ok().unwrap();
1318 t2.join().ok().unwrap();
1319 }
1320
1321 #[test]
1322 fn oneshot_single_thread_close_port_first() {
1323 // Simple test of closing without sending
c34b1796 1324 let (_tx, rx) = channel::<i32>();
1a4d82fc
JJ
1325 drop(rx);
1326 }
1327
1328 #[test]
1329 fn oneshot_single_thread_close_chan_first() {
1330 // Simple test of closing without sending
c34b1796 1331 let (tx, _rx) = channel::<i32>();
1a4d82fc
JJ
1332 drop(tx);
1333 }
1334
1335 #[test]
1336 fn oneshot_single_thread_send_port_close() {
1337 // Testing that the sender cleans up the payload if receiver is closed
c34b1796 1338 let (tx, rx) = channel::<Box<i32>>();
1a4d82fc
JJ
1339 drop(rx);
1340 assert!(tx.send(box 0).is_err());
1341 }
1342
1343 #[test]
1344 fn oneshot_single_thread_recv_chan_close() {
1345 // Receiving on a closed chan will panic
85aaf69f 1346 let res = thread::spawn(move|| {
c34b1796 1347 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1348 drop(tx);
1349 rx.recv().unwrap();
1350 }).join();
1351 // What is our res?
1352 assert!(res.is_err());
1353 }
1354
1355 #[test]
1356 fn oneshot_single_thread_send_then_recv() {
c34b1796 1357 let (tx, rx) = channel::<Box<i32>>();
1a4d82fc
JJ
1358 tx.send(box 10).unwrap();
1359 assert!(rx.recv().unwrap() == box 10);
1360 }
1361
1362 #[test]
1363 fn oneshot_single_thread_try_send_open() {
c34b1796 1364 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1365 assert!(tx.send(10).is_ok());
1366 assert!(rx.recv().unwrap() == 10);
1367 }
1368
1369 #[test]
1370 fn oneshot_single_thread_try_send_closed() {
c34b1796 1371 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1372 drop(rx);
1373 assert!(tx.send(10).is_err());
1374 }
1375
1376 #[test]
1377 fn oneshot_single_thread_try_recv_open() {
c34b1796 1378 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1379 tx.send(10).unwrap();
1380 assert!(rx.recv() == Ok(10));
1381 }
1382
1383 #[test]
1384 fn oneshot_single_thread_try_recv_closed() {
c34b1796 1385 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1386 drop(tx);
1387 assert!(rx.recv().is_err());
1388 }
1389
1390 #[test]
1391 fn oneshot_single_thread_peek_data() {
c34b1796 1392 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1393 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1394 tx.send(10).unwrap();
1395 assert_eq!(rx.try_recv(), Ok(10));
1396 }
1397
1398 #[test]
1399 fn oneshot_single_thread_peek_close() {
c34b1796 1400 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1401 drop(tx);
1402 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1403 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1404 }
1405
1406 #[test]
1407 fn oneshot_single_thread_peek_open() {
c34b1796 1408 let (_tx, rx) = channel::<i32>();
1a4d82fc
JJ
1409 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1410 }
1411
1412 #[test]
1413 fn oneshot_multi_task_recv_then_send() {
c34b1796 1414 let (tx, rx) = channel::<Box<i32>>();
85aaf69f 1415 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1416 assert!(rx.recv().unwrap() == box 10);
1417 });
1418
1419 tx.send(box 10).unwrap();
1420 }
1421
1422 #[test]
1423 fn oneshot_multi_task_recv_then_close() {
c34b1796 1424 let (tx, rx) = channel::<Box<i32>>();
85aaf69f 1425 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1426 drop(tx);
1427 });
85aaf69f 1428 let res = thread::spawn(move|| {
1a4d82fc
JJ
1429 assert!(rx.recv().unwrap() == box 10);
1430 }).join();
1431 assert!(res.is_err());
1432 }
1433
1434 #[test]
1435 fn oneshot_multi_thread_close_stress() {
85aaf69f 1436 for _ in 0..stress_factor() {
c34b1796 1437 let (tx, rx) = channel::<i32>();
85aaf69f 1438 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1439 drop(rx);
1440 });
1441 drop(tx);
1442 }
1443 }
1444
1445 #[test]
1446 fn oneshot_multi_thread_send_close_stress() {
85aaf69f 1447 for _ in 0..stress_factor() {
c34b1796 1448 let (tx, rx) = channel::<i32>();
85aaf69f 1449 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1450 drop(rx);
1451 });
85aaf69f 1452 let _ = thread::spawn(move|| {
1a4d82fc
JJ
1453 tx.send(1).unwrap();
1454 }).join();
1455 }
1456 }
1457
1458 #[test]
1459 fn oneshot_multi_thread_recv_close_stress() {
85aaf69f 1460 for _ in 0..stress_factor() {
c34b1796 1461 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1462 thread::spawn(move|| {
1463 let res = thread::spawn(move|| {
1a4d82fc
JJ
1464 rx.recv().unwrap();
1465 }).join();
1466 assert!(res.is_err());
1467 });
85aaf69f
SL
1468 let _t = thread::spawn(move|| {
1469 thread::spawn(move|| {
1a4d82fc
JJ
1470 drop(tx);
1471 });
1472 });
1473 }
1474 }
1475
1476 #[test]
1477 fn oneshot_multi_thread_send_recv_stress() {
85aaf69f 1478 for _ in 0..stress_factor() {
c34b1796 1479 let (tx, rx) = channel::<Box<isize>>();
85aaf69f
SL
1480 let _t = thread::spawn(move|| {
1481 tx.send(box 10).unwrap();
1a4d82fc 1482 });
85aaf69f 1483 assert!(rx.recv().unwrap() == box 10);
1a4d82fc
JJ
1484 }
1485 }
1486
1487 #[test]
1488 fn stream_send_recv_stress() {
85aaf69f 1489 for _ in 0..stress_factor() {
1a4d82fc
JJ
1490 let (tx, rx) = channel();
1491
1492 send(tx, 0);
1493 recv(rx, 0);
1494
c34b1796 1495 fn send(tx: Sender<Box<i32>>, i: i32) {
1a4d82fc
JJ
1496 if i == 10 { return }
1497
85aaf69f 1498 thread::spawn(move|| {
1a4d82fc
JJ
1499 tx.send(box i).unwrap();
1500 send(tx, i + 1);
1501 });
1502 }
1503
c34b1796 1504 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1a4d82fc
JJ
1505 if i == 10 { return }
1506
85aaf69f 1507 thread::spawn(move|| {
1a4d82fc
JJ
1508 assert!(rx.recv().unwrap() == box i);
1509 recv(rx, i + 1);
1510 });
1511 }
1512 }
1513 }
1514
1515 #[test]
1516 fn recv_a_lot() {
1517 // Regression test that we don't run out of stack in scheduler context
1518 let (tx, rx) = channel();
85aaf69f
SL
1519 for _ in 0..10000 { tx.send(()).unwrap(); }
1520 for _ in 0..10000 { rx.recv().unwrap(); }
1a4d82fc
JJ
1521 }
1522
1523 #[test]
1524 fn shared_chan_stress() {
1525 let (tx, rx) = channel();
1526 let total = stress_factor() + 100;
85aaf69f 1527 for _ in 0..total {
1a4d82fc 1528 let tx = tx.clone();
85aaf69f 1529 thread::spawn(move|| {
1a4d82fc
JJ
1530 tx.send(()).unwrap();
1531 });
1532 }
1533
85aaf69f 1534 for _ in 0..total {
1a4d82fc
JJ
1535 rx.recv().unwrap();
1536 }
1537 }
1538
1539 #[test]
1540 fn test_nested_recv_iter() {
c34b1796
AL
1541 let (tx, rx) = channel::<i32>();
1542 let (total_tx, total_rx) = channel::<i32>();
1a4d82fc 1543
85aaf69f 1544 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1545 let mut acc = 0;
1546 for x in rx.iter() {
1547 acc += x;
1548 }
1549 total_tx.send(acc).unwrap();
1550 });
1551
1552 tx.send(3).unwrap();
1553 tx.send(1).unwrap();
1554 tx.send(2).unwrap();
1555 drop(tx);
1556 assert_eq!(total_rx.recv().unwrap(), 6);
1557 }
1558
1559 #[test]
1560 fn test_recv_iter_break() {
c34b1796 1561 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1562 let (count_tx, count_rx) = channel();
1563
85aaf69f 1564 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1565 let mut count = 0;
1566 for x in rx.iter() {
1567 if count >= 3 {
1568 break;
1569 } else {
1570 count += x;
1571 }
1572 }
1573 count_tx.send(count).unwrap();
1574 });
1575
1576 tx.send(2).unwrap();
1577 tx.send(2).unwrap();
1578 tx.send(2).unwrap();
1579 let _ = tx.send(2);
1580 drop(tx);
1581 assert_eq!(count_rx.recv().unwrap(), 4);
1582 }
1583
d9579d0f
AL
1584 #[test]
1585 fn test_recv_into_iter_owned() {
1586 let mut iter = {
1587 let (tx, rx) = channel::<i32>();
1588 tx.send(1).unwrap();
1589 tx.send(2).unwrap();
1590
1591 rx.into_iter()
1592 };
1593 assert_eq!(iter.next().unwrap(), 1);
1594 assert_eq!(iter.next().unwrap(), 2);
1595 assert_eq!(iter.next().is_none(), true);
1596 }
1597
1598 #[test]
1599 fn test_recv_into_iter_borrowed() {
1600 let (tx, rx) = channel::<i32>();
1601 tx.send(1).unwrap();
1602 tx.send(2).unwrap();
1603 drop(tx);
1604 let mut iter = (&rx).into_iter();
1605 assert_eq!(iter.next().unwrap(), 1);
1606 assert_eq!(iter.next().unwrap(), 2);
1607 assert_eq!(iter.next().is_none(), true);
1608 }
1609
1a4d82fc
JJ
1610 #[test]
1611 fn try_recv_states() {
c34b1796 1612 let (tx1, rx1) = channel::<i32>();
1a4d82fc
JJ
1613 let (tx2, rx2) = channel::<()>();
1614 let (tx3, rx3) = channel::<()>();
85aaf69f 1615 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1616 rx2.recv().unwrap();
1617 tx1.send(1).unwrap();
1618 tx3.send(()).unwrap();
1619 rx2.recv().unwrap();
1620 drop(tx1);
1621 tx3.send(()).unwrap();
1622 });
1623
1624 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1625 tx2.send(()).unwrap();
1626 rx3.recv().unwrap();
1627 assert_eq!(rx1.try_recv(), Ok(1));
1628 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1629 tx2.send(()).unwrap();
1630 rx3.recv().unwrap();
1631 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1632 }
1633
1634 // This bug used to end up in a livelock inside of the Receiver destructor
1635 // because the internal state of the Shared packet was corrupted
1636 #[test]
1637 fn destroy_upgraded_shared_port_when_sender_still_active() {
1638 let (tx, rx) = channel();
1639 let (tx2, rx2) = channel();
85aaf69f 1640 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1641 rx.recv().unwrap(); // wait on a oneshot
1642 drop(rx); // destroy a shared
1643 tx2.send(()).unwrap();
1644 });
bd371182 1645 // make sure the other thread has gone to sleep
85aaf69f 1646 for _ in 0..5000 { thread::yield_now(); }
1a4d82fc
JJ
1647
1648 // upgrade to a shared chan and send a message
1649 let t = tx.clone();
1650 drop(tx);
1651 t.send(()).unwrap();
1652
bd371182 1653 // wait for the child thread to exit before we exit
1a4d82fc
JJ
1654 rx2.recv().unwrap();
1655 }
1656}
1657
1658#[cfg(test)]
1659mod sync_tests {
1660 use prelude::v1::*;
1661
c1a9b12d 1662 use env;
85aaf69f 1663 use thread;
1a4d82fc
JJ
1664 use super::*;
1665
c34b1796 1666 pub fn stress_factor() -> usize {
85aaf69f
SL
1667 match env::var("RUST_TEST_STRESS") {
1668 Ok(val) => val.parse().unwrap(),
1669 Err(..) => 1,
1a4d82fc
JJ
1670 }
1671 }
1672
1673 #[test]
1674 fn smoke() {
c34b1796 1675 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
1676 tx.send(1).unwrap();
1677 assert_eq!(rx.recv().unwrap(), 1);
1678 }
1679
1680 #[test]
1681 fn drop_full() {
c34b1796 1682 let (tx, _rx) = sync_channel::<Box<isize>>(1);
85aaf69f 1683 tx.send(box 1).unwrap();
1a4d82fc
JJ
1684 }
1685
1686 #[test]
1687 fn smoke_shared() {
c34b1796 1688 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
1689 tx.send(1).unwrap();
1690 assert_eq!(rx.recv().unwrap(), 1);
1691 let tx = tx.clone();
1692 tx.send(1).unwrap();
1693 assert_eq!(rx.recv().unwrap(), 1);
1694 }
1695
1696 #[test]
1697 fn smoke_threads() {
c34b1796 1698 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 1699 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1700 tx.send(1).unwrap();
1701 });
1702 assert_eq!(rx.recv().unwrap(), 1);
1703 }
1704
1705 #[test]
1706 fn smoke_port_gone() {
c34b1796 1707 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1708 drop(rx);
1709 assert!(tx.send(1).is_err());
1710 }
1711
1712 #[test]
1713 fn smoke_shared_port_gone2() {
c34b1796 1714 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1715 drop(rx);
1716 let tx2 = tx.clone();
1717 drop(tx);
1718 assert!(tx2.send(1).is_err());
1719 }
1720
1721 #[test]
1722 fn port_gone_concurrent() {
c34b1796 1723 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 1724 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1725 rx.recv().unwrap();
1726 });
1727 while tx.send(1).is_ok() {}
1728 }
1729
1730 #[test]
1731 fn port_gone_concurrent_shared() {
c34b1796 1732 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc 1733 let tx2 = tx.clone();
85aaf69f 1734 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1735 rx.recv().unwrap();
1736 });
1737 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1738 }
1739
1740 #[test]
1741 fn smoke_chan_gone() {
c34b1796 1742 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1743 drop(tx);
1744 assert!(rx.recv().is_err());
1745 }
1746
1747 #[test]
1748 fn smoke_chan_gone_shared() {
1749 let (tx, rx) = sync_channel::<()>(0);
1750 let tx2 = tx.clone();
1751 drop(tx);
1752 drop(tx2);
1753 assert!(rx.recv().is_err());
1754 }
1755
1756 #[test]
1757 fn chan_gone_concurrent() {
c34b1796 1758 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 1759 thread::spawn(move|| {
1a4d82fc
JJ
1760 tx.send(1).unwrap();
1761 tx.send(1).unwrap();
1762 });
1763 while rx.recv().is_ok() {}
1764 }
1765
1766 #[test]
1767 fn stress() {
c34b1796 1768 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f
SL
1769 thread::spawn(move|| {
1770 for _ in 0..10000 { tx.send(1).unwrap(); }
1a4d82fc 1771 });
85aaf69f 1772 for _ in 0..10000 {
1a4d82fc
JJ
1773 assert_eq!(rx.recv().unwrap(), 1);
1774 }
1775 }
1776
1777 #[test]
1778 fn stress_shared() {
c34b1796
AL
1779 const AMT: u32 = 1000;
1780 const NTHREADS: u32 = 8;
1781 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1782 let (dtx, drx) = sync_channel::<()>(0);
1783
85aaf69f
SL
1784 thread::spawn(move|| {
1785 for _ in 0..AMT * NTHREADS {
1a4d82fc
JJ
1786 assert_eq!(rx.recv().unwrap(), 1);
1787 }
1788 match rx.try_recv() {
1789 Ok(..) => panic!(),
1790 _ => {}
1791 }
1792 dtx.send(()).unwrap();
1793 });
1794
85aaf69f 1795 for _ in 0..NTHREADS {
1a4d82fc 1796 let tx = tx.clone();
85aaf69f
SL
1797 thread::spawn(move|| {
1798 for _ in 0..AMT { tx.send(1).unwrap(); }
1a4d82fc
JJ
1799 });
1800 }
1801 drop(tx);
1802 drx.recv().unwrap();
1803 }
1804
1805 #[test]
1806 fn oneshot_single_thread_close_port_first() {
1807 // Simple test of closing without sending
c34b1796 1808 let (_tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1809 drop(rx);
1810 }
1811
1812 #[test]
1813 fn oneshot_single_thread_close_chan_first() {
1814 // Simple test of closing without sending
c34b1796 1815 let (tx, _rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1816 drop(tx);
1817 }
1818
1819 #[test]
1820 fn oneshot_single_thread_send_port_close() {
1821 // Testing that the sender cleans up the payload if receiver is closed
c34b1796 1822 let (tx, rx) = sync_channel::<Box<i32>>(0);
1a4d82fc
JJ
1823 drop(rx);
1824 assert!(tx.send(box 0).is_err());
1825 }
1826
1827 #[test]
1828 fn oneshot_single_thread_recv_chan_close() {
1829 // Receiving on a closed chan will panic
85aaf69f 1830 let res = thread::spawn(move|| {
c34b1796 1831 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1832 drop(tx);
1833 rx.recv().unwrap();
1834 }).join();
1835 // What is our res?
1836 assert!(res.is_err());
1837 }
1838
1839 #[test]
1840 fn oneshot_single_thread_send_then_recv() {
c34b1796 1841 let (tx, rx) = sync_channel::<Box<i32>>(1);
1a4d82fc
JJ
1842 tx.send(box 10).unwrap();
1843 assert!(rx.recv().unwrap() == box 10);
1844 }
1845
1846 #[test]
1847 fn oneshot_single_thread_try_send_open() {
c34b1796 1848 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
1849 assert_eq!(tx.try_send(10), Ok(()));
1850 assert!(rx.recv().unwrap() == 10);
1851 }
1852
1853 #[test]
1854 fn oneshot_single_thread_try_send_closed() {
c34b1796 1855 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1856 drop(rx);
1857 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1858 }
1859
1860 #[test]
1861 fn oneshot_single_thread_try_send_closed2() {
c34b1796 1862 let (tx, _rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1863 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1864 }
1865
1866 #[test]
1867 fn oneshot_single_thread_try_recv_open() {
c34b1796 1868 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
1869 tx.send(10).unwrap();
1870 assert!(rx.recv() == Ok(10));
1871 }
1872
1873 #[test]
1874 fn oneshot_single_thread_try_recv_closed() {
c34b1796 1875 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1876 drop(tx);
1877 assert!(rx.recv().is_err());
1878 }
1879
1880 #[test]
1881 fn oneshot_single_thread_peek_data() {
c34b1796 1882 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
1883 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1884 tx.send(10).unwrap();
1885 assert_eq!(rx.try_recv(), Ok(10));
1886 }
1887
1888 #[test]
1889 fn oneshot_single_thread_peek_close() {
c34b1796 1890 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1891 drop(tx);
1892 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1893 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1894 }
1895
1896 #[test]
1897 fn oneshot_single_thread_peek_open() {
c34b1796 1898 let (_tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
1899 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1900 }
1901
1902 #[test]
1903 fn oneshot_multi_task_recv_then_send() {
c34b1796 1904 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f 1905 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1906 assert!(rx.recv().unwrap() == box 10);
1907 });
1908
1909 tx.send(box 10).unwrap();
1910 }
1911
1912 #[test]
1913 fn oneshot_multi_task_recv_then_close() {
c34b1796 1914 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f 1915 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1916 drop(tx);
1917 });
85aaf69f 1918 let res = thread::spawn(move|| {
1a4d82fc
JJ
1919 assert!(rx.recv().unwrap() == box 10);
1920 }).join();
1921 assert!(res.is_err());
1922 }
1923
1924 #[test]
1925 fn oneshot_multi_thread_close_stress() {
85aaf69f 1926 for _ in 0..stress_factor() {
c34b1796 1927 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 1928 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1929 drop(rx);
1930 });
1931 drop(tx);
1932 }
1933 }
1934
1935 #[test]
1936 fn oneshot_multi_thread_send_close_stress() {
85aaf69f 1937 for _ in 0..stress_factor() {
c34b1796 1938 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 1939 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1940 drop(rx);
1941 });
85aaf69f 1942 let _ = thread::spawn(move || {
1a4d82fc
JJ
1943 tx.send(1).unwrap();
1944 }).join();
1945 }
1946 }
1947
1948 #[test]
1949 fn oneshot_multi_thread_recv_close_stress() {
85aaf69f 1950 for _ in 0..stress_factor() {
c34b1796 1951 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f
SL
1952 let _t = thread::spawn(move|| {
1953 let res = thread::spawn(move|| {
1a4d82fc
JJ
1954 rx.recv().unwrap();
1955 }).join();
1956 assert!(res.is_err());
1957 });
85aaf69f
SL
1958 let _t = thread::spawn(move|| {
1959 thread::spawn(move|| {
1a4d82fc
JJ
1960 drop(tx);
1961 });
1962 });
1963 }
1964 }
1965
1966 #[test]
1967 fn oneshot_multi_thread_send_recv_stress() {
85aaf69f 1968 for _ in 0..stress_factor() {
c34b1796 1969 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f
SL
1970 let _t = thread::spawn(move|| {
1971 tx.send(box 10).unwrap();
1a4d82fc 1972 });
85aaf69f 1973 assert!(rx.recv().unwrap() == box 10);
1a4d82fc
JJ
1974 }
1975 }
1976
1977 #[test]
1978 fn stream_send_recv_stress() {
85aaf69f 1979 for _ in 0..stress_factor() {
c34b1796 1980 let (tx, rx) = sync_channel::<Box<i32>>(0);
1a4d82fc
JJ
1981
1982 send(tx, 0);
1983 recv(rx, 0);
1984
c34b1796 1985 fn send(tx: SyncSender<Box<i32>>, i: i32) {
1a4d82fc
JJ
1986 if i == 10 { return }
1987
85aaf69f 1988 thread::spawn(move|| {
1a4d82fc
JJ
1989 tx.send(box i).unwrap();
1990 send(tx, i + 1);
1991 });
1992 }
1993
c34b1796 1994 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1a4d82fc
JJ
1995 if i == 10 { return }
1996
85aaf69f 1997 thread::spawn(move|| {
1a4d82fc
JJ
1998 assert!(rx.recv().unwrap() == box i);
1999 recv(rx, i + 1);
2000 });
2001 }
2002 }
2003 }
2004
2005 #[test]
2006 fn recv_a_lot() {
2007 // Regression test that we don't run out of stack in scheduler context
2008 let (tx, rx) = sync_channel(10000);
85aaf69f
SL
2009 for _ in 0..10000 { tx.send(()).unwrap(); }
2010 for _ in 0..10000 { rx.recv().unwrap(); }
1a4d82fc
JJ
2011 }
2012
2013 #[test]
2014 fn shared_chan_stress() {
2015 let (tx, rx) = sync_channel(0);
2016 let total = stress_factor() + 100;
85aaf69f 2017 for _ in 0..total {
1a4d82fc 2018 let tx = tx.clone();
85aaf69f 2019 thread::spawn(move|| {
1a4d82fc
JJ
2020 tx.send(()).unwrap();
2021 });
2022 }
2023
85aaf69f 2024 for _ in 0..total {
1a4d82fc
JJ
2025 rx.recv().unwrap();
2026 }
2027 }
2028
2029 #[test]
2030 fn test_nested_recv_iter() {
c34b1796
AL
2031 let (tx, rx) = sync_channel::<i32>(0);
2032 let (total_tx, total_rx) = sync_channel::<i32>(0);
1a4d82fc 2033
85aaf69f 2034 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2035 let mut acc = 0;
2036 for x in rx.iter() {
2037 acc += x;
2038 }
2039 total_tx.send(acc).unwrap();
2040 });
2041
2042 tx.send(3).unwrap();
2043 tx.send(1).unwrap();
2044 tx.send(2).unwrap();
2045 drop(tx);
2046 assert_eq!(total_rx.recv().unwrap(), 6);
2047 }
2048
2049 #[test]
2050 fn test_recv_iter_break() {
c34b1796 2051 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2052 let (count_tx, count_rx) = sync_channel(0);
2053
85aaf69f 2054 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2055 let mut count = 0;
2056 for x in rx.iter() {
2057 if count >= 3 {
2058 break;
2059 } else {
2060 count += x;
2061 }
2062 }
2063 count_tx.send(count).unwrap();
2064 });
2065
2066 tx.send(2).unwrap();
2067 tx.send(2).unwrap();
2068 tx.send(2).unwrap();
2069 let _ = tx.try_send(2);
2070 drop(tx);
2071 assert_eq!(count_rx.recv().unwrap(), 4);
2072 }
2073
2074 #[test]
2075 fn try_recv_states() {
c34b1796 2076 let (tx1, rx1) = sync_channel::<i32>(1);
1a4d82fc
JJ
2077 let (tx2, rx2) = sync_channel::<()>(1);
2078 let (tx3, rx3) = sync_channel::<()>(1);
85aaf69f 2079 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2080 rx2.recv().unwrap();
2081 tx1.send(1).unwrap();
2082 tx3.send(()).unwrap();
2083 rx2.recv().unwrap();
2084 drop(tx1);
2085 tx3.send(()).unwrap();
2086 });
2087
2088 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2089 tx2.send(()).unwrap();
2090 rx3.recv().unwrap();
2091 assert_eq!(rx1.try_recv(), Ok(1));
2092 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2093 tx2.send(()).unwrap();
2094 rx3.recv().unwrap();
2095 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2096 }
2097
2098 // This bug used to end up in a livelock inside of the Receiver destructor
2099 // because the internal state of the Shared packet was corrupted
2100 #[test]
2101 fn destroy_upgraded_shared_port_when_sender_still_active() {
2102 let (tx, rx) = sync_channel::<()>(0);
2103 let (tx2, rx2) = sync_channel::<()>(0);
85aaf69f 2104 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2105 rx.recv().unwrap(); // wait on a oneshot
2106 drop(rx); // destroy a shared
2107 tx2.send(()).unwrap();
2108 });
bd371182 2109 // make sure the other thread has gone to sleep
85aaf69f 2110 for _ in 0..5000 { thread::yield_now(); }
1a4d82fc
JJ
2111
2112 // upgrade to a shared chan and send a message
2113 let t = tx.clone();
2114 drop(tx);
2115 t.send(()).unwrap();
2116
bd371182 2117 // wait for the child thread to exit before we exit
1a4d82fc
JJ
2118 rx2.recv().unwrap();
2119 }
2120
2121 #[test]
2122 fn send1() {
c34b1796 2123 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2124 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
1a4d82fc
JJ
2125 assert_eq!(tx.send(1), Ok(()));
2126 }
2127
2128 #[test]
2129 fn send2() {
c34b1796 2130 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 2131 let _t = thread::spawn(move|| { drop(rx); });
1a4d82fc
JJ
2132 assert!(tx.send(1).is_err());
2133 }
2134
2135 #[test]
2136 fn send3() {
c34b1796 2137 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc 2138 assert_eq!(tx.send(1), Ok(()));
85aaf69f 2139 let _t =thread::spawn(move|| { drop(rx); });
1a4d82fc
JJ
2140 assert!(tx.send(1).is_err());
2141 }
2142
2143 #[test]
2144 fn send4() {
c34b1796 2145 let (tx, rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2146 let tx2 = tx.clone();
2147 let (done, donerx) = channel();
2148 let done2 = done.clone();
85aaf69f 2149 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2150 assert!(tx.send(1).is_err());
2151 done.send(()).unwrap();
2152 });
85aaf69f 2153 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2154 assert!(tx2.send(2).is_err());
2155 done2.send(()).unwrap();
2156 });
2157 drop(rx);
2158 donerx.recv().unwrap();
2159 donerx.recv().unwrap();
2160 }
2161
2162 #[test]
2163 fn try_send1() {
c34b1796 2164 let (tx, _rx) = sync_channel::<i32>(0);
1a4d82fc
JJ
2165 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2166 }
2167
2168 #[test]
2169 fn try_send2() {
c34b1796 2170 let (tx, _rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2171 assert_eq!(tx.try_send(1), Ok(()));
2172 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2173 }
2174
2175 #[test]
2176 fn try_send3() {
c34b1796 2177 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
2178 assert_eq!(tx.try_send(1), Ok(()));
2179 drop(rx);
2180 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2181 }
2182
2183 #[test]
2184 fn issue_15761() {
2185 fn repro() {
2186 let (tx1, rx1) = sync_channel::<()>(3);
2187 let (tx2, rx2) = sync_channel::<()>(3);
2188
85aaf69f 2189 let _t = thread::spawn(move|| {
1a4d82fc
JJ
2190 rx1.recv().unwrap();
2191 tx2.try_send(()).unwrap();
2192 });
2193
2194 tx1.try_send(()).unwrap();
2195 rx2.recv().unwrap();
2196 }
2197
85aaf69f 2198 for _ in 0..100 {
1a4d82fc
JJ
2199 repro()
2200 }
2201 }
2202}