]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/mod.rs
Imported Upstream version 1.0.0~beta
[rustc.git] / src / libstd / sync / mpsc / mod.rs
CommitLineData
1a4d82fc
JJ
1// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
85aaf69f 11//! Multi-producer, single-consumer FIFO queue communication primitives.
1a4d82fc
JJ
12//!
13//! This module provides message-based communication over channels, concretely
14//! defined among three types:
15//!
16//! * `Sender`
17//! * `SyncSender`
18//! * `Receiver`
19//!
20//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21//! senders are clone-able (multi-producer) such that many threads can send
22//! simultaneously to one receiver (single-consumer).
23//!
24//! These channels come in two flavors:
25//!
26//! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27//! will return a `(Sender, Receiver)` tuple where all sends will be
28//! **asynchronous** (they never block). The channel conceptually has an
29//! infinite buffer.
30//!
31//! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32//! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33//! is a pre-allocated buffer of a fixed size. All sends will be
34//! **synchronous** by blocking until there is buffer space available. Note
35//! that a bound of 0 is allowed, causing the channel to become a
36//! "rendezvous" channel where each sender atomically hands off a message to
37//! a receiver.
38//!
39//! ## Disconnection
40//!
41//! The send and receive operations on channels will all return a `Result`
42//! indicating whether the operation succeeded or not. An unsuccessful operation
43//! is normally indicative of the other half of a channel having "hung up" by
44//! being dropped in its corresponding thread.
45//!
46//! Once half of a channel has been deallocated, most operations can no longer
47//! continue to make progress, so `Err` will be returned. Many applications will
48//! continue to `unwrap()` the results returned from this module, instigating a
49//! propagation of failure among threads if one unexpectedly dies.
50//!
51//! # Examples
52//!
53//! Simple usage:
54//!
55//! ```
85aaf69f 56//! use std::thread;
1a4d82fc
JJ
57//! use std::sync::mpsc::channel;
58//!
59//! // Create a simple streaming channel
60//! let (tx, rx) = channel();
85aaf69f
SL
61//! thread::spawn(move|| {
62//! tx.send(10).unwrap();
1a4d82fc 63//! });
85aaf69f 64//! assert_eq!(rx.recv().unwrap(), 10);
1a4d82fc
JJ
65//! ```
66//!
67//! Shared usage:
68//!
69//! ```
85aaf69f 70//! use std::thread;
1a4d82fc
JJ
71//! use std::sync::mpsc::channel;
72//!
73//! // Create a shared channel that can be sent along from many threads
74//! // where tx is the sending half (tx for transmission), and rx is the receiving
75//! // half (rx for receiving).
76//! let (tx, rx) = channel();
85aaf69f 77//! for i in 0..10 {
1a4d82fc 78//! let tx = tx.clone();
85aaf69f 79//! thread::spawn(move|| {
1a4d82fc
JJ
80//! tx.send(i).unwrap();
81//! });
82//! }
83//!
85aaf69f 84//! for _ in 0..10 {
1a4d82fc
JJ
85//! let j = rx.recv().unwrap();
86//! assert!(0 <= j && j < 10);
87//! }
88//! ```
89//!
90//! Propagating panics:
91//!
92//! ```
93//! use std::sync::mpsc::channel;
94//!
95//! // The call to recv() will return an error because the channel has already
96//! // hung up (or been deallocated)
c34b1796 97//! let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
98//! drop(tx);
99//! assert!(rx.recv().is_err());
100//! ```
101//!
102//! Synchronous channels:
103//!
104//! ```
85aaf69f 105//! use std::thread;
1a4d82fc
JJ
106//! use std::sync::mpsc::sync_channel;
107//!
c34b1796 108//! let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 109//! thread::spawn(move|| {
1a4d82fc
JJ
110//! // This will wait for the parent task to start receiving
111//! tx.send(53).unwrap();
112//! });
113//! rx.recv().unwrap();
114//! ```
115//!
116//! Reading from a channel with a timeout requires using a Timer together
c34b1796 117//! with the channel. You can use the `select!` macro to select either and
1a4d82fc
JJ
118//! handle the timeout case. This first example will break out of the loop
119//! after 10 seconds no matter what:
120//!
121//! ```no_run
c34b1796 122//! # #![feature(std_misc, old_io)]
1a4d82fc 123//! use std::sync::mpsc::channel;
85aaf69f 124//! use std::old_io::timer::Timer;
1a4d82fc
JJ
125//! use std::time::Duration;
126//!
c34b1796 127//! let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
128//! let mut timer = Timer::new().unwrap();
129//! let timeout = timer.oneshot(Duration::seconds(10));
130//!
131//! loop {
132//! select! {
133//! val = rx.recv() => println!("Received {}", val.unwrap()),
134//! _ = timeout.recv() => {
135//! println!("timed out, total time was more than 10 seconds");
136//! break;
137//! }
138//! }
139//! }
140//! ```
141//!
142//! This second example is more costly since it allocates a new timer every
143//! time a message is received, but it allows you to timeout after the channel
144//! has been inactive for 5 seconds:
145//!
146//! ```no_run
c34b1796 147//! # #![feature(std_misc, old_io)]
1a4d82fc 148//! use std::sync::mpsc::channel;
85aaf69f 149//! use std::old_io::timer::Timer;
1a4d82fc
JJ
150//! use std::time::Duration;
151//!
c34b1796 152//! let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
153//! let mut timer = Timer::new().unwrap();
154//!
155//! loop {
156//! let timeout = timer.oneshot(Duration::seconds(5));
157//!
158//! select! {
159//! val = rx.recv() => println!("Received {}", val.unwrap()),
160//! _ = timeout.recv() => {
161//! println!("timed out, no message received in 5 seconds");
162//! break;
163//! }
164//! }
165//! }
166//! ```
167
85aaf69f 168#![stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
169
170// A description of how Rust's channel implementation works
171//
172// Channels are supposed to be the basic building block for all other
173// concurrent primitives that are used in Rust. As a result, the channel type
174// needs to be highly optimized, flexible, and broad enough for use everywhere.
175//
176// The choice of implementation of all channels is to be built on lock-free data
177// structures. The channels themselves are then consequently also lock-free data
178// structures. As always with lock-free code, this is a very "here be dragons"
179// territory, especially because I'm unaware of any academic papers that have
180// gone into great length about channels of these flavors.
181//
182// ## Flavors of channels
183//
184// From the perspective of a consumer of this library, there is only one flavor
185// of channel. This channel can be used as a stream and cloned to allow multiple
186// senders. Under the hood, however, there are actually three flavors of
187// channels in play.
188//
189// * Oneshots - these channels are highly optimized for the one-send use case.
190// They contain as few atomics as possible and involve one and
191// exactly one allocation.
192// * Streams - these channels are optimized for the non-shared use case. They
193// use a different concurrent queue that is more tailored for this
194// use case. The initial allocation of this flavor of channel is not
195// optimized.
196// * Shared - this is the most general form of channel that this module offers,
197// a channel with multiple senders. This type is as optimized as it
198// can be, but the previous two types mentioned are much faster for
199// their use-cases.
200//
201// ## Concurrent queues
202//
203// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
204// recv() obviously blocks. This means that under the hood there must be some
205// shared and concurrent queue holding all of the actual data.
206//
207// With two flavors of channels, two flavors of queues are also used. We have
208// chosen to use queues from a well-known author that are abbreviated as SPSC
209// and MPSC (single producer, single consumer and multiple producer, single
210// consumer). SPSC queues are used for streams while MPSC queues are used for
211// shared channels.
212//
213// ### SPSC optimizations
214//
215// The SPSC queue found online is essentially a linked list of nodes where one
216// half of the nodes are the "queue of data" and the other half of nodes are a
217// cache of unused nodes. The unused nodes are used such that an allocation is
218// not required on every push() and a free doesn't need to happen on every
219// pop().
220//
221// As found online, however, the cache of nodes is of an infinite size. This
222// means that if a channel at one point in its life had 50k items in the queue,
223// then the queue will always have the capacity for 50k items. I believed that
224// this was an unnecessary limitation of the implementation, so I have altered
225// the queue to optionally have a bound on the cache size.
226//
227// By default, streams will have an unbounded SPSC queue with a small-ish cache
228// size. The hope is that the cache is still large enough to have very fast
229// send() operations while not too large such that millions of channels can
230// coexist at once.
231//
232// ### MPSC optimizations
233//
234// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
235// a linked list under the hood to earn its unboundedness, but I have not put
236// forth much effort into having a cache of nodes similar to the SPSC queue.
237//
238// For now, I believe that this is "ok" because shared channels are not the most
239// common type, but soon we may wish to revisit this queue choice and determine
240// another candidate for backend storage of shared channels.
241//
242// ## Overview of the Implementation
243//
244// Now that there's a little background on the concurrent queues used, it's
245// worth going into much more detail about the channels themselves. The basic
246// pseudocode for a send/recv are:
247//
248//
249//      send(t)                             recv()
250//        queue.push(t)                       return if queue.pop()
251//        if increment() == -1                deschedule {
252//          wakeup()                            if decrement() > 0
253//                                                cancel_deschedule()
254//                                            }
255//                                            queue.pop()
256//
257// As mentioned before, there are no locks in this implementation, only atomic
258// instructions are used.
259//
260// ### The internal atomic counter
261//
262// Every channel has a shared counter with each half to keep track of the size
263// of the queue. This counter is used to abort descheduling by the receiver and
264// to know when to wake up on the sending side.
265//
266// As seen in the pseudocode, senders will increment this count and receivers
267// will decrement the count. The theory behind this is that if a sender sees a
268// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
269// then it doesn't need to block.
270//
271// The recv() method has a beginning call to pop(), and if successful, it needs
272// to decrement the count. It is a crucial implementation detail that this
273// decrement does *not* happen to the shared counter. If this were the case,
274// then it would be possible for the counter to be very negative when there were
275// no receivers waiting, in which case the senders would have to determine when
276// it was actually appropriate to wake up a receiver.
277//
278// Instead, the "steal count" is kept track of separately (not atomically
279// because it's only used by receivers), and then the decrement() call when
280// descheduling will lump in all of the recent steals into one large decrement.
281//
282// The implication of this is that if a sender sees a -1 count, then there's
283// guaranteed to be a waiter waiting!
284//
285// ## Native Implementation
286//
287// A major goal of these channels is to work seamlessly on and off the runtime.
288// All of the previous race conditions have been worded in terms of
289// scheduler-isms (which is obviously not available without the runtime).
290//
291// For now, native usage of channels (off the runtime) will fall back onto
292// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
293// is still entirely lock-free, the "deschedule" blocks above are surrounded by
294// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
295// condition variable.
296//
297// ## Select
298//
299// Being able to support selection over channels has greatly influenced this
300// design, and not only does selection need to work inside the runtime, but also
301// outside the runtime.
302//
303// The implementation is fairly straightforward. The goal of select() is not to
304// return some data, but only to return which channel can receive data without
305// blocking. The implementation is essentially the entire blocking procedure
306// followed by an increment as soon as it's woken up. The cancellation procedure
307// involves an increment and swapping out of to_wake to acquire ownership of the
308// task to unblock.
309//
310// Sadly this current implementation requires multiple allocations, so I have
311// seen the throughput of select() be much worse than it should be. I do not
312// believe that there is anything fundamental that needs to change about these
313// channels, however, in order to support a more efficient select().
314//
315// # Conclusion
316//
317// And now that you've seen all the races that I found and attempted to fix,
318// here's the code for you to find some more!
319
320use prelude::v1::*;
321
322use sync::Arc;
c34b1796 323use error;
1a4d82fc 324use fmt;
1a4d82fc
JJ
325use mem;
326use cell::UnsafeCell;
327
328pub use self::select::{Select, Handle};
329use self::select::StartResult;
330use self::select::StartResult::*;
331use self::blocking::SignalToken;
332
333mod blocking;
334mod oneshot;
335mod select;
336mod shared;
337mod stream;
338mod sync;
339mod mpsc_queue;
340mod spsc_queue;
341
342/// The receiving-half of Rust's channel type. This half can only be owned by
343/// one task
85aaf69f 344#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
345pub struct Receiver<T> {
346 inner: UnsafeCell<Flavor<T>>,
347}
348
349// The receiver port can be sent from place to place, so long as it
350// is not used to receive non-sendable things.
c34b1796 351unsafe impl<T: Send> Send for Receiver<T> { }
1a4d82fc
JJ
352
353/// An iterator over messages on a receiver, this iterator will block
354/// whenever `next` is called, waiting for a new message, and `None` will be
355/// returned when the corresponding channel has hung up.
85aaf69f 356#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 357pub struct Iter<'a, T: 'a> {
1a4d82fc
JJ
358 rx: &'a Receiver<T>
359}
360
361/// The sending-half of Rust's asynchronous channel type. This half can only be
362/// owned by one task, but it can be cloned to send to other tasks.
85aaf69f 363#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
364pub struct Sender<T> {
365 inner: UnsafeCell<Flavor<T>>,
366}
367
368// The send port can be sent from place to place, so long as it
369// is not used to send non-sendable things.
c34b1796 370unsafe impl<T: Send> Send for Sender<T> { }
1a4d82fc
JJ
371
372/// The sending-half of Rust's synchronous channel type. This half can only be
373/// owned by one task, but it can be cloned to send to other tasks.
85aaf69f 374#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 375pub struct SyncSender<T> {
85aaf69f 376 inner: Arc<UnsafeCell<sync::Packet<T>>>,
1a4d82fc
JJ
377}
378
c34b1796 379unsafe impl<T: Send> Send for SyncSender<T> {}
85aaf69f
SL
380
381impl<T> !Sync for SyncSender<T> {}
382
1a4d82fc
JJ
383/// An error returned from the `send` function on channels.
384///
385/// A `send` operation can only fail if the receiving end of a channel is
386/// disconnected, implying that the data could never be received. The error
387/// contains the data being sent as a payload so it can be recovered.
85aaf69f
SL
388#[stable(feature = "rust1", since = "1.0.0")]
389#[derive(PartialEq, Eq, Clone, Copy)]
c34b1796 390pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
1a4d82fc
JJ
391
392/// An error returned from the `recv` function on a `Receiver`.
393///
394/// The `recv` operation can only fail if the sending half of a channel is
395/// disconnected, implying that no further messages will ever be received.
85aaf69f
SL
396#[derive(PartialEq, Eq, Clone, Copy, Debug)]
397#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
398pub struct RecvError;
399
400/// This enumeration is the list of the possible reasons that try_recv could not
401/// return data when called.
85aaf69f
SL
402#[derive(PartialEq, Eq, Clone, Copy, Debug)]
403#[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
404pub enum TryRecvError {
405 /// This channel is currently empty, but the sender(s) have not yet
406 /// disconnected, so data may yet become available.
85aaf69f 407 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
408 Empty,
409
410 /// This channel's sending half has become disconnected, and there will
411 /// never be any more data received on this channel
85aaf69f 412 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
413 Disconnected,
414}
415
416/// This enumeration is the list of the possible error outcomes for the
417/// `SyncSender::try_send` method.
85aaf69f
SL
418#[stable(feature = "rust1", since = "1.0.0")]
419#[derive(PartialEq, Eq, Clone, Copy)]
1a4d82fc
JJ
420pub enum TrySendError<T> {
421 /// The data could not be sent on the channel because it would require that
422 /// the caller block to send the data.
423 ///
424 /// If this is a buffered channel, then the buffer is full at this time. If
425 /// this is not a buffered channel, then there is no receiver available to
426 /// acquire the data.
85aaf69f 427 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
428 Full(T),
429
430 /// This channel's receiving half has disconnected, so the data could not be
431 /// sent. The data is returned back to the caller in this case.
85aaf69f 432 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
433 Disconnected(T),
434}
435
436enum Flavor<T> {
85aaf69f
SL
437 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
438 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
439 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
440 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
1a4d82fc
JJ
441}
442
443#[doc(hidden)]
444trait UnsafeFlavor<T> {
445 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
446 unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
447 &mut *self.inner_unsafe().get()
448 }
449 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
450 &*self.inner_unsafe().get()
451 }
452}
453impl<T> UnsafeFlavor<T> for Sender<T> {
454 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
455 &self.inner
456 }
457}
458impl<T> UnsafeFlavor<T> for Receiver<T> {
459 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
460 &self.inner
461 }
462}
463
464/// Creates a new asynchronous channel, returning the sender/receiver halves.
465///
466/// All data sent on the sender will become available on the receiver, and no
467/// send will block the calling task (this channel has an "infinite buffer").
468///
c34b1796 469/// # Examples
1a4d82fc
JJ
470///
471/// ```
472/// use std::sync::mpsc::channel;
85aaf69f 473/// use std::thread;
1a4d82fc
JJ
474///
475/// // tx is the sending half (tx for transmission), and rx is the receiving
476/// // half (rx for receiving).
477/// let (tx, rx) = channel();
478///
479/// // Spawn off an expensive computation
85aaf69f 480/// thread::spawn(move|| {
1a4d82fc
JJ
481/// # fn expensive_computation() {}
482/// tx.send(expensive_computation()).unwrap();
483/// });
484///
485/// // Do some useful work for a while
486///
487/// // Let's see what that answer was
488/// println!("{:?}", rx.recv().unwrap());
489/// ```
85aaf69f 490#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 491pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
85aaf69f 492 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
1a4d82fc
JJ
493 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
494}
495
496/// Creates a new synchronous, bounded channel.
497///
498/// Like asynchronous channels, the `Receiver` will block until a message
499/// becomes available. These channels differ greatly in the semantics of the
500/// sender from asynchronous channels, however.
501///
502/// This channel has an internal buffer on which messages will be queued. When
503/// the internal buffer becomes full, future sends will *block* waiting for the
504/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
505/// becomes a "rendezvous channel" where each send will not return until a recv
506/// is paired with it.
507///
508/// As with asynchronous channels, `send` on any sender will return `Err` if
509/// the `Receiver` has been disconnected.
510///
c34b1796 511/// # Examples
1a4d82fc
JJ
512///
513/// ```
514/// use std::sync::mpsc::sync_channel;
85aaf69f 515/// use std::thread;
1a4d82fc
JJ
516///
517/// let (tx, rx) = sync_channel(1);
518///
519/// // this returns immediately
85aaf69f 520/// tx.send(1).unwrap();
1a4d82fc 521///
85aaf69f 522/// thread::spawn(move|| {
1a4d82fc 523/// // this will block until the previous message has been received
85aaf69f 524/// tx.send(2).unwrap();
1a4d82fc
JJ
525/// });
526///
85aaf69f
SL
527/// assert_eq!(rx.recv().unwrap(), 1);
528/// assert_eq!(rx.recv().unwrap(), 2);
1a4d82fc 529/// ```
85aaf69f 530#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 531pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
85aaf69f 532 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
1a4d82fc
JJ
533 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
534}
535
536////////////////////////////////////////////////////////////////////////////////
537// Sender
538////////////////////////////////////////////////////////////////////////////////
539
c34b1796 540impl<T> Sender<T> {
1a4d82fc
JJ
541 fn new(inner: Flavor<T>) -> Sender<T> {
542 Sender {
543 inner: UnsafeCell::new(inner),
544 }
545 }
546
547 /// Attempts to send a value on this channel, returning it back if it could
548 /// not be sent.
549 ///
550 /// A successful send occurs when it is determined that the other end of
551 /// the channel has not hung up already. An unsuccessful send would be one
552 /// where the corresponding receiver has already been deallocated. Note
553 /// that a return value of `Err` means that the data will never be
554 /// received, but a return value of `Ok` does *not* mean that the data
555 /// will be received. It is possible for the corresponding receiver to
556 /// hang up immediately after this function returns `Ok`.
557 ///
558 /// This method will never block the current thread.
559 ///
c34b1796 560 /// # Examples
1a4d82fc
JJ
561 ///
562 /// ```
563 /// use std::sync::mpsc::channel;
564 ///
565 /// let (tx, rx) = channel();
566 ///
567 /// // This send is always successful
85aaf69f 568 /// tx.send(1).unwrap();
1a4d82fc
JJ
569 ///
570 /// // This send will fail because the receiver is gone
571 /// drop(rx);
85aaf69f 572 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
1a4d82fc 573 /// ```
85aaf69f 574 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
575 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
576 let (new_inner, ret) = match *unsafe { self.inner() } {
577 Flavor::Oneshot(ref p) => {
578 unsafe {
579 let p = p.get();
580 if !(*p).sent() {
581 return (*p).send(t).map_err(SendError);
582 } else {
583 let a =
85aaf69f 584 Arc::new(UnsafeCell::new(stream::Packet::new()));
1a4d82fc
JJ
585 let rx = Receiver::new(Flavor::Stream(a.clone()));
586 match (*p).upgrade(rx) {
587 oneshot::UpSuccess => {
588 let ret = (*a.get()).send(t);
589 (a, ret)
590 }
591 oneshot::UpDisconnected => (a, Err(t)),
592 oneshot::UpWoke(token) => {
593 // This send cannot panic because the thread is
594 // asleep (we're looking at it), so the receiver
595 // can't go away.
596 (*a.get()).send(t).ok().unwrap();
c34b1796 597 token.signal();
1a4d82fc
JJ
598 (a, Ok(()))
599 }
600 }
601 }
602 }
603 }
604 Flavor::Stream(ref p) => return unsafe {
605 (*p.get()).send(t).map_err(SendError)
606 },
607 Flavor::Shared(ref p) => return unsafe {
608 (*p.get()).send(t).map_err(SendError)
609 },
610 Flavor::Sync(..) => unreachable!(),
611 };
612
613 unsafe {
614 let tmp = Sender::new(Flavor::Stream(new_inner));
615 mem::swap(self.inner_mut(), tmp.inner_mut());
616 }
617 ret.map_err(SendError)
618 }
619}
620
85aaf69f 621#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 622impl<T> Clone for Sender<T> {
1a4d82fc
JJ
623 fn clone(&self) -> Sender<T> {
624 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
625 Flavor::Oneshot(ref p) => {
85aaf69f 626 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
1a4d82fc
JJ
627 unsafe {
628 let guard = (*a.get()).postinit_lock();
629 let rx = Receiver::new(Flavor::Shared(a.clone()));
630 match (*p.get()).upgrade(rx) {
631 oneshot::UpSuccess |
632 oneshot::UpDisconnected => (a, None, guard),
633 oneshot::UpWoke(task) => (a, Some(task), guard)
634 }
635 }
636 }
637 Flavor::Stream(ref p) => {
85aaf69f 638 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
1a4d82fc
JJ
639 unsafe {
640 let guard = (*a.get()).postinit_lock();
641 let rx = Receiver::new(Flavor::Shared(a.clone()));
642 match (*p.get()).upgrade(rx) {
643 stream::UpSuccess |
644 stream::UpDisconnected => (a, None, guard),
645 stream::UpWoke(task) => (a, Some(task), guard),
646 }
647 }
648 }
649 Flavor::Shared(ref p) => {
650 unsafe { (*p.get()).clone_chan(); }
651 return Sender::new(Flavor::Shared(p.clone()));
652 }
653 Flavor::Sync(..) => unreachable!(),
654 };
655
656 unsafe {
657 (*packet.get()).inherit_blocker(sleeper, guard);
658
659 let tmp = Sender::new(Flavor::Shared(packet.clone()));
660 mem::swap(self.inner_mut(), tmp.inner_mut());
661 }
662 Sender::new(Flavor::Shared(packet))
663 }
664}
665
666#[unsafe_destructor]
85aaf69f 667#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 668impl<T> Drop for Sender<T> {
1a4d82fc
JJ
669 fn drop(&mut self) {
670 match *unsafe { self.inner_mut() } {
671 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
672 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
673 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
674 Flavor::Sync(..) => unreachable!(),
675 }
676 }
677}
678
679////////////////////////////////////////////////////////////////////////////////
680// SyncSender
681////////////////////////////////////////////////////////////////////////////////
682
c34b1796 683impl<T> SyncSender<T> {
85aaf69f
SL
684 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
685 SyncSender { inner: inner }
1a4d82fc
JJ
686 }
687
688 /// Sends a value on this synchronous channel.
689 ///
690 /// This function will *block* until space in the internal buffer becomes
691 /// available or a receiver is available to hand off the message to.
692 ///
693 /// Note that a successful send does *not* guarantee that the receiver will
694 /// ever see the data if there is a buffer on this channel. Items may be
695 /// enqueued in the internal buffer for the receiver to receive at a later
696 /// time. If the buffer size is 0, however, it can be guaranteed that the
697 /// receiver has indeed received the data if this function returns success.
698 ///
699 /// This function will never panic, but it may return `Err` if the
700 /// `Receiver` has disconnected and is no longer able to receive
701 /// information.
85aaf69f 702 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
703 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
704 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
705 }
706
707 /// Attempts to send a value on this channel without blocking.
708 ///
709 /// This method differs from `send` by returning immediately if the
710 /// channel's buffer is full or no receiver is waiting to acquire some
711 /// data. Compared with `send`, this function has two failure cases
712 /// instead of one (one for disconnection, one for a full buffer).
713 ///
714 /// See `SyncSender::send` for notes about guarantees of whether the
715 /// receiver has received the data or not if this function is successful.
85aaf69f 716 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
717 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
718 unsafe { (*self.inner.get()).try_send(t) }
719 }
720}
721
85aaf69f 722#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 723impl<T> Clone for SyncSender<T> {
1a4d82fc
JJ
724 fn clone(&self) -> SyncSender<T> {
725 unsafe { (*self.inner.get()).clone_chan(); }
726 return SyncSender::new(self.inner.clone());
727 }
728}
729
730#[unsafe_destructor]
85aaf69f 731#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 732impl<T> Drop for SyncSender<T> {
1a4d82fc
JJ
733 fn drop(&mut self) {
734 unsafe { (*self.inner.get()).drop_chan(); }
735 }
736}
737
738////////////////////////////////////////////////////////////////////////////////
739// Receiver
740////////////////////////////////////////////////////////////////////////////////
741
c34b1796 742impl<T> Receiver<T> {
1a4d82fc
JJ
743 fn new(inner: Flavor<T>) -> Receiver<T> {
744 Receiver { inner: UnsafeCell::new(inner) }
745 }
746
747 /// Attempts to return a pending value on this receiver without blocking
748 ///
749 /// This method will never block the caller in order to wait for data to
750 /// become available. Instead, this will always return immediately with a
751 /// possible option of pending data on the channel.
752 ///
753 /// This is useful for a flavor of "optimistic check" before deciding to
754 /// block on a receiver.
85aaf69f 755 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
756 pub fn try_recv(&self) -> Result<T, TryRecvError> {
757 loop {
758 let new_port = match *unsafe { self.inner() } {
759 Flavor::Oneshot(ref p) => {
760 match unsafe { (*p.get()).try_recv() } {
761 Ok(t) => return Ok(t),
762 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
763 Err(oneshot::Disconnected) => {
764 return Err(TryRecvError::Disconnected)
765 }
766 Err(oneshot::Upgraded(rx)) => rx,
767 }
768 }
769 Flavor::Stream(ref p) => {
770 match unsafe { (*p.get()).try_recv() } {
771 Ok(t) => return Ok(t),
772 Err(stream::Empty) => return Err(TryRecvError::Empty),
773 Err(stream::Disconnected) => {
774 return Err(TryRecvError::Disconnected)
775 }
776 Err(stream::Upgraded(rx)) => rx,
777 }
778 }
779 Flavor::Shared(ref p) => {
780 match unsafe { (*p.get()).try_recv() } {
781 Ok(t) => return Ok(t),
782 Err(shared::Empty) => return Err(TryRecvError::Empty),
783 Err(shared::Disconnected) => {
784 return Err(TryRecvError::Disconnected)
785 }
786 }
787 }
788 Flavor::Sync(ref p) => {
789 match unsafe { (*p.get()).try_recv() } {
790 Ok(t) => return Ok(t),
791 Err(sync::Empty) => return Err(TryRecvError::Empty),
792 Err(sync::Disconnected) => {
793 return Err(TryRecvError::Disconnected)
794 }
795 }
796 }
797 };
798 unsafe {
799 mem::swap(self.inner_mut(),
800 new_port.inner_mut());
801 }
802 }
803 }
804
805 /// Attempt to wait for a value on this receiver, returning an error if the
806 /// corresponding channel has hung up.
807 ///
808 /// This function will always block the current thread if there is no data
809 /// available and it's possible for more data to be sent. Once a message is
810 /// sent to the corresponding `Sender`, then this receiver will wake up and
811 /// return that message.
812 ///
813 /// If the corresponding `Sender` has disconnected, or it disconnects while
814 /// this call is blocking, this call will wake up and return `Err` to
815 /// indicate that no more messages can ever be received on this channel.
85aaf69f 816 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
817 pub fn recv(&self) -> Result<T, RecvError> {
818 loop {
819 let new_port = match *unsafe { self.inner() } {
820 Flavor::Oneshot(ref p) => {
821 match unsafe { (*p.get()).recv() } {
822 Ok(t) => return Ok(t),
823 Err(oneshot::Empty) => return unreachable!(),
824 Err(oneshot::Disconnected) => return Err(RecvError),
825 Err(oneshot::Upgraded(rx)) => rx,
826 }
827 }
828 Flavor::Stream(ref p) => {
829 match unsafe { (*p.get()).recv() } {
830 Ok(t) => return Ok(t),
831 Err(stream::Empty) => return unreachable!(),
832 Err(stream::Disconnected) => return Err(RecvError),
833 Err(stream::Upgraded(rx)) => rx,
834 }
835 }
836 Flavor::Shared(ref p) => {
837 match unsafe { (*p.get()).recv() } {
838 Ok(t) => return Ok(t),
839 Err(shared::Empty) => return unreachable!(),
840 Err(shared::Disconnected) => return Err(RecvError),
841 }
842 }
843 Flavor::Sync(ref p) => return unsafe {
844 (*p.get()).recv().map_err(|()| RecvError)
845 }
846 };
847 unsafe {
848 mem::swap(self.inner_mut(), new_port.inner_mut());
849 }
850 }
851 }
852
853 /// Returns an iterator that will block waiting for messages, but never
854 /// `panic!`. It will return `None` when the channel has hung up.
85aaf69f 855 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
856 pub fn iter(&self) -> Iter<T> {
857 Iter { rx: self }
858 }
859}
860
c34b1796 861impl<T> select::Packet for Receiver<T> {
1a4d82fc
JJ
862 fn can_recv(&self) -> bool {
863 loop {
864 let new_port = match *unsafe { self.inner() } {
865 Flavor::Oneshot(ref p) => {
866 match unsafe { (*p.get()).can_recv() } {
867 Ok(ret) => return ret,
868 Err(upgrade) => upgrade,
869 }
870 }
871 Flavor::Stream(ref p) => {
872 match unsafe { (*p.get()).can_recv() } {
873 Ok(ret) => return ret,
874 Err(upgrade) => upgrade,
875 }
876 }
877 Flavor::Shared(ref p) => {
878 return unsafe { (*p.get()).can_recv() };
879 }
880 Flavor::Sync(ref p) => {
881 return unsafe { (*p.get()).can_recv() };
882 }
883 };
884 unsafe {
885 mem::swap(self.inner_mut(),
886 new_port.inner_mut());
887 }
888 }
889 }
890
891 fn start_selection(&self, mut token: SignalToken) -> StartResult {
892 loop {
893 let (t, new_port) = match *unsafe { self.inner() } {
894 Flavor::Oneshot(ref p) => {
895 match unsafe { (*p.get()).start_selection(token) } {
896 oneshot::SelSuccess => return Installed,
897 oneshot::SelCanceled => return Abort,
898 oneshot::SelUpgraded(t, rx) => (t, rx),
899 }
900 }
901 Flavor::Stream(ref p) => {
902 match unsafe { (*p.get()).start_selection(token) } {
903 stream::SelSuccess => return Installed,
904 stream::SelCanceled => return Abort,
905 stream::SelUpgraded(t, rx) => (t, rx),
906 }
907 }
908 Flavor::Shared(ref p) => {
909 return unsafe { (*p.get()).start_selection(token) };
910 }
911 Flavor::Sync(ref p) => {
912 return unsafe { (*p.get()).start_selection(token) };
913 }
914 };
915 token = t;
916 unsafe {
917 mem::swap(self.inner_mut(), new_port.inner_mut());
918 }
919 }
920 }
921
922 fn abort_selection(&self) -> bool {
923 let mut was_upgrade = false;
924 loop {
925 let result = match *unsafe { self.inner() } {
926 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
927 Flavor::Stream(ref p) => unsafe {
928 (*p.get()).abort_selection(was_upgrade)
929 },
930 Flavor::Shared(ref p) => return unsafe {
931 (*p.get()).abort_selection(was_upgrade)
932 },
933 Flavor::Sync(ref p) => return unsafe {
934 (*p.get()).abort_selection()
935 },
936 };
937 let new_port = match result { Ok(b) => return b, Err(p) => p };
938 was_upgrade = true;
939 unsafe {
940 mem::swap(self.inner_mut(),
941 new_port.inner_mut());
942 }
943 }
944 }
945}
946
85aaf69f 947#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 948impl<'a, T> Iterator for Iter<'a, T> {
1a4d82fc
JJ
949 type Item = T;
950
951 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
952}
953
954#[unsafe_destructor]
85aaf69f 955#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 956impl<T> Drop for Receiver<T> {
1a4d82fc
JJ
957 fn drop(&mut self) {
958 match *unsafe { self.inner_mut() } {
959 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
960 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
961 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
962 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
963 }
964 }
965}
966
85aaf69f
SL
967#[stable(feature = "rust1", since = "1.0.0")]
968impl<T> fmt::Debug for SendError<T> {
969 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
970 "SendError(..)".fmt(f)
1a4d82fc 971 }
85aaf69f 972}
1a4d82fc 973
85aaf69f
SL
974#[stable(feature = "rust1", since = "1.0.0")]
975impl<T> fmt::Display for SendError<T> {
976 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
977 "sending on a closed channel".fmt(f)
1a4d82fc 978 }
1a4d82fc
JJ
979}
980
c34b1796
AL
981#[stable(feature = "rust1", since = "1.0.0")]
982impl<T: Send> error::Error for SendError<T> {
983
984 fn description(&self) -> &str {
985 "sending on a closed channel"
986 }
987
988 fn cause(&self) -> Option<&error::Error> {
989 None
990 }
991}
992
85aaf69f
SL
993#[stable(feature = "rust1", since = "1.0.0")]
994impl<T> fmt::Debug for TrySendError<T> {
1a4d82fc 995 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85aaf69f
SL
996 match *self {
997 TrySendError::Full(..) => "Full(..)".fmt(f),
998 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
999 }
1a4d82fc
JJ
1000 }
1001}
1002
85aaf69f
SL
1003#[stable(feature = "rust1", since = "1.0.0")]
1004impl<T> fmt::Display for TrySendError<T> {
1a4d82fc
JJ
1005 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1006 match *self {
1007 TrySendError::Full(..) => {
1008 "sending on a full channel".fmt(f)
1009 }
1010 TrySendError::Disconnected(..) => {
1011 "sending on a closed channel".fmt(f)
1012 }
1013 }
1014 }
1015}
1016
c34b1796
AL
1017#[stable(feature = "rust1", since = "1.0.0")]
1018impl<T: Send> error::Error for TrySendError<T> {
1019
1020 fn description(&self) -> &str {
1021 match *self {
1022 TrySendError::Full(..) => {
1023 "sending on a full channel"
1024 }
1025 TrySendError::Disconnected(..) => {
1026 "sending on a closed channel"
1027 }
1028 }
1029 }
1030
1031 fn cause(&self) -> Option<&error::Error> {
1032 None
1033 }
1034}
1035
85aaf69f
SL
1036#[stable(feature = "rust1", since = "1.0.0")]
1037impl fmt::Display for RecvError {
1a4d82fc
JJ
1038 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1039 "receiving on a closed channel".fmt(f)
1040 }
1041}
1042
c34b1796
AL
1043#[stable(feature = "rust1", since = "1.0.0")]
1044impl error::Error for RecvError {
1045
1046 fn description(&self) -> &str {
1047 "receiving on a closed channel"
1048 }
1049
1050 fn cause(&self) -> Option<&error::Error> {
1051 None
1052 }
1053}
1054
85aaf69f
SL
1055#[stable(feature = "rust1", since = "1.0.0")]
1056impl fmt::Display for TryRecvError {
1a4d82fc
JJ
1057 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1058 match *self {
1059 TryRecvError::Empty => {
1060 "receiving on an empty channel".fmt(f)
1061 }
1062 TryRecvError::Disconnected => {
1063 "receiving on a closed channel".fmt(f)
1064 }
1065 }
1066 }
1067}
1068
c34b1796
AL
1069#[stable(feature = "rust1", since = "1.0.0")]
1070impl error::Error for TryRecvError {
1071
1072 fn description(&self) -> &str {
1073 match *self {
1074 TryRecvError::Empty => {
1075 "receiving on an empty channel"
1076 }
1077 TryRecvError::Disconnected => {
1078 "receiving on a closed channel"
1079 }
1080 }
1081 }
1082
1083 fn cause(&self) -> Option<&error::Error> {
1084 None
1085 }
1086}
1087
1a4d82fc
JJ
1088#[cfg(test)]
1089mod test {
1090 use prelude::v1::*;
1091
85aaf69f 1092 use std::env;
1a4d82fc 1093 use super::*;
85aaf69f 1094 use thread;
1a4d82fc 1095
c34b1796 1096 pub fn stress_factor() -> usize {
85aaf69f
SL
1097 match env::var("RUST_TEST_STRESS") {
1098 Ok(val) => val.parse().unwrap(),
1099 Err(..) => 1,
1a4d82fc
JJ
1100 }
1101 }
1102
1103 #[test]
1104 fn smoke() {
c34b1796 1105 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1106 tx.send(1).unwrap();
1107 assert_eq!(rx.recv().unwrap(), 1);
1108 }
1109
1110 #[test]
1111 fn drop_full() {
c34b1796 1112 let (tx, _rx) = channel::<Box<isize>>();
85aaf69f 1113 tx.send(box 1).unwrap();
1a4d82fc
JJ
1114 }
1115
1116 #[test]
1117 fn drop_full_shared() {
c34b1796 1118 let (tx, _rx) = channel::<Box<isize>>();
1a4d82fc
JJ
1119 drop(tx.clone());
1120 drop(tx.clone());
85aaf69f 1121 tx.send(box 1).unwrap();
1a4d82fc
JJ
1122 }
1123
1124 #[test]
1125 fn smoke_shared() {
c34b1796 1126 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1127 tx.send(1).unwrap();
1128 assert_eq!(rx.recv().unwrap(), 1);
1129 let tx = tx.clone();
1130 tx.send(1).unwrap();
1131 assert_eq!(rx.recv().unwrap(), 1);
1132 }
1133
1134 #[test]
1135 fn smoke_threads() {
c34b1796 1136 let (tx, rx) = channel::<i32>();
85aaf69f 1137 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1138 tx.send(1).unwrap();
1139 });
1140 assert_eq!(rx.recv().unwrap(), 1);
1141 }
1142
1143 #[test]
1144 fn smoke_port_gone() {
c34b1796 1145 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1146 drop(rx);
1147 assert!(tx.send(1).is_err());
1148 }
1149
1150 #[test]
1151 fn smoke_shared_port_gone() {
c34b1796 1152 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1153 drop(rx);
1154 assert!(tx.send(1).is_err())
1155 }
1156
1157 #[test]
1158 fn smoke_shared_port_gone2() {
c34b1796 1159 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1160 drop(rx);
1161 let tx2 = tx.clone();
1162 drop(tx);
1163 assert!(tx2.send(1).is_err());
1164 }
1165
1166 #[test]
1167 fn port_gone_concurrent() {
c34b1796 1168 let (tx, rx) = channel::<i32>();
85aaf69f 1169 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1170 rx.recv().unwrap();
1171 });
1172 while tx.send(1).is_ok() {}
1173 }
1174
1175 #[test]
1176 fn port_gone_concurrent_shared() {
c34b1796 1177 let (tx, rx) = channel::<i32>();
1a4d82fc 1178 let tx2 = tx.clone();
85aaf69f 1179 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1180 rx.recv().unwrap();
1181 });
1182 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1183 }
1184
1185 #[test]
1186 fn smoke_chan_gone() {
c34b1796 1187 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1188 drop(tx);
1189 assert!(rx.recv().is_err());
1190 }
1191
1192 #[test]
1193 fn smoke_chan_gone_shared() {
1194 let (tx, rx) = channel::<()>();
1195 let tx2 = tx.clone();
1196 drop(tx);
1197 drop(tx2);
1198 assert!(rx.recv().is_err());
1199 }
1200
1201 #[test]
1202 fn chan_gone_concurrent() {
c34b1796 1203 let (tx, rx) = channel::<i32>();
85aaf69f 1204 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1205 tx.send(1).unwrap();
1206 tx.send(1).unwrap();
1207 });
1208 while rx.recv().is_ok() {}
1209 }
1210
1211 #[test]
1212 fn stress() {
c34b1796 1213 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1214 let t = thread::spawn(move|| {
1215 for _ in 0..10000 { tx.send(1).unwrap(); }
1a4d82fc 1216 });
85aaf69f 1217 for _ in 0..10000 {
1a4d82fc
JJ
1218 assert_eq!(rx.recv().unwrap(), 1);
1219 }
1220 t.join().ok().unwrap();
1221 }
1222
1223 #[test]
1224 fn stress_shared() {
c34b1796
AL
1225 const AMT: u32 = 10000;
1226 const NTHREADS: u32 = 8;
1227 let (tx, rx) = channel::<i32>();
1a4d82fc 1228
85aaf69f
SL
1229 let t = thread::spawn(move|| {
1230 for _ in 0..AMT * NTHREADS {
1a4d82fc
JJ
1231 assert_eq!(rx.recv().unwrap(), 1);
1232 }
1233 match rx.try_recv() {
1234 Ok(..) => panic!(),
1235 _ => {}
1236 }
1237 });
1238
85aaf69f 1239 for _ in 0..NTHREADS {
1a4d82fc 1240 let tx = tx.clone();
85aaf69f
SL
1241 thread::spawn(move|| {
1242 for _ in 0..AMT { tx.send(1).unwrap(); }
1a4d82fc
JJ
1243 });
1244 }
1245 drop(tx);
1246 t.join().ok().unwrap();
1247 }
1248
1249 #[test]
1250 fn send_from_outside_runtime() {
1251 let (tx1, rx1) = channel::<()>();
c34b1796 1252 let (tx2, rx2) = channel::<i32>();
85aaf69f 1253 let t1 = thread::spawn(move|| {
1a4d82fc 1254 tx1.send(()).unwrap();
85aaf69f 1255 for _ in 0..40 {
1a4d82fc
JJ
1256 assert_eq!(rx2.recv().unwrap(), 1);
1257 }
1258 });
1259 rx1.recv().unwrap();
85aaf69f
SL
1260 let t2 = thread::spawn(move|| {
1261 for _ in 0..40 {
1a4d82fc
JJ
1262 tx2.send(1).unwrap();
1263 }
1264 });
1265 t1.join().ok().unwrap();
1266 t2.join().ok().unwrap();
1267 }
1268
1269 #[test]
1270 fn recv_from_outside_runtime() {
c34b1796 1271 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1272 let t = thread::spawn(move|| {
1273 for _ in 0..40 {
1a4d82fc
JJ
1274 assert_eq!(rx.recv().unwrap(), 1);
1275 }
1276 });
85aaf69f 1277 for _ in 0..40 {
1a4d82fc
JJ
1278 tx.send(1).unwrap();
1279 }
1280 t.join().ok().unwrap();
1281 }
1282
1283 #[test]
1284 fn no_runtime() {
c34b1796
AL
1285 let (tx1, rx1) = channel::<i32>();
1286 let (tx2, rx2) = channel::<i32>();
85aaf69f 1287 let t1 = thread::spawn(move|| {
1a4d82fc
JJ
1288 assert_eq!(rx1.recv().unwrap(), 1);
1289 tx2.send(2).unwrap();
1290 });
85aaf69f 1291 let t2 = thread::spawn(move|| {
1a4d82fc
JJ
1292 tx1.send(1).unwrap();
1293 assert_eq!(rx2.recv().unwrap(), 2);
1294 });
1295 t1.join().ok().unwrap();
1296 t2.join().ok().unwrap();
1297 }
1298
1299 #[test]
1300 fn oneshot_single_thread_close_port_first() {
1301 // Simple test of closing without sending
c34b1796 1302 let (_tx, rx) = channel::<i32>();
1a4d82fc
JJ
1303 drop(rx);
1304 }
1305
1306 #[test]
1307 fn oneshot_single_thread_close_chan_first() {
1308 // Simple test of closing without sending
c34b1796 1309 let (tx, _rx) = channel::<i32>();
1a4d82fc
JJ
1310 drop(tx);
1311 }
1312
1313 #[test]
1314 fn oneshot_single_thread_send_port_close() {
1315 // Testing that the sender cleans up the payload if receiver is closed
c34b1796 1316 let (tx, rx) = channel::<Box<i32>>();
1a4d82fc
JJ
1317 drop(rx);
1318 assert!(tx.send(box 0).is_err());
1319 }
1320
1321 #[test]
1322 fn oneshot_single_thread_recv_chan_close() {
1323 // Receiving on a closed chan will panic
85aaf69f 1324 let res = thread::spawn(move|| {
c34b1796 1325 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1326 drop(tx);
1327 rx.recv().unwrap();
1328 }).join();
1329 // What is our res?
1330 assert!(res.is_err());
1331 }
1332
1333 #[test]
1334 fn oneshot_single_thread_send_then_recv() {
c34b1796 1335 let (tx, rx) = channel::<Box<i32>>();
1a4d82fc
JJ
1336 tx.send(box 10).unwrap();
1337 assert!(rx.recv().unwrap() == box 10);
1338 }
1339
1340 #[test]
1341 fn oneshot_single_thread_try_send_open() {
c34b1796 1342 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1343 assert!(tx.send(10).is_ok());
1344 assert!(rx.recv().unwrap() == 10);
1345 }
1346
1347 #[test]
1348 fn oneshot_single_thread_try_send_closed() {
c34b1796 1349 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1350 drop(rx);
1351 assert!(tx.send(10).is_err());
1352 }
1353
1354 #[test]
1355 fn oneshot_single_thread_try_recv_open() {
c34b1796 1356 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1357 tx.send(10).unwrap();
1358 assert!(rx.recv() == Ok(10));
1359 }
1360
1361 #[test]
1362 fn oneshot_single_thread_try_recv_closed() {
c34b1796 1363 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1364 drop(tx);
1365 assert!(rx.recv().is_err());
1366 }
1367
1368 #[test]
1369 fn oneshot_single_thread_peek_data() {
c34b1796 1370 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1371 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1372 tx.send(10).unwrap();
1373 assert_eq!(rx.try_recv(), Ok(10));
1374 }
1375
1376 #[test]
1377 fn oneshot_single_thread_peek_close() {
c34b1796 1378 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1379 drop(tx);
1380 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1381 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1382 }
1383
1384 #[test]
1385 fn oneshot_single_thread_peek_open() {
c34b1796 1386 let (_tx, rx) = channel::<i32>();
1a4d82fc
JJ
1387 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1388 }
1389
1390 #[test]
1391 fn oneshot_multi_task_recv_then_send() {
c34b1796 1392 let (tx, rx) = channel::<Box<i32>>();
85aaf69f 1393 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1394 assert!(rx.recv().unwrap() == box 10);
1395 });
1396
1397 tx.send(box 10).unwrap();
1398 }
1399
1400 #[test]
1401 fn oneshot_multi_task_recv_then_close() {
c34b1796 1402 let (tx, rx) = channel::<Box<i32>>();
85aaf69f 1403 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1404 drop(tx);
1405 });
85aaf69f 1406 let res = thread::spawn(move|| {
1a4d82fc
JJ
1407 assert!(rx.recv().unwrap() == box 10);
1408 }).join();
1409 assert!(res.is_err());
1410 }
1411
1412 #[test]
1413 fn oneshot_multi_thread_close_stress() {
85aaf69f 1414 for _ in 0..stress_factor() {
c34b1796 1415 let (tx, rx) = channel::<i32>();
85aaf69f 1416 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1417 drop(rx);
1418 });
1419 drop(tx);
1420 }
1421 }
1422
1423 #[test]
1424 fn oneshot_multi_thread_send_close_stress() {
85aaf69f 1425 for _ in 0..stress_factor() {
c34b1796 1426 let (tx, rx) = channel::<i32>();
85aaf69f 1427 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1428 drop(rx);
1429 });
85aaf69f 1430 let _ = thread::spawn(move|| {
1a4d82fc
JJ
1431 tx.send(1).unwrap();
1432 }).join();
1433 }
1434 }
1435
1436 #[test]
1437 fn oneshot_multi_thread_recv_close_stress() {
85aaf69f 1438 for _ in 0..stress_factor() {
c34b1796 1439 let (tx, rx) = channel::<i32>();
85aaf69f
SL
1440 thread::spawn(move|| {
1441 let res = thread::spawn(move|| {
1a4d82fc
JJ
1442 rx.recv().unwrap();
1443 }).join();
1444 assert!(res.is_err());
1445 });
85aaf69f
SL
1446 let _t = thread::spawn(move|| {
1447 thread::spawn(move|| {
1a4d82fc
JJ
1448 drop(tx);
1449 });
1450 });
1451 }
1452 }
1453
1454 #[test]
1455 fn oneshot_multi_thread_send_recv_stress() {
85aaf69f 1456 for _ in 0..stress_factor() {
c34b1796 1457 let (tx, rx) = channel::<Box<isize>>();
85aaf69f
SL
1458 let _t = thread::spawn(move|| {
1459 tx.send(box 10).unwrap();
1a4d82fc 1460 });
85aaf69f 1461 assert!(rx.recv().unwrap() == box 10);
1a4d82fc
JJ
1462 }
1463 }
1464
1465 #[test]
1466 fn stream_send_recv_stress() {
85aaf69f 1467 for _ in 0..stress_factor() {
1a4d82fc
JJ
1468 let (tx, rx) = channel();
1469
1470 send(tx, 0);
1471 recv(rx, 0);
1472
c34b1796 1473 fn send(tx: Sender<Box<i32>>, i: i32) {
1a4d82fc
JJ
1474 if i == 10 { return }
1475
85aaf69f 1476 thread::spawn(move|| {
1a4d82fc
JJ
1477 tx.send(box i).unwrap();
1478 send(tx, i + 1);
1479 });
1480 }
1481
c34b1796 1482 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1a4d82fc
JJ
1483 if i == 10 { return }
1484
85aaf69f 1485 thread::spawn(move|| {
1a4d82fc
JJ
1486 assert!(rx.recv().unwrap() == box i);
1487 recv(rx, i + 1);
1488 });
1489 }
1490 }
1491 }
1492
1493 #[test]
1494 fn recv_a_lot() {
1495 // Regression test that we don't run out of stack in scheduler context
1496 let (tx, rx) = channel();
85aaf69f
SL
1497 for _ in 0..10000 { tx.send(()).unwrap(); }
1498 for _ in 0..10000 { rx.recv().unwrap(); }
1a4d82fc
JJ
1499 }
1500
1501 #[test]
1502 fn shared_chan_stress() {
1503 let (tx, rx) = channel();
1504 let total = stress_factor() + 100;
85aaf69f 1505 for _ in 0..total {
1a4d82fc 1506 let tx = tx.clone();
85aaf69f 1507 thread::spawn(move|| {
1a4d82fc
JJ
1508 tx.send(()).unwrap();
1509 });
1510 }
1511
85aaf69f 1512 for _ in 0..total {
1a4d82fc
JJ
1513 rx.recv().unwrap();
1514 }
1515 }
1516
1517 #[test]
1518 fn test_nested_recv_iter() {
c34b1796
AL
1519 let (tx, rx) = channel::<i32>();
1520 let (total_tx, total_rx) = channel::<i32>();
1a4d82fc 1521
85aaf69f 1522 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1523 let mut acc = 0;
1524 for x in rx.iter() {
1525 acc += x;
1526 }
1527 total_tx.send(acc).unwrap();
1528 });
1529
1530 tx.send(3).unwrap();
1531 tx.send(1).unwrap();
1532 tx.send(2).unwrap();
1533 drop(tx);
1534 assert_eq!(total_rx.recv().unwrap(), 6);
1535 }
1536
1537 #[test]
1538 fn test_recv_iter_break() {
c34b1796 1539 let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
1540 let (count_tx, count_rx) = channel();
1541
85aaf69f 1542 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1543 let mut count = 0;
1544 for x in rx.iter() {
1545 if count >= 3 {
1546 break;
1547 } else {
1548 count += x;
1549 }
1550 }
1551 count_tx.send(count).unwrap();
1552 });
1553
1554 tx.send(2).unwrap();
1555 tx.send(2).unwrap();
1556 tx.send(2).unwrap();
1557 let _ = tx.send(2);
1558 drop(tx);
1559 assert_eq!(count_rx.recv().unwrap(), 4);
1560 }
1561
1562 #[test]
1563 fn try_recv_states() {
c34b1796 1564 let (tx1, rx1) = channel::<i32>();
1a4d82fc
JJ
1565 let (tx2, rx2) = channel::<()>();
1566 let (tx3, rx3) = channel::<()>();
85aaf69f 1567 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1568 rx2.recv().unwrap();
1569 tx1.send(1).unwrap();
1570 tx3.send(()).unwrap();
1571 rx2.recv().unwrap();
1572 drop(tx1);
1573 tx3.send(()).unwrap();
1574 });
1575
1576 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1577 tx2.send(()).unwrap();
1578 rx3.recv().unwrap();
1579 assert_eq!(rx1.try_recv(), Ok(1));
1580 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1581 tx2.send(()).unwrap();
1582 rx3.recv().unwrap();
1583 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1584 }
1585
1586 // This bug used to end up in a livelock inside of the Receiver destructor
1587 // because the internal state of the Shared packet was corrupted
1588 #[test]
1589 fn destroy_upgraded_shared_port_when_sender_still_active() {
1590 let (tx, rx) = channel();
1591 let (tx2, rx2) = channel();
85aaf69f 1592 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1593 rx.recv().unwrap(); // wait on a oneshot
1594 drop(rx); // destroy a shared
1595 tx2.send(()).unwrap();
1596 });
1597 // make sure the other task has gone to sleep
85aaf69f 1598 for _ in 0..5000 { thread::yield_now(); }
1a4d82fc
JJ
1599
1600 // upgrade to a shared chan and send a message
1601 let t = tx.clone();
1602 drop(tx);
1603 t.send(()).unwrap();
1604
1605 // wait for the child task to exit before we exit
1606 rx2.recv().unwrap();
1607 }
1608}
1609
1610#[cfg(test)]
1611mod sync_tests {
1612 use prelude::v1::*;
1613
85aaf69f
SL
1614 use std::env;
1615 use thread;
1a4d82fc
JJ
1616 use super::*;
1617
c34b1796 1618 pub fn stress_factor() -> usize {
85aaf69f
SL
1619 match env::var("RUST_TEST_STRESS") {
1620 Ok(val) => val.parse().unwrap(),
1621 Err(..) => 1,
1a4d82fc
JJ
1622 }
1623 }
1624
// One message through a channel with a buffer of one.
#[test]
fn smoke() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);
}
1631
1632 #[test]
1633 fn drop_full() {
c34b1796 1634 let (tx, _rx) = sync_channel::<Box<isize>>(1);
85aaf69f 1635 tx.send(box 1).unwrap();
1a4d82fc
JJ
1636 }
1637
// Sends through the original sender, then through a clone of it.
#[test]
fn smoke_shared() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);
    let sender = sender.clone();
    sender.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);
}
1647
// Hands one message from a spawned thread to this one over a
// zero-capacity channel.
#[test]
fn smoke_threads() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _t = thread::spawn(move|| {
        sender.send(1).unwrap();
    });
    assert_eq!(receiver.recv().unwrap(), 1);
}
1656
// Sending after the receiver was dropped must fail.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    assert!(sender.send(1).is_err());
}
1663
// A sender cloned after the receiver was dropped must also fail to send.
#[test]
fn smoke_shared_port_gone2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let cloned = sender.clone();
    drop(sender);
    assert!(cloned.send(1).is_err());
}
1672
// The receiving thread takes one message and exits; keep sending until a
// send fails.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _t = thread::spawn(move|| {
        receiver.recv().unwrap();
    });
    while sender.send(1).is_ok() {}
}
1681
// Same as `port_gone_concurrent`, alternating between two senders.
#[test]
fn port_gone_concurrent_shared() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let cloned = sender.clone();
    let _t = thread::spawn(move|| {
        receiver.recv().unwrap();
    });
    while sender.send(1).is_ok() && cloned.send(1).is_ok() {}
}
1691
// Receiving after the only sender was dropped must fail.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    assert!(receiver.recv().is_err());
}
1698
// Receiving after both the sender and its clone were dropped must fail.
#[test]
fn smoke_chan_gone_shared() {
    let (sender, receiver) = sync_channel::<()>(0);
    let cloned = sender.clone();
    drop(sender);
    drop(cloned);
    assert!(receiver.recv().is_err());
}
1707
// The sending thread sends twice and exits; keep receiving until a
// receive fails.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    thread::spawn(move|| {
        sender.send(1).unwrap();
        sender.send(1).unwrap();
    });
    while receiver.recv().is_ok() {}
}
1717
// 10000 messages from one producer thread over a zero-capacity channel.
#[test]
fn stress() {
    let (sender, receiver) = sync_channel::<i32>(0);
    thread::spawn(move|| {
        for _ in 0..10000 { sender.send(1).unwrap(); }
    });
    for _ in 0..10000 {
        assert_eq!(receiver.recv().unwrap(), 1);
    }
}
1728
// NTHREADS producers each send AMT messages to one consumer, which
// reports completion on a second channel.
#[test]
fn stress_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    thread::spawn(move|| {
        for _ in 0..AMT * NTHREADS {
            assert_eq!(receiver.recv().unwrap(), 1);
        }
        // Nothing may be left over once every message was received.
        if receiver.try_recv().is_ok() {
            panic!()
        }
        done_tx.send(()).unwrap();
    });

    for _ in 0..NTHREADS {
        let sender = sender.clone();
        thread::spawn(move|| {
            for _ in 0..AMT { sender.send(1).unwrap(); }
        });
    }
    drop(sender);
    done_rx.recv().unwrap();
}
1756
#[test]
fn oneshot_single_thread_close_port_first() {
    // Close the channel without ever sending on it.
    let (_sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
}
1763
#[test]
fn oneshot_single_thread_close_chan_first() {
    // Close the channel without ever sending on it.
    let (sender, _receiver) = sync_channel::<i32>(0);
    drop(sender);
}
1770
#[test]
fn oneshot_single_thread_send_port_close() {
    // The sender must clean up the payload when the receiver is closed.
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    drop(receiver);
    assert!(sender.send(box 0).is_err());
}
1778
#[test]
fn oneshot_single_thread_recv_chan_close() {
    // Unwrapping a receive on a closed channel panics the child thread.
    let outcome = thread::spawn(move|| {
        let (sender, receiver) = sync_channel::<i32>(0);
        drop(sender);
        receiver.recv().unwrap();
    }).join();
    // The join result must carry that panic.
    assert!(outcome.is_err());
}
1790
1791 #[test]
1792 fn oneshot_single_thread_send_then_recv() {
c34b1796 1793 let (tx, rx) = sync_channel::<Box<i32>>(1);
1a4d82fc
JJ
1794 tx.send(box 10).unwrap();
1795 assert!(rx.recv().unwrap() == box 10);
1796 }
1797
// `try_send` succeeds while the one-slot buffer is free.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    assert_eq!(sender.try_send(10), Ok(()));
    assert!(receiver.recv().unwrap() == 10);
}
1804
// `try_send` hands the value back as `Disconnected` once the receiver is
// gone.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    assert_eq!(sender.try_send(10), Err(TrySendError::Disconnected(10)));
}
1811
// On a zero-capacity channel whose receiver is alive but not receiving,
// `try_send` hands the value back as `Full`.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    assert_eq!(sender.try_send(10), Err(TrySendError::Full(10)));
}
1817
// A buffered message is returned by `recv`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    assert!(receiver.recv() == Ok(10));
}
1824
// `recv` fails once the sender is gone.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    assert!(receiver.recv().is_err());
}
1831
// `try_recv` reports `Empty` before a send and the value after it.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    assert_eq!(receiver.try_recv(), Ok(10));
}
1839
// The disconnected state must be reported on every `try_recv` attempt.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
}
1847
// With a live sender and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
}
1853
1854 #[test]
1855 fn oneshot_multi_task_recv_then_send() {
c34b1796 1856 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f 1857 let _t = thread::spawn(move|| {
1a4d82fc
JJ
1858 assert!(rx.recv().unwrap() == box 10);
1859 });
1860
1861 tx.send(box 10).unwrap();
1862 }
1863
// One thread drops the sender; the receiving thread must panic on its
// unwrapped `recv`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    let _t = thread::spawn(move|| {
        drop(sender);
    });
    let outcome = thread::spawn(move|| {
        assert!(receiver.recv().unwrap() == box 10);
    }).join();
    assert!(outcome.is_err());
}
1875
// Drops the receiver on another thread while this thread drops the sender.
#[test]
fn oneshot_multi_thread_close_stress() {
    for _ in 0..stress_factor() {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _t = thread::spawn(move|| {
            drop(receiver);
        });
        drop(sender);
    }
}
1886
// Runs a send on one thread while another thread drops the receiver.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    for _ in 0..stress_factor() {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _t = thread::spawn(move|| {
            drop(receiver);
        });
        // The send may succeed or panic; either result is ignored.
        let _ = thread::spawn(move || {
            sender.send(1).unwrap();
        }).join();
    }
}
1899
// A nested thread blocks in `recv` while another nested thread drops the
// sender; the receive must end in a panic.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    for _ in 0..stress_factor() {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _t = thread::spawn(move|| {
            let outcome = thread::spawn(move|| {
                receiver.recv().unwrap();
            }).join();
            assert!(outcome.is_err());
        });
        let _t = thread::spawn(move|| {
            thread::spawn(move|| {
                drop(sender);
            });
        });
    }
}
1917
1918 #[test]
1919 fn oneshot_multi_thread_send_recv_stress() {
85aaf69f 1920 for _ in 0..stress_factor() {
c34b1796 1921 let (tx, rx) = sync_channel::<Box<i32>>(0);
85aaf69f
SL
1922 let _t = thread::spawn(move|| {
1923 tx.send(box 10).unwrap();
1a4d82fc 1924 });
85aaf69f 1925 assert!(rx.recv().unwrap() == box 10);
1a4d82fc
JJ
1926 }
1927 }
1928
1929 #[test]
1930 fn stream_send_recv_stress() {
85aaf69f 1931 for _ in 0..stress_factor() {
c34b1796 1932 let (tx, rx) = sync_channel::<Box<i32>>(0);
1a4d82fc
JJ
1933
1934 send(tx, 0);
1935 recv(rx, 0);
1936
c34b1796 1937 fn send(tx: SyncSender<Box<i32>>, i: i32) {
1a4d82fc
JJ
1938 if i == 10 { return }
1939
85aaf69f 1940 thread::spawn(move|| {
1a4d82fc
JJ
1941 tx.send(box i).unwrap();
1942 send(tx, i + 1);
1943 });
1944 }
1945
c34b1796 1946 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1a4d82fc
JJ
1947 if i == 10 { return }
1948
85aaf69f 1949 thread::spawn(move|| {
1a4d82fc
JJ
1950 assert!(rx.recv().unwrap() == box i);
1951 recv(rx, i + 1);
1952 });
1953 }
1954 }
1955 }
1956
#[test]
fn recv_a_lot() {
    // Regression test that we don't run out of stack in scheduler context
    let (sender, receiver) = sync_channel(10000);
    for _ in 0..10000 { sender.send(()).unwrap(); }
    for _ in 0..10000 { receiver.recv().unwrap(); }
}
1964
// Each of `total` threads sends one message through its own clone of the
// sender.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = sync_channel(0);
    let total = stress_factor() + 100;
    for _ in 0..total {
        let sender = sender.clone();
        thread::spawn(move|| {
            sender.send(()).unwrap();
        });
    }

    for _ in 0..total {
        receiver.recv().unwrap();
    }
}
1980
// The helper thread sums everything yielded by `iter()` and reports the
// total after the sender is dropped.
#[test]
fn test_nested_recv_iter() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let (total_tx, total_rx) = sync_channel::<i32>(0);

    let _t = thread::spawn(move|| {
        let mut sum = 0;
        for value in receiver.iter() {
            sum += value;
        }
        total_tx.send(sum).unwrap();
    });

    sender.send(3).unwrap();
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    drop(sender);
    assert_eq!(total_rx.recv().unwrap(), 6);
}
2000
#[test]
fn test_recv_iter_break() {
    let (tx, rx) = sync_channel::<i32>(0);
    let (count_tx, count_rx) = sync_channel(0);

    let _t = thread::spawn(move || {
        let mut running = 0;
        for x in rx.iter() {
            // The check happens after an item has been received, so the item
            // that triggers the break is consumed but not added.
            if running >= 3 {
                break;
            }
            running += x;
        }
        count_tx.send(running).unwrap();
    });

    let mut sent = 0;
    while sent < 3 {
        tx.send(2).unwrap();
        sent += 1;
    }
    // The child may already have left its loop; the result is deliberately
    // ignored.
    let _ = tx.try_send(2);
    drop(tx);

    let reported = count_rx.recv().unwrap();
    assert_eq!(reported, 4);
}
2025
#[test]
fn try_recv_states() {
    // `data` is the channel under test; `go` and `ack` sequence the child
    // thread against this one.
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (ack_tx, ack_rx) = sync_channel::<()>(1);

    let _t = thread::spawn(move || {
        // Step 1: wait for the go signal, publish one value, acknowledge.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();

        // Step 2: wait again, drop the data sender, acknowledge.
        go_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    // Nothing has been sent yet.
    let before_send = data_rx.try_recv();
    assert_eq!(before_send, Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();

    // Exactly one value is available after step 1.
    let first = data_rx.try_recv();
    assert_eq!(first, Ok(1));
    let drained = data_rx.try_recv();
    assert_eq!(drained, Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();

    // After step 2 the sender is gone.
    let after_drop = data_rx.try_recv();
    assert_eq!(after_drop, Err(TryRecvError::Disconnected));
}
2049
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (tx, rx) = sync_channel::<()>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    let _t = thread::spawn(move || {
        // Block until the parent sends, then destroy the receiver while a
        // sender still exists, and finally signal completion.
        rx.recv().unwrap();
        drop(rx);
        done_tx.send(()).unwrap();
    });

    // Give the child a chance to go to sleep in `recv` before continuing.
    let mut spins = 0;
    while spins < 5000 {
        thread::yield_now();
        spins += 1;
    }

    // Replace the original sender with a clone and send through the clone.
    let cloned_tx = tx.clone();
    drop(tx);
    cloned_tx.send(()).unwrap();

    // Wait for the child to finish before this test returns.
    done_rx.recv().unwrap();
}
2072
#[test]
fn send1() {
    // A send on a zero-capacity channel succeeds when another thread
    // receives the value.
    let (tx, rx) = sync_channel::<i32>(0);
    let _receiver_thread = thread::spawn(move || {
        rx.recv().unwrap();
    });

    let outcome = tx.send(1);
    assert_eq!(outcome, Ok(()));
}
2079
#[test]
fn send2() {
    // The receiver is dropped on another thread without receiving, so the
    // send on the zero-capacity channel is expected to return an error.
    let (tx, rx) = sync_channel::<i32>(0);
    let _dropper = thread::spawn(move || {
        drop(rx);
    });

    let outcome = tx.send(1);
    assert!(outcome.is_err());
}
2086
#[test]
fn send3() {
    let (tx, rx) = sync_channel::<i32>(1);

    // The first value fits in the one-slot buffer.
    let buffered = tx.send(1);
    assert_eq!(buffered, Ok(()));

    // The receiver is then dropped on another thread without draining the
    // buffer, so the second send is expected to fail.
    let _dropper = thread::spawn(move || {
        drop(rx);
    });
    let rejected = tx.send(1);
    assert!(rejected.is_err());
}
2094
#[test]
fn send4() {
    // Two senders on a zero-capacity channel, each used from its own thread.
    let (first_tx, rx) = sync_channel::<i32>(0);
    let second_tx = first_tx.clone();

    // Separate channel on which each sender thread reports that it finished.
    let (first_done, all_done) = channel();
    let second_done = first_done.clone();

    let _first = thread::spawn(move || {
        assert!(first_tx.send(1).is_err());
        first_done.send(()).unwrap();
    });
    let _second = thread::spawn(move || {
        assert!(second_tx.send(2).is_err());
        second_done.send(()).unwrap();
    });

    // Nothing is ever received; dropping the receiver is what lets both
    // sends return (with an error).
    drop(rx);

    // One completion notice per sender thread.
    let mut finished = 0;
    while finished < 2 {
        all_done.recv().unwrap();
        finished += 1;
    }
}
2113
#[test]
fn try_send1() {
    // Zero-capacity channel with nobody receiving: a non-blocking send is
    // expected to report `Full` and hand the value back.
    let (tx, _rx) = sync_channel::<i32>(0);

    let outcome = tx.try_send(1);
    assert_eq!(outcome, Err(TrySendError::Full(1)));
}
2119
#[test]
fn try_send2() {
    // One-slot buffer: the first non-blocking send is accepted, the second
    // is expected to be rejected as `Full` with its value returned.
    let (tx, _rx) = sync_channel::<i32>(1);

    let accepted = tx.try_send(1);
    assert_eq!(accepted, Ok(()));

    let rejected = tx.try_send(1);
    assert_eq!(rejected, Err(TrySendError::Full(1)));
}
2126
#[test]
fn try_send3() {
    let (tx, rx) = sync_channel::<i32>(1);

    // While the receiver exists, the value goes into the one-slot buffer.
    let accepted = tx.try_send(1);
    assert_eq!(accepted, Ok(()));

    // Once the receiver is dropped, the next attempt is expected to report
    // `Disconnected` and return the value.
    drop(rx);
    let rejected = tx.try_send(1);
    assert_eq!(rejected, Err(TrySendError::Disconnected(1)));
}
2134
#[test]
fn issue_15761() {
    // One ping/pong exchange: this thread pings with `try_send`, the child
    // receives the ping and pongs back with `try_send`, and this thread
    // blocks until the pong arrives.
    fn ping_pong() {
        let (ping_tx, ping_rx) = sync_channel::<()>(3);
        let (pong_tx, pong_rx) = sync_channel::<()>(3);

        let _t = thread::spawn(move || {
            ping_rx.recv().unwrap();
            pong_tx.try_send(()).unwrap();
        });

        ping_tx.try_send(()).unwrap();
        pong_rx.recv().unwrap();
    }

    // Run the exchange 100 times.
    let mut round = 0;
    while round < 100 {
        ping_pong();
        round += 1;
    }
}
2154}