1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
29 //! infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
37 //! a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
81 //! });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent thread to start receiving
111 //! tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115
116 #![stable(feature = "rust1", since = "1.0.0")]
117
118 // A description of how Rust's channel implementation works
119 //
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
123 //
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great detail about channels of these flavors.
129 //
130 // ## Flavors of channels
131 //
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
135 // channels in play.
136 //
137 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
138 // They contain as few atomics as possible and involve one and
139 // exactly one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 // use a different concurrent queue that is more tailored for this
142 // use case. The initial allocation of this flavor of channel is not
143 // optimized.
144 // * Shared - this is the most general form of channel that this module offers,
145 // a channel with multiple senders. This type is as optimized as it
146 // can be, but the previous two types mentioned are much faster for
147 // their use-cases.
148 //
149 // ## Concurrent queues
150 //
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
152 // recv() obviously blocks. This means that under the hood there must be some
153 // shared and concurrent queue holding all of the actual data.
154 //
155 // Two flavors of queues back these channels. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
159 // shared channels.
160 //
161 // ### SPSC optimizations
162 //
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
167 // pop().
168 //
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
174 //
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to have very fast
177 // send() operations while remaining small enough that millions of channels can
178 // coexist at once.
179 //
180 // ### MPSC optimizations
181 //
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to earn its unboundedness, but I have not put
184 // forth much effort into having a cache of nodes similar to the SPSC queue.
185 //
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
189 //
190 // ## Overview of the Implementation
191 //
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
195 //
196 //
197 //      send(t)                             recv()
198 //        queue.push(t)                       return if queue.pop()
199 //        if increment() == -1                deschedule {
200 //          wakeup()                            if decrement() > 0
201 //                                                cancel_deschedule()
202 //                                            }
203 //                                            queue.pop()
204 //
205 // As mentioned before, there are no locks in this implementation, only atomic
206 // instructions are used.
207 //
208 // ### The internal atomic counter
209 //
210 // Every channel has a counter shared between both halves to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
213 //
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
218 //
219 // The recv() method begins with a call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
225 //
226 // Instead, the "steal count" is kept track of separately (not atomically
227 // because it's only used by receivers), and then the decrement() call when
228 // descheduling will lump in all of the recent steals into one large decrement.
229 //
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
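//
// A rough sketch of how the two halves use this counter (the names here are
// illustrative only, not the exact fields of the packet types below):
//
//     // sender
//     queue.push(t);
//     if cnt.fetch_add(1, SeqCst) == -1 {
//         // The receiver is guaranteed to be blocked, so wake it up.
//         wakeup_receiver();
//     }
//
//     // receiver, when pop() fails and it is about to block
//     let steals = mem::replace(&mut self.steals, 0);
//     if cnt.fetch_sub(steals + 1, SeqCst) - steals > 0 {
//         // Data arrived while we were preparing to sleep, so cancel the
//         // deschedule and pop() again instead.
//         cancel_deschedule();
//     }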
232 //
233 // ## Native Implementation
234 //
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which are obviously not available without the runtime).
238 //
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free; the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
244 //
245 // ## Select
246 //
247 // Being able to support selection over channels has greatly influenced this
248 // design, and not only does selection need to work inside the runtime, but also
249 // outside the runtime.
250 //
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as it's woken up. The cancellation procedure
255 // involves an increment and swapping out the to_wake slot to acquire ownership of the
256 // thread to unblock.
257 //
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
262 //
263 // # Conclusion
264 //
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
267
268 use prelude::v1::*;
269
270 use sync::Arc;
271 use error;
272 use fmt;
273 use mem;
274 use cell::UnsafeCell;
275 use marker::Reflect;
276
277 pub use self::select::{Select, Handle};
278 use self::select::StartResult;
279 use self::select::StartResult::*;
280 use self::blocking::SignalToken;
281
282 mod blocking;
283 mod oneshot;
284 mod select;
285 mod shared;
286 mod stream;
287 mod sync;
288 mod mpsc_queue;
289 mod spsc_queue;
290
291 /// The receiving-half of Rust's channel type. This half can only be owned by
292 /// one thread.
293 #[stable(feature = "rust1", since = "1.0.0")]
294 pub struct Receiver<T> {
295 inner: UnsafeCell<Flavor<T>>,
296 }
297
298 // The receiver port can be sent from place to place, so long as it
299 // is not used to receive non-sendable things.
300 unsafe impl<T: Send> Send for Receiver<T> { }
301
302 /// An iterator over messages on a receiver. This iterator will block
303 /// whenever `next` is called, waiting for a new message, and `None` will be
304 /// returned when the corresponding channel has hung up.
305 #[stable(feature = "rust1", since = "1.0.0")]
306 pub struct Iter<'a, T: 'a> {
307 rx: &'a Receiver<T>
308 }
309
310 /// An owning iterator over messages on a receiver. This iterator will block
311 /// whenever `next` is called, waiting for a new message, and `None` will be
312 /// returned when the corresponding channel has hung up.
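///
/// # Examples
///
/// A small example consuming a `Receiver` by value:
///
/// ```
/// use std::sync::mpsc::channel;
///
/// let (tx, rx) = channel::<i32>();
/// tx.send(1).unwrap();
/// tx.send(2).unwrap();
/// drop(tx);
///
/// // Taking ownership of the receiver yields messages until the channel
/// // hangs up.
/// let received: Vec<i32> = rx.into_iter().collect();
/// assert_eq!(received, vec![1, 2]);
/// ```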
313 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
314 pub struct IntoIter<T> {
315 rx: Receiver<T>
316 }
317
318 /// The sending-half of Rust's asynchronous channel type. This half can only be
319 /// owned by one thread, but it can be cloned to send to other threads.
320 #[stable(feature = "rust1", since = "1.0.0")]
321 pub struct Sender<T> {
322 inner: UnsafeCell<Flavor<T>>,
323 }
324
325 // The send port can be sent from place to place, so long as it
326 // is not used to send non-sendable things.
327 unsafe impl<T: Send> Send for Sender<T> { }
328
329 /// The sending-half of Rust's synchronous channel type. This half can only be
330 /// owned by one thread, but it can be cloned to send to other threads.
331 #[stable(feature = "rust1", since = "1.0.0")]
332 pub struct SyncSender<T> {
333 inner: Arc<UnsafeCell<sync::Packet<T>>>,
334 }
335
336 unsafe impl<T: Send> Send for SyncSender<T> {}
337
338 impl<T> !Sync for SyncSender<T> {}
339
340 /// An error returned from the `send` function on channels.
341 ///
342 /// A `send` operation can only fail if the receiving end of a channel is
343 /// disconnected, implying that the data could never be received. The error
344 /// contains the data being sent as a payload so it can be recovered.
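///
/// # Examples
///
/// A small example of recovering the payload from a failed send:
///
/// ```
/// use std::sync::mpsc::channel;
///
/// let (tx, rx) = channel();
/// drop(rx);
///
/// // The receiver is gone, so the send fails and hands the value back.
/// let err = tx.send("hello").unwrap_err();
/// assert_eq!(err.0, "hello");
/// ```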
345 #[stable(feature = "rust1", since = "1.0.0")]
346 #[derive(PartialEq, Eq, Clone, Copy)]
347 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
348
349 /// An error returned from the `recv` function on a `Receiver`.
350 ///
351 /// The `recv` operation can only fail if the sending half of a channel is
352 /// disconnected, implying that no further messages will ever be received.
353 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
354 #[stable(feature = "rust1", since = "1.0.0")]
355 pub struct RecvError;
356
357 /// This enumeration is the list of the possible reasons that `try_recv` could
358 /// not return data when called.
359 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
360 #[stable(feature = "rust1", since = "1.0.0")]
361 pub enum TryRecvError {
362 /// This channel is currently empty, but the sender(s) have not yet
363 /// disconnected, so data may yet become available.
364 #[stable(feature = "rust1", since = "1.0.0")]
365 Empty,
366
367 /// This channel's sending half has become disconnected, and there will
368 /// never be any more data received on this channel.
369 #[stable(feature = "rust1", since = "1.0.0")]
370 Disconnected,
371 }
372
373 /// This enumeration is the list of the possible error outcomes for the
374 /// `SyncSender::try_send` method.
375 #[stable(feature = "rust1", since = "1.0.0")]
376 #[derive(PartialEq, Eq, Clone, Copy)]
377 pub enum TrySendError<T> {
378 /// The data could not be sent on the channel because it would require that
379 /// the callee block to send the data.
380 ///
381 /// If this is a buffered channel, then the buffer is full at this time. If
382 /// this is not a buffered channel, then there is no receiver available to
383 /// acquire the data.
384 #[stable(feature = "rust1", since = "1.0.0")]
385 Full(T),
386
387 /// This channel's receiving half has disconnected, so the data could not be
388 /// sent. The data is returned to the caller in this case.
389 #[stable(feature = "rust1", since = "1.0.0")]
390 Disconnected(T),
391 }
392
393 enum Flavor<T> {
394 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
395 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
396 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
397 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
398 }
399
400 #[doc(hidden)]
401 trait UnsafeFlavor<T> {
402 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
403 unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
404 &mut *self.inner_unsafe().get()
405 }
406 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
407 &*self.inner_unsafe().get()
408 }
409 }
410 impl<T> UnsafeFlavor<T> for Sender<T> {
411 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
412 &self.inner
413 }
414 }
415 impl<T> UnsafeFlavor<T> for Receiver<T> {
416 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
417 &self.inner
418 }
419 }
420
421 /// Creates a new asynchronous channel, returning the sender/receiver halves.
422 ///
423 /// All data sent on the sender will become available on the receiver, and no
424 /// send will block the calling thread (this channel has an "infinite buffer").
425 ///
426 /// # Examples
427 ///
428 /// ```
429 /// use std::sync::mpsc::channel;
430 /// use std::thread;
431 ///
432 /// // tx is the sending half (tx for transmission), and rx is the receiving
433 /// // half (rx for receiving).
434 /// let (tx, rx) = channel();
435 ///
436 /// // Spawn off an expensive computation
437 /// thread::spawn(move|| {
438 /// # fn expensive_computation() {}
439 /// tx.send(expensive_computation()).unwrap();
440 /// });
441 ///
442 /// // Do some useful work for awhile
443 ///
444 /// // Let's see what that answer was
445 /// println!("{:?}", rx.recv().unwrap());
446 /// ```
447 #[stable(feature = "rust1", since = "1.0.0")]
448 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
449 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
450 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
451 }
452
453 /// Creates a new synchronous, bounded channel.
454 ///
455 /// Like asynchronous channels, the `Receiver` will block until a message
456 /// becomes available. These channels differ greatly from asynchronous channels
457 /// in the semantics of the sender, however.
458 ///
459 /// This channel has an internal buffer on which messages will be queued. When
460 /// the internal buffer becomes full, future sends will *block* waiting for the
461 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
462 /// becomes "rendezvous channel" where each send will not return until a recv
463 /// is paired with it.
464 ///
465 /// As with asynchronous channels, `send` will return an error if the
466 /// `Receiver` has been destroyed.
467 ///
468 /// # Examples
469 ///
470 /// ```
471 /// use std::sync::mpsc::sync_channel;
472 /// use std::thread;
473 ///
474 /// let (tx, rx) = sync_channel(1);
475 ///
476 /// // this returns immediately
477 /// tx.send(1).unwrap();
478 ///
479 /// thread::spawn(move|| {
480 /// // this will block until the previous message has been received
481 /// tx.send(2).unwrap();
482 /// });
483 ///
484 /// assert_eq!(rx.recv().unwrap(), 1);
485 /// assert_eq!(rx.recv().unwrap(), 2);
486 /// ```
487 #[stable(feature = "rust1", since = "1.0.0")]
488 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
489 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
490 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
491 }
492
493 ////////////////////////////////////////////////////////////////////////////////
494 // Sender
495 ////////////////////////////////////////////////////////////////////////////////
496
497 impl<T> Sender<T> {
498 fn new(inner: Flavor<T>) -> Sender<T> {
499 Sender {
500 inner: UnsafeCell::new(inner),
501 }
502 }
503
504 /// Attempts to send a value on this channel, returning it back if it could
505 /// not be sent.
506 ///
507 /// A successful send occurs when it is determined that the other end of
508 /// the channel has not hung up already. An unsuccessful send would be one
509 /// where the corresponding receiver has already been deallocated. Note
510 /// that a return value of `Err` means that the data will never be
511 /// received, but a return value of `Ok` does *not* mean that the data
512 /// will be received. It is possible for the corresponding receiver to
513 /// hang up immediately after this function returns `Ok`.
514 ///
515 /// This method will never block the current thread.
516 ///
517 /// # Examples
518 ///
519 /// ```
520 /// use std::sync::mpsc::channel;
521 ///
522 /// let (tx, rx) = channel();
523 ///
524 /// // This send is always successful
525 /// tx.send(1).unwrap();
526 ///
527 /// // This send will fail because the receiver is gone
528 /// drop(rx);
529 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
530 /// ```
531 #[stable(feature = "rust1", since = "1.0.0")]
532 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
533 let (new_inner, ret) = match *unsafe { self.inner() } {
534 Flavor::Oneshot(ref p) => {
535 unsafe {
536 let p = p.get();
537 if !(*p).sent() {
538 return (*p).send(t).map_err(SendError);
539 } else {
540 let a =
541 Arc::new(UnsafeCell::new(stream::Packet::new()));
542 let rx = Receiver::new(Flavor::Stream(a.clone()));
543 match (*p).upgrade(rx) {
544 oneshot::UpSuccess => {
545 let ret = (*a.get()).send(t);
546 (a, ret)
547 }
548 oneshot::UpDisconnected => (a, Err(t)),
549 oneshot::UpWoke(token) => {
550 // This send cannot panic because the thread is
551 // asleep (we're looking at it), so the receiver
552 // can't go away.
553 (*a.get()).send(t).ok().unwrap();
554 token.signal();
555 (a, Ok(()))
556 }
557 }
558 }
559 }
560 }
561 Flavor::Stream(ref p) => return unsafe {
562 (*p.get()).send(t).map_err(SendError)
563 },
564 Flavor::Shared(ref p) => return unsafe {
565 (*p.get()).send(t).map_err(SendError)
566 },
567 Flavor::Sync(..) => unreachable!(),
568 };
569
570 unsafe {
571 let tmp = Sender::new(Flavor::Stream(new_inner));
572 mem::swap(self.inner_mut(), tmp.inner_mut());
573 }
574 ret.map_err(SendError)
575 }
576 }
577
578 #[stable(feature = "rust1", since = "1.0.0")]
579 impl<T> Clone for Sender<T> {
580 fn clone(&self) -> Sender<T> {
581 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
582 Flavor::Oneshot(ref p) => {
583 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
584 unsafe {
585 let guard = (*a.get()).postinit_lock();
586 let rx = Receiver::new(Flavor::Shared(a.clone()));
587 match (*p.get()).upgrade(rx) {
588 oneshot::UpSuccess |
589 oneshot::UpDisconnected => (a, None, guard),
590 oneshot::UpWoke(task) => (a, Some(task), guard)
591 }
592 }
593 }
594 Flavor::Stream(ref p) => {
595 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
596 unsafe {
597 let guard = (*a.get()).postinit_lock();
598 let rx = Receiver::new(Flavor::Shared(a.clone()));
599 match (*p.get()).upgrade(rx) {
600 stream::UpSuccess |
601 stream::UpDisconnected => (a, None, guard),
602 stream::UpWoke(task) => (a, Some(task), guard),
603 }
604 }
605 }
606 Flavor::Shared(ref p) => {
607 unsafe { (*p.get()).clone_chan(); }
608 return Sender::new(Flavor::Shared(p.clone()));
609 }
610 Flavor::Sync(..) => unreachable!(),
611 };
612
613 unsafe {
614 (*packet.get()).inherit_blocker(sleeper, guard);
615
616 let tmp = Sender::new(Flavor::Shared(packet.clone()));
617 mem::swap(self.inner_mut(), tmp.inner_mut());
618 }
619 Sender::new(Flavor::Shared(packet))
620 }
621 }
622
623 #[stable(feature = "rust1", since = "1.0.0")]
624 impl<T> Drop for Sender<T> {
625 fn drop(&mut self) {
626 match *unsafe { self.inner_mut() } {
627 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
628 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
629 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
630 Flavor::Sync(..) => unreachable!(),
631 }
632 }
633 }
634
635 ////////////////////////////////////////////////////////////////////////////////
636 // SyncSender
637 ////////////////////////////////////////////////////////////////////////////////
638
639 impl<T> SyncSender<T> {
640 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
641 SyncSender { inner: inner }
642 }
643
644 /// Sends a value on this synchronous channel.
645 ///
646 /// This function will *block* until space in the internal buffer becomes
647 /// available or a receiver is available to hand off the message to.
648 ///
649 /// Note that a successful send does *not* guarantee that the receiver will
650 /// ever see the data if there is a buffer on this channel. Items may be
651 /// enqueued in the internal buffer for the receiver to receive at a later
652 /// time. If the buffer size is 0, however, it can be guaranteed that the
653 /// receiver has indeed received the data if this function returns success.
654 ///
655 /// This function will never panic, but it may return `Err` if the
656 /// `Receiver` has disconnected and is no longer able to receive
657 /// information.
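///
/// # Examples
///
/// A small example using a rendezvous (zero-sized) buffer, where the send
/// blocks until the receive is ready:
///
/// ```
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
/// let (tx, rx) = sync_channel::<i32>(0);
///
/// thread::spawn(move|| {
///     // This blocks until the parent thread calls recv() below.
///     tx.send(1).unwrap();
/// });
///
/// assert_eq!(rx.recv().unwrap(), 1);
/// ```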
658 #[stable(feature = "rust1", since = "1.0.0")]
659 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
660 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
661 }
662
663 /// Attempts to send a value on this channel without blocking.
664 ///
665 /// This method differs from `send` by returning immediately if the
666 /// channel's buffer is full or no receiver is waiting to acquire some
667 /// data. Compared with `send`, this function has two failure cases
668 /// instead of one (one for disconnection, one for a full buffer).
669 ///
670 /// See `SyncSender::send` for notes about guarantees of whether the
671 /// receiver has received the data or not if this function is successful.
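///
/// # Examples
///
/// A small example of the extra failure case for a full buffer:
///
/// ```
/// use std::sync::mpsc::{sync_channel, TrySendError};
///
/// let (tx, rx) = sync_channel(1);
///
/// // There is room in the buffer, so this succeeds immediately.
/// assert!(tx.try_send(1).is_ok());
///
/// // The buffer is now full, so this fails without blocking.
/// assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
///
/// assert_eq!(rx.recv().unwrap(), 1);
/// ```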
672 #[stable(feature = "rust1", since = "1.0.0")]
673 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
674 unsafe { (*self.inner.get()).try_send(t) }
675 }
676 }
677
678 #[stable(feature = "rust1", since = "1.0.0")]
679 impl<T> Clone for SyncSender<T> {
680 fn clone(&self) -> SyncSender<T> {
681 unsafe { (*self.inner.get()).clone_chan(); }
682 return SyncSender::new(self.inner.clone());
683 }
684 }
685
686 #[stable(feature = "rust1", since = "1.0.0")]
687 impl<T> Drop for SyncSender<T> {
688 fn drop(&mut self) {
689 unsafe { (*self.inner.get()).drop_chan(); }
690 }
691 }
692
693 ////////////////////////////////////////////////////////////////////////////////
694 // Receiver
695 ////////////////////////////////////////////////////////////////////////////////
696
697 impl<T> Receiver<T> {
698 fn new(inner: Flavor<T>) -> Receiver<T> {
699 Receiver { inner: UnsafeCell::new(inner) }
700 }
701
702 /// Attempts to return a pending value on this receiver without blocking.
703 ///
704 /// This method will never block the caller in order to wait for data to
705 /// become available. Instead, it will always return immediately, reporting
706 /// any data that happens to be pending on the channel.
707 ///
708 /// This is useful for a flavor of "optimistic check" before deciding to
709 /// block on a receiver.
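///
/// # Examples
///
/// A small example covering all three outcomes:
///
/// ```
/// use std::sync::mpsc::{channel, TryRecvError};
///
/// let (tx, rx) = channel();
///
/// // Nothing has been sent yet, so the channel is empty.
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// tx.send(1).unwrap();
/// assert_eq!(rx.try_recv(), Ok(1));
///
/// // Dropping the sender disconnects the channel.
/// drop(tx);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
/// ```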
710 #[stable(feature = "rust1", since = "1.0.0")]
711 pub fn try_recv(&self) -> Result<T, TryRecvError> {
712 loop {
713 let new_port = match *unsafe { self.inner() } {
714 Flavor::Oneshot(ref p) => {
715 match unsafe { (*p.get()).try_recv() } {
716 Ok(t) => return Ok(t),
717 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
718 Err(oneshot::Disconnected) => {
719 return Err(TryRecvError::Disconnected)
720 }
721 Err(oneshot::Upgraded(rx)) => rx,
722 }
723 }
724 Flavor::Stream(ref p) => {
725 match unsafe { (*p.get()).try_recv() } {
726 Ok(t) => return Ok(t),
727 Err(stream::Empty) => return Err(TryRecvError::Empty),
728 Err(stream::Disconnected) => {
729 return Err(TryRecvError::Disconnected)
730 }
731 Err(stream::Upgraded(rx)) => rx,
732 }
733 }
734 Flavor::Shared(ref p) => {
735 match unsafe { (*p.get()).try_recv() } {
736 Ok(t) => return Ok(t),
737 Err(shared::Empty) => return Err(TryRecvError::Empty),
738 Err(shared::Disconnected) => {
739 return Err(TryRecvError::Disconnected)
740 }
741 }
742 }
743 Flavor::Sync(ref p) => {
744 match unsafe { (*p.get()).try_recv() } {
745 Ok(t) => return Ok(t),
746 Err(sync::Empty) => return Err(TryRecvError::Empty),
747 Err(sync::Disconnected) => {
748 return Err(TryRecvError::Disconnected)
749 }
750 }
751 }
752 };
753 unsafe {
754 mem::swap(self.inner_mut(),
755 new_port.inner_mut());
756 }
757 }
758 }
759
760 /// Attempts to wait for a value on this receiver, returning an error if the
761 /// corresponding channel has hung up.
762 ///
763 /// This function will always block the current thread if there is no data
764 /// available and it's possible for more data to be sent. Once a message is
765 /// sent to the corresponding `Sender`, then this receiver will wake up and
766 /// return that message.
767 ///
768 /// If the corresponding `Sender` has disconnected, or it disconnects while
769 /// this call is blocking, this call will wake up and return `Err` to
770 /// indicate that no more messages can ever be received on this channel.
771 /// However, since channels are buffered, messages sent before the disconnect
772 /// will still be properly received.
773 ///
774 /// # Examples
775 ///
776 /// ```
777 /// use std::sync::mpsc;
778 /// use std::thread;
779 ///
780 /// let (send, recv) = mpsc::channel();
781 /// let handle = thread::spawn(move || {
782 /// send.send(1u8).unwrap();
783 /// });
784 ///
785 /// handle.join().unwrap();
786 ///
787 /// assert_eq!(Ok(1), recv.recv());
788 /// ```
789 ///
790 /// Buffering behavior:
791 ///
792 /// ```
793 /// use std::sync::mpsc;
794 /// use std::thread;
795 /// use std::sync::mpsc::RecvError;
796 ///
797 /// let (send, recv) = mpsc::channel();
798 /// let handle = thread::spawn(move || {
799 /// send.send(1u8).unwrap();
800 /// send.send(2).unwrap();
801 /// send.send(3).unwrap();
802 /// drop(send);
803 /// });
804 ///
805 /// // wait for the thread to join so we ensure the sender is dropped
806 /// handle.join().unwrap();
807 ///
808 /// assert_eq!(Ok(1), recv.recv());
809 /// assert_eq!(Ok(2), recv.recv());
810 /// assert_eq!(Ok(3), recv.recv());
811 /// assert_eq!(Err(RecvError), recv.recv());
812 /// ```
813 #[stable(feature = "rust1", since = "1.0.0")]
814 pub fn recv(&self) -> Result<T, RecvError> {
815 loop {
816 let new_port = match *unsafe { self.inner() } {
817 Flavor::Oneshot(ref p) => {
818 match unsafe { (*p.get()).recv() } {
819 Ok(t) => return Ok(t),
820 Err(oneshot::Empty) => return unreachable!(),
821 Err(oneshot::Disconnected) => return Err(RecvError),
822 Err(oneshot::Upgraded(rx)) => rx,
823 }
824 }
825 Flavor::Stream(ref p) => {
826 match unsafe { (*p.get()).recv() } {
827 Ok(t) => return Ok(t),
828 Err(stream::Empty) => return unreachable!(),
829 Err(stream::Disconnected) => return Err(RecvError),
830 Err(stream::Upgraded(rx)) => rx,
831 }
832 }
833 Flavor::Shared(ref p) => {
834 match unsafe { (*p.get()).recv() } {
835 Ok(t) => return Ok(t),
836 Err(shared::Empty) => return unreachable!(),
837 Err(shared::Disconnected) => return Err(RecvError),
838 }
839 }
840 Flavor::Sync(ref p) => return unsafe {
841 (*p.get()).recv().map_err(|()| RecvError)
842 }
843 };
844 unsafe {
845 mem::swap(self.inner_mut(), new_port.inner_mut());
846 }
847 }
848 }
849
850 /// Returns an iterator that will block waiting for messages, but never
851 /// `panic!`. It will return `None` when the channel has hung up.
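///
/// # Examples
///
/// A small example summing messages until the sending half hangs up:
///
/// ```
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (tx, rx) = channel::<i32>();
///
/// thread::spawn(move|| {
///     tx.send(1).unwrap();
///     tx.send(2).unwrap();
///     tx.send(3).unwrap();
///     // `tx` is dropped here, which ends the iteration below.
/// });
///
/// let mut total = 0;
/// for x in rx.iter() {
///     total += x;
/// }
/// assert_eq!(total, 6);
/// ```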
852 #[stable(feature = "rust1", since = "1.0.0")]
853 pub fn iter(&self) -> Iter<T> {
854 Iter { rx: self }
855 }
856 }
857
858 impl<T> select::Packet for Receiver<T> {
859 fn can_recv(&self) -> bool {
860 loop {
861 let new_port = match *unsafe { self.inner() } {
862 Flavor::Oneshot(ref p) => {
863 match unsafe { (*p.get()).can_recv() } {
864 Ok(ret) => return ret,
865 Err(upgrade) => upgrade,
866 }
867 }
868 Flavor::Stream(ref p) => {
869 match unsafe { (*p.get()).can_recv() } {
870 Ok(ret) => return ret,
871 Err(upgrade) => upgrade,
872 }
873 }
874 Flavor::Shared(ref p) => {
875 return unsafe { (*p.get()).can_recv() };
876 }
877 Flavor::Sync(ref p) => {
878 return unsafe { (*p.get()).can_recv() };
879 }
880 };
881 unsafe {
882 mem::swap(self.inner_mut(),
883 new_port.inner_mut());
884 }
885 }
886 }
887
888 fn start_selection(&self, mut token: SignalToken) -> StartResult {
889 loop {
890 let (t, new_port) = match *unsafe { self.inner() } {
891 Flavor::Oneshot(ref p) => {
892 match unsafe { (*p.get()).start_selection(token) } {
893 oneshot::SelSuccess => return Installed,
894 oneshot::SelCanceled => return Abort,
895 oneshot::SelUpgraded(t, rx) => (t, rx),
896 }
897 }
898 Flavor::Stream(ref p) => {
899 match unsafe { (*p.get()).start_selection(token) } {
900 stream::SelSuccess => return Installed,
901 stream::SelCanceled => return Abort,
902 stream::SelUpgraded(t, rx) => (t, rx),
903 }
904 }
905 Flavor::Shared(ref p) => {
906 return unsafe { (*p.get()).start_selection(token) };
907 }
908 Flavor::Sync(ref p) => {
909 return unsafe { (*p.get()).start_selection(token) };
910 }
911 };
912 token = t;
913 unsafe {
914 mem::swap(self.inner_mut(), new_port.inner_mut());
915 }
916 }
917 }
918
919 fn abort_selection(&self) -> bool {
920 let mut was_upgrade = false;
921 loop {
922 let result = match *unsafe { self.inner() } {
923 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
924 Flavor::Stream(ref p) => unsafe {
925 (*p.get()).abort_selection(was_upgrade)
926 },
927 Flavor::Shared(ref p) => return unsafe {
928 (*p.get()).abort_selection(was_upgrade)
929 },
930 Flavor::Sync(ref p) => return unsafe {
931 (*p.get()).abort_selection()
932 },
933 };
934 let new_port = match result { Ok(b) => return b, Err(p) => p };
935 was_upgrade = true;
936 unsafe {
937 mem::swap(self.inner_mut(),
938 new_port.inner_mut());
939 }
940 }
941 }
942 }
943
944 #[stable(feature = "rust1", since = "1.0.0")]
945 impl<'a, T> Iterator for Iter<'a, T> {
946 type Item = T;
947
948 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
949 }
950
951 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
952 impl<'a, T> IntoIterator for &'a Receiver<T> {
953 type Item = T;
954 type IntoIter = Iter<'a, T>;
955
956 fn into_iter(self) -> Iter<'a, T> { self.iter() }
957 }
958
959 impl<T> Iterator for IntoIter<T> {
960 type Item = T;
961 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
962 }
963
964 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
965 impl <T> IntoIterator for Receiver<T> {
966 type Item = T;
967 type IntoIter = IntoIter<T>;
968
969 fn into_iter(self) -> IntoIter<T> {
970 IntoIter { rx: self }
971 }
972 }
973
974 #[stable(feature = "rust1", since = "1.0.0")]
975 impl<T> Drop for Receiver<T> {
976 fn drop(&mut self) {
977 match *unsafe { self.inner_mut() } {
978 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
979 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
980 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
981 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
982 }
983 }
984 }
985
986 #[stable(feature = "rust1", since = "1.0.0")]
987 impl<T> fmt::Debug for SendError<T> {
988 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
989 "SendError(..)".fmt(f)
990 }
991 }
992
993 #[stable(feature = "rust1", since = "1.0.0")]
994 impl<T> fmt::Display for SendError<T> {
995 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
996 "sending on a closed channel".fmt(f)
997 }
998 }
999
1000 #[stable(feature = "rust1", since = "1.0.0")]
1001 impl<T: Send + Reflect> error::Error for SendError<T> {
1002 fn description(&self) -> &str {
1003 "sending on a closed channel"
1004 }
1005
1006 fn cause(&self) -> Option<&error::Error> {
1007 None
1008 }
1009 }
1010
1011 #[stable(feature = "rust1", since = "1.0.0")]
1012 impl<T> fmt::Debug for TrySendError<T> {
1013 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1014 match *self {
1015 TrySendError::Full(..) => "Full(..)".fmt(f),
1016 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1017 }
1018 }
1019 }
1020
1021 #[stable(feature = "rust1", since = "1.0.0")]
1022 impl<T> fmt::Display for TrySendError<T> {
1023 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1024 match *self {
1025 TrySendError::Full(..) => {
1026 "sending on a full channel".fmt(f)
1027 }
1028 TrySendError::Disconnected(..) => {
1029 "sending on a closed channel".fmt(f)
1030 }
1031 }
1032 }
1033 }
1034
1035 #[stable(feature = "rust1", since = "1.0.0")]
1036 impl<T: Send + Reflect> error::Error for TrySendError<T> {
1037
1038 fn description(&self) -> &str {
1039 match *self {
1040 TrySendError::Full(..) => {
1041 "sending on a full channel"
1042 }
1043 TrySendError::Disconnected(..) => {
1044 "sending on a closed channel"
1045 }
1046 }
1047 }
1048
1049 fn cause(&self) -> Option<&error::Error> {
1050 None
1051 }
1052 }
1053
1054 #[stable(feature = "rust1", since = "1.0.0")]
1055 impl fmt::Display for RecvError {
1056 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1057 "receiving on a closed channel".fmt(f)
1058 }
1059 }
1060
1061 #[stable(feature = "rust1", since = "1.0.0")]
1062 impl error::Error for RecvError {
1063
1064 fn description(&self) -> &str {
1065 "receiving on a closed channel"
1066 }
1067
1068 fn cause(&self) -> Option<&error::Error> {
1069 None
1070 }
1071 }
1072
1073 #[stable(feature = "rust1", since = "1.0.0")]
1074 impl fmt::Display for TryRecvError {
1075 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1076 match *self {
1077 TryRecvError::Empty => {
1078 "receiving on an empty channel".fmt(f)
1079 }
1080 TryRecvError::Disconnected => {
1081 "receiving on a closed channel".fmt(f)
1082 }
1083 }
1084 }
1085 }
1086
1087 #[stable(feature = "rust1", since = "1.0.0")]
1088 impl error::Error for TryRecvError {
1089
1090 fn description(&self) -> &str {
1091 match *self {
1092 TryRecvError::Empty => {
1093 "receiving on an empty channel"
1094 }
1095 TryRecvError::Disconnected => {
1096 "receiving on a closed channel"
1097 }
1098 }
1099 }
1100
1101 fn cause(&self) -> Option<&error::Error> {
1102 None
1103 }
1104 }
1105
1106 #[cfg(test)]
1107 mod tests {
1108 use prelude::v1::*;
1109
1110 use env;
1111 use super::*;
1112 use thread;
1113
1114 pub fn stress_factor() -> usize {
1115 match env::var("RUST_TEST_STRESS") {
1116 Ok(val) => val.parse().unwrap(),
1117 Err(..) => 1,
1118 }
1119 }
1120
1121 #[test]
1122 fn smoke() {
1123 let (tx, rx) = channel::<i32>();
1124 tx.send(1).unwrap();
1125 assert_eq!(rx.recv().unwrap(), 1);
1126 }
1127
1128 #[test]
1129 fn drop_full() {
1130 let (tx, _rx) = channel::<Box<isize>>();
1131 tx.send(box 1).unwrap();
1132 }
1133
1134 #[test]
1135 fn drop_full_shared() {
1136 let (tx, _rx) = channel::<Box<isize>>();
1137 drop(tx.clone());
1138 drop(tx.clone());
1139 tx.send(box 1).unwrap();
1140 }
1141
1142 #[test]
1143 fn smoke_shared() {
1144 let (tx, rx) = channel::<i32>();
1145 tx.send(1).unwrap();
1146 assert_eq!(rx.recv().unwrap(), 1);
1147 let tx = tx.clone();
1148 tx.send(1).unwrap();
1149 assert_eq!(rx.recv().unwrap(), 1);
1150 }
1151
1152 #[test]
1153 fn smoke_threads() {
1154 let (tx, rx) = channel::<i32>();
1155 let _t = thread::spawn(move|| {
1156 tx.send(1).unwrap();
1157 });
1158 assert_eq!(rx.recv().unwrap(), 1);
1159 }
1160
1161 #[test]
1162 fn smoke_port_gone() {
1163 let (tx, rx) = channel::<i32>();
1164 drop(rx);
1165 assert!(tx.send(1).is_err());
1166 }
1167
1168 #[test]
1169 fn smoke_shared_port_gone() {
1170 let (tx, rx) = channel::<i32>();
1171 drop(rx);
1172 assert!(tx.send(1).is_err())
1173 }
1174
1175 #[test]
1176 fn smoke_shared_port_gone2() {
1177 let (tx, rx) = channel::<i32>();
1178 drop(rx);
1179 let tx2 = tx.clone();
1180 drop(tx);
1181 assert!(tx2.send(1).is_err());
1182 }
1183
1184 #[test]
1185 fn port_gone_concurrent() {
1186 let (tx, rx) = channel::<i32>();
1187 let _t = thread::spawn(move|| {
1188 rx.recv().unwrap();
1189 });
1190 while tx.send(1).is_ok() {}
1191 }
1192
1193 #[test]
1194 fn port_gone_concurrent_shared() {
1195 let (tx, rx) = channel::<i32>();
1196 let tx2 = tx.clone();
1197 let _t = thread::spawn(move|| {
1198 rx.recv().unwrap();
1199 });
1200 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1201 }
1202
1203 #[test]
1204 fn smoke_chan_gone() {
1205 let (tx, rx) = channel::<i32>();
1206 drop(tx);
1207 assert!(rx.recv().is_err());
1208 }
1209
1210 #[test]
1211 fn smoke_chan_gone_shared() {
1212 let (tx, rx) = channel::<()>();
1213 let tx2 = tx.clone();
1214 drop(tx);
1215 drop(tx2);
1216 assert!(rx.recv().is_err());
1217 }
1218
1219 #[test]
1220 fn chan_gone_concurrent() {
1221 let (tx, rx) = channel::<i32>();
1222 let _t = thread::spawn(move|| {
1223 tx.send(1).unwrap();
1224 tx.send(1).unwrap();
1225 });
1226 while rx.recv().is_ok() {}
1227 }
1228
1229 #[test]
1230 fn stress() {
1231 let (tx, rx) = channel::<i32>();
1232 let t = thread::spawn(move|| {
1233 for _ in 0..10000 { tx.send(1).unwrap(); }
1234 });
1235 for _ in 0..10000 {
1236 assert_eq!(rx.recv().unwrap(), 1);
1237 }
1238 t.join().ok().unwrap();
1239 }
1240
1241 #[test]
1242 fn stress_shared() {
1243 const AMT: u32 = 10000;
1244 const NTHREADS: u32 = 8;
1245 let (tx, rx) = channel::<i32>();
1246
1247 let t = thread::spawn(move|| {
1248 for _ in 0..AMT * NTHREADS {
1249 assert_eq!(rx.recv().unwrap(), 1);
1250 }
1251 match rx.try_recv() {
1252 Ok(..) => panic!(),
1253 _ => {}
1254 }
1255 });
1256
1257 for _ in 0..NTHREADS {
1258 let tx = tx.clone();
1259 thread::spawn(move|| {
1260 for _ in 0..AMT { tx.send(1).unwrap(); }
1261 });
1262 }
1263 drop(tx);
1264 t.join().ok().unwrap();
1265 }
1266
1267 #[test]
1268 fn send_from_outside_runtime() {
1269 let (tx1, rx1) = channel::<()>();
1270 let (tx2, rx2) = channel::<i32>();
1271 let t1 = thread::spawn(move|| {
1272 tx1.send(()).unwrap();
1273 for _ in 0..40 {
1274 assert_eq!(rx2.recv().unwrap(), 1);
1275 }
1276 });
1277 rx1.recv().unwrap();
1278 let t2 = thread::spawn(move|| {
1279 for _ in 0..40 {
1280 tx2.send(1).unwrap();
1281 }
1282 });
1283 t1.join().ok().unwrap();
1284 t2.join().ok().unwrap();
1285 }
1286
1287 #[test]
1288 fn recv_from_outside_runtime() {
1289 let (tx, rx) = channel::<i32>();
1290 let t = thread::spawn(move|| {
1291 for _ in 0..40 {
1292 assert_eq!(rx.recv().unwrap(), 1);
1293 }
1294 });
1295 for _ in 0..40 {
1296 tx.send(1).unwrap();
1297 }
1298 t.join().ok().unwrap();
1299 }
1300
1301 #[test]
1302 fn no_runtime() {
1303 let (tx1, rx1) = channel::<i32>();
1304 let (tx2, rx2) = channel::<i32>();
1305 let t1 = thread::spawn(move|| {
1306 assert_eq!(rx1.recv().unwrap(), 1);
1307 tx2.send(2).unwrap();
1308 });
1309 let t2 = thread::spawn(move|| {
1310 tx1.send(1).unwrap();
1311 assert_eq!(rx2.recv().unwrap(), 2);
1312 });
1313 t1.join().ok().unwrap();
1314 t2.join().ok().unwrap();
1315 }
1316
1317 #[test]
1318 fn oneshot_single_thread_close_port_first() {
1319 // Simple test of closing without sending
1320 let (_tx, rx) = channel::<i32>();
1321 drop(rx);
1322 }
1323
1324 #[test]
1325 fn oneshot_single_thread_close_chan_first() {
1326 // Simple test of closing without sending
1327 let (tx, _rx) = channel::<i32>();
1328 drop(tx);
1329 }
1330
1331 #[test]
1332 fn oneshot_single_thread_send_port_close() {
1333 // Testing that the sender cleans up the payload if receiver is closed
1334 let (tx, rx) = channel::<Box<i32>>();
1335 drop(rx);
1336 assert!(tx.send(box 0).is_err());
1337 }
1338
1339 #[test]
1340 fn oneshot_single_thread_recv_chan_close() {
1341 // Receiving on a closed chan will panic
1342 let res = thread::spawn(move|| {
1343 let (tx, rx) = channel::<i32>();
1344 drop(tx);
1345 rx.recv().unwrap();
1346 }).join();
1347 // What is our res?
1348 assert!(res.is_err());
1349 }
1350
1351 #[test]
1352 fn oneshot_single_thread_send_then_recv() {
1353 let (tx, rx) = channel::<Box<i32>>();
1354 tx.send(box 10).unwrap();
1355 assert!(rx.recv().unwrap() == box 10);
1356 }
1357
1358 #[test]
1359 fn oneshot_single_thread_try_send_open() {
1360 let (tx, rx) = channel::<i32>();
1361 assert!(tx.send(10).is_ok());
1362 assert!(rx.recv().unwrap() == 10);
1363 }
1364
1365 #[test]
1366 fn oneshot_single_thread_try_send_closed() {
1367 let (tx, rx) = channel::<i32>();
1368 drop(rx);
1369 assert!(tx.send(10).is_err());
1370 }
1371
1372 #[test]
1373 fn oneshot_single_thread_try_recv_open() {
1374 let (tx, rx) = channel::<i32>();
1375 tx.send(10).unwrap();
1376 assert!(rx.recv() == Ok(10));
1377 }
1378
1379 #[test]
1380 fn oneshot_single_thread_try_recv_closed() {
1381 let (tx, rx) = channel::<i32>();
1382 drop(tx);
1383 assert!(rx.recv().is_err());
1384 }
1385
1386 #[test]
1387 fn oneshot_single_thread_peek_data() {
1388 let (tx, rx) = channel::<i32>();
1389 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1390 tx.send(10).unwrap();
1391 assert_eq!(rx.try_recv(), Ok(10));
1392 }
1393
1394 #[test]
1395 fn oneshot_single_thread_peek_close() {
1396 let (tx, rx) = channel::<i32>();
1397 drop(tx);
1398 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1399 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1400 }
1401
1402 #[test]
1403 fn oneshot_single_thread_peek_open() {
1404 let (_tx, rx) = channel::<i32>();
1405 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1406 }
1407
1408 #[test]
1409 fn oneshot_multi_task_recv_then_send() {
1410 let (tx, rx) = channel::<Box<i32>>();
1411 let _t = thread::spawn(move|| {
1412 assert!(rx.recv().unwrap() == box 10);
1413 });
1414
1415 tx.send(box 10).unwrap();
1416 }
1417
1418 #[test]
1419 fn oneshot_multi_task_recv_then_close() {
1420 let (tx, rx) = channel::<Box<i32>>();
1421 let _t = thread::spawn(move|| {
1422 drop(tx);
1423 });
1424 let res = thread::spawn(move|| {
1425 assert!(rx.recv().unwrap() == box 10);
1426 }).join();
1427 assert!(res.is_err());
1428 }
1429
1430 #[test]
1431 fn oneshot_multi_thread_close_stress() {
1432 for _ in 0..stress_factor() {
1433 let (tx, rx) = channel::<i32>();
1434 let _t = thread::spawn(move|| {
1435 drop(rx);
1436 });
1437 drop(tx);
1438 }
1439 }
1440
1441 #[test]
1442 fn oneshot_multi_thread_send_close_stress() {
1443 for _ in 0..stress_factor() {
1444 let (tx, rx) = channel::<i32>();
1445 let _t = thread::spawn(move|| {
1446 drop(rx);
1447 });
1448 let _ = thread::spawn(move|| {
1449 tx.send(1).unwrap();
1450 }).join();
1451 }
1452 }
1453
1454 #[test]
1455 fn oneshot_multi_thread_recv_close_stress() {
1456 for _ in 0..stress_factor() {
1457 let (tx, rx) = channel::<i32>();
1458 thread::spawn(move|| {
1459 let res = thread::spawn(move|| {
1460 rx.recv().unwrap();
1461 }).join();
1462 assert!(res.is_err());
1463 });
1464 let _t = thread::spawn(move|| {
1465 thread::spawn(move|| {
1466 drop(tx);
1467 });
1468 });
1469 }
1470 }
1471
1472 #[test]
1473 fn oneshot_multi_thread_send_recv_stress() {
1474 for _ in 0..stress_factor() {
1475 let (tx, rx) = channel::<Box<isize>>();
1476 let _t = thread::spawn(move|| {
1477 tx.send(box 10).unwrap();
1478 });
1479 assert!(rx.recv().unwrap() == box 10);
1480 }
1481 }
1482
1483 #[test]
1484 fn stream_send_recv_stress() {
1485 for _ in 0..stress_factor() {
1486 let (tx, rx) = channel();
1487
1488 send(tx, 0);
1489 recv(rx, 0);
1490
1491 fn send(tx: Sender<Box<i32>>, i: i32) {
1492 if i == 10 { return }
1493
1494 thread::spawn(move|| {
1495 tx.send(box i).unwrap();
1496 send(tx, i + 1);
1497 });
1498 }
1499
1500 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1501 if i == 10 { return }
1502
1503 thread::spawn(move|| {
1504 assert!(rx.recv().unwrap() == box i);
1505 recv(rx, i + 1);
1506 });
1507 }
1508 }
1509 }
1510
1511 #[test]
1512 fn recv_a_lot() {
1513 // Regression test that we don't run out of stack in scheduler context
1514 let (tx, rx) = channel();
1515 for _ in 0..10000 { tx.send(()).unwrap(); }
1516 for _ in 0..10000 { rx.recv().unwrap(); }
1517 }
1518
1519 #[test]
1520 fn shared_chan_stress() {
1521 let (tx, rx) = channel();
1522 let total = stress_factor() + 100;
1523 for _ in 0..total {
1524 let tx = tx.clone();
1525 thread::spawn(move|| {
1526 tx.send(()).unwrap();
1527 });
1528 }
1529
1530 for _ in 0..total {
1531 rx.recv().unwrap();
1532 }
1533 }
1534
1535 #[test]
1536 fn test_nested_recv_iter() {
1537 let (tx, rx) = channel::<i32>();
1538 let (total_tx, total_rx) = channel::<i32>();
1539
1540 let _t = thread::spawn(move|| {
1541 let mut acc = 0;
1542 for x in rx.iter() {
1543 acc += x;
1544 }
1545 total_tx.send(acc).unwrap();
1546 });
1547
1548 tx.send(3).unwrap();
1549 tx.send(1).unwrap();
1550 tx.send(2).unwrap();
1551 drop(tx);
1552 assert_eq!(total_rx.recv().unwrap(), 6);
1553 }
1554
1555 #[test]
1556 fn test_recv_iter_break() {
1557 let (tx, rx) = channel::<i32>();
1558 let (count_tx, count_rx) = channel();
1559
1560 let _t = thread::spawn(move|| {
1561 let mut count = 0;
1562 for x in rx.iter() {
1563 if count >= 3 {
1564 break;
1565 } else {
1566 count += x;
1567 }
1568 }
1569 count_tx.send(count).unwrap();
1570 });
1571
1572 tx.send(2).unwrap();
1573 tx.send(2).unwrap();
1574 tx.send(2).unwrap();
1575 let _ = tx.send(2);
1576 drop(tx);
1577 assert_eq!(count_rx.recv().unwrap(), 4);
1578 }
1579
1580 #[test]
1581 fn test_recv_into_iter_owned() {
1582 let mut iter = {
1583 let (tx, rx) = channel::<i32>();
1584 tx.send(1).unwrap();
1585 tx.send(2).unwrap();
1586
1587 rx.into_iter()
1588 };
1589 assert_eq!(iter.next().unwrap(), 1);
1590 assert_eq!(iter.next().unwrap(), 2);
1591 assert_eq!(iter.next().is_none(), true);
1592 }
1593
1594 #[test]
1595 fn test_recv_into_iter_borrowed() {
1596 let (tx, rx) = channel::<i32>();
1597 tx.send(1).unwrap();
1598 tx.send(2).unwrap();
1599 drop(tx);
1600 let mut iter = (&rx).into_iter();
1601 assert_eq!(iter.next().unwrap(), 1);
1602 assert_eq!(iter.next().unwrap(), 2);
1603 assert_eq!(iter.next().is_none(), true);
1604 }
1605
1606 #[test]
1607 fn try_recv_states() {
1608 let (tx1, rx1) = channel::<i32>();
1609 let (tx2, rx2) = channel::<()>();
1610 let (tx3, rx3) = channel::<()>();
1611 let _t = thread::spawn(move|| {
1612 rx2.recv().unwrap();
1613 tx1.send(1).unwrap();
1614 tx3.send(()).unwrap();
1615 rx2.recv().unwrap();
1616 drop(tx1);
1617 tx3.send(()).unwrap();
1618 });
1619
1620 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1621 tx2.send(()).unwrap();
1622 rx3.recv().unwrap();
1623 assert_eq!(rx1.try_recv(), Ok(1));
1624 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1625 tx2.send(()).unwrap();
1626 rx3.recv().unwrap();
1627 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1628 }
1629
1630 // This bug used to cause a livelock inside the Receiver destructor
1631 // because the internal state of the Shared packet was corrupted
1632 #[test]
1633 fn destroy_upgraded_shared_port_when_sender_still_active() {
1634 let (tx, rx) = channel();
1635 let (tx2, rx2) = channel();
1636 let _t = thread::spawn(move|| {
1637 rx.recv().unwrap(); // wait on a oneshot
1638 drop(rx); // destroy a shared
1639 tx2.send(()).unwrap();
1640 });
1641 // make sure the other thread has gone to sleep
1642 for _ in 0..5000 { thread::yield_now(); }
1643
1644 // upgrade to a shared chan and send a message
1645 let t = tx.clone();
1646 drop(tx);
1647 t.send(()).unwrap();
1648
1649 // wait for the child thread to exit before we exit
1650 rx2.recv().unwrap();
1651 }
1652 }
1653
1654 #[cfg(test)]
1655 mod sync_tests {
1656 use prelude::v1::*;
1657
1658 use env;
1659 use thread;
1660 use super::*;
1661
1662 pub fn stress_factor() -> usize {
1663 match env::var("RUST_TEST_STRESS") {
1664 Ok(val) => val.parse().unwrap(),
1665 Err(..) => 1,
1666 }
1667 }
1668
1669 #[test]
1670 fn smoke() {
1671 let (tx, rx) = sync_channel::<i32>(1);
1672 tx.send(1).unwrap();
1673 assert_eq!(rx.recv().unwrap(), 1);
1674 }
1675
1676 #[test]
1677 fn drop_full() {
1678 let (tx, _rx) = sync_channel::<Box<isize>>(1);
1679 tx.send(box 1).unwrap();
1680 }
1681
1682 #[test]
1683 fn smoke_shared() {
1684 let (tx, rx) = sync_channel::<i32>(1);
1685 tx.send(1).unwrap();
1686 assert_eq!(rx.recv().unwrap(), 1);
1687 let tx = tx.clone();
1688 tx.send(1).unwrap();
1689 assert_eq!(rx.recv().unwrap(), 1);
1690 }
1691
1692 #[test]
1693 fn smoke_threads() {
1694 let (tx, rx) = sync_channel::<i32>(0);
1695 let _t = thread::spawn(move|| {
1696 tx.send(1).unwrap();
1697 });
1698 assert_eq!(rx.recv().unwrap(), 1);
1699 }
1700
1701 #[test]
1702 fn smoke_port_gone() {
1703 let (tx, rx) = sync_channel::<i32>(0);
1704 drop(rx);
1705 assert!(tx.send(1).is_err());
1706 }
1707
1708 #[test]
1709 fn smoke_shared_port_gone2() {
1710 let (tx, rx) = sync_channel::<i32>(0);
1711 drop(rx);
1712 let tx2 = tx.clone();
1713 drop(tx);
1714 assert!(tx2.send(1).is_err());
1715 }
1716
1717 #[test]
1718 fn port_gone_concurrent() {
1719 let (tx, rx) = sync_channel::<i32>(0);
1720 let _t = thread::spawn(move|| {
1721 rx.recv().unwrap();
1722 });
1723 while tx.send(1).is_ok() {}
1724 }
1725
1726 #[test]
1727 fn port_gone_concurrent_shared() {
1728 let (tx, rx) = sync_channel::<i32>(0);
1729 let tx2 = tx.clone();
1730 let _t = thread::spawn(move|| {
1731 rx.recv().unwrap();
1732 });
1733 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1734 }
1735
1736 #[test]
1737 fn smoke_chan_gone() {
1738 let (tx, rx) = sync_channel::<i32>(0);
1739 drop(tx);
1740 assert!(rx.recv().is_err());
1741 }
1742
1743 #[test]
1744 fn smoke_chan_gone_shared() {
1745 let (tx, rx) = sync_channel::<()>(0);
1746 let tx2 = tx.clone();
1747 drop(tx);
1748 drop(tx2);
1749 assert!(rx.recv().is_err());
1750 }
1751
1752 #[test]
1753 fn chan_gone_concurrent() {
1754 let (tx, rx) = sync_channel::<i32>(0);
1755 thread::spawn(move|| {
1756 tx.send(1).unwrap();
1757 tx.send(1).unwrap();
1758 });
1759 while rx.recv().is_ok() {}
1760 }
1761
1762 #[test]
1763 fn stress() {
1764 let (tx, rx) = sync_channel::<i32>(0);
1765 thread::spawn(move|| {
1766 for _ in 0..10000 { tx.send(1).unwrap(); }
1767 });
1768 for _ in 0..10000 {
1769 assert_eq!(rx.recv().unwrap(), 1);
1770 }
1771 }
1772
1773 #[test]
1774 fn stress_shared() {
1775 const AMT: u32 = 1000;
1776 const NTHREADS: u32 = 8;
1777 let (tx, rx) = sync_channel::<i32>(0);
1778 let (dtx, drx) = sync_channel::<()>(0);
1779
1780 thread::spawn(move|| {
1781 for _ in 0..AMT * NTHREADS {
1782 assert_eq!(rx.recv().unwrap(), 1);
1783 }
1784 match rx.try_recv() {
1785 Ok(..) => panic!(),
1786 _ => {}
1787 }
1788 dtx.send(()).unwrap();
1789 });
1790
1791 for _ in 0..NTHREADS {
1792 let tx = tx.clone();
1793 thread::spawn(move|| {
1794 for _ in 0..AMT { tx.send(1).unwrap(); }
1795 });
1796 }
1797 drop(tx);
1798 drx.recv().unwrap();
1799 }
1800
1801 #[test]
1802 fn oneshot_single_thread_close_port_first() {
1803 // Simple test of closing without sending
1804 let (_tx, rx) = sync_channel::<i32>(0);
1805 drop(rx);
1806 }
1807
1808 #[test]
1809 fn oneshot_single_thread_close_chan_first() {
1810 // Simple test of closing without sending
1811 let (tx, _rx) = sync_channel::<i32>(0);
1812 drop(tx);
1813 }
1814
1815 #[test]
1816 fn oneshot_single_thread_send_port_close() {
1817 // Testing that the sender cleans up the payload if receiver is closed
1818 let (tx, rx) = sync_channel::<Box<i32>>(0);
1819 drop(rx);
1820 assert!(tx.send(box 0).is_err());
1821 }
1822
1823 #[test]
1824 fn oneshot_single_thread_recv_chan_close() {
1825 // Receiving on a closed chan will panic
1826 let res = thread::spawn(move|| {
1827 let (tx, rx) = sync_channel::<i32>(0);
1828 drop(tx);
1829 rx.recv().unwrap();
1830 }).join();
1831 // What is our res?
1832 assert!(res.is_err());
1833 }
1834
1835 #[test]
1836 fn oneshot_single_thread_send_then_recv() {
1837 let (tx, rx) = sync_channel::<Box<i32>>(1);
1838 tx.send(box 10).unwrap();
1839 assert!(rx.recv().unwrap() == box 10);
1840 }
1841
1842 #[test]
1843 fn oneshot_single_thread_try_send_open() {
1844 let (tx, rx) = sync_channel::<i32>(1);
1845 assert_eq!(tx.try_send(10), Ok(()));
1846 assert!(rx.recv().unwrap() == 10);
1847 }
1848
1849 #[test]
1850 fn oneshot_single_thread_try_send_closed() {
1851 let (tx, rx) = sync_channel::<i32>(0);
1852 drop(rx);
1853 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1854 }
1855
1856 #[test]
1857 fn oneshot_single_thread_try_send_closed2() {
1858 let (tx, _rx) = sync_channel::<i32>(0);
1859 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1860 }
1861
1862 #[test]
1863 fn oneshot_single_thread_try_recv_open() {
1864 let (tx, rx) = sync_channel::<i32>(1);
1865 tx.send(10).unwrap();
1866 assert!(rx.recv() == Ok(10));
1867 }
1868
1869 #[test]
1870 fn oneshot_single_thread_try_recv_closed() {
1871 let (tx, rx) = sync_channel::<i32>(0);
1872 drop(tx);
1873 assert!(rx.recv().is_err());
1874 }
1875
1876 #[test]
1877 fn oneshot_single_thread_peek_data() {
1878 let (tx, rx) = sync_channel::<i32>(1);
1879 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1880 tx.send(10).unwrap();
1881 assert_eq!(rx.try_recv(), Ok(10));
1882 }
1883
1884 #[test]
1885 fn oneshot_single_thread_peek_close() {
1886 let (tx, rx) = sync_channel::<i32>(0);
1887 drop(tx);
1888 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1889 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1890 }
1891
1892 #[test]
1893 fn oneshot_single_thread_peek_open() {
1894 let (_tx, rx) = sync_channel::<i32>(0);
1895 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1896 }
1897
1898 #[test]
1899 fn oneshot_multi_task_recv_then_send() {
1900 let (tx, rx) = sync_channel::<Box<i32>>(0);
1901 let _t = thread::spawn(move|| {
1902 assert!(rx.recv().unwrap() == box 10);
1903 });
1904
1905 tx.send(box 10).unwrap();
1906 }
1907
1908 #[test]
1909 fn oneshot_multi_task_recv_then_close() {
1910 let (tx, rx) = sync_channel::<Box<i32>>(0);
1911 let _t = thread::spawn(move|| {
1912 drop(tx);
1913 });
1914 let res = thread::spawn(move|| {
1915 assert!(rx.recv().unwrap() == box 10);
1916 }).join();
1917 assert!(res.is_err());
1918 }
1919
1920 #[test]
1921 fn oneshot_multi_thread_close_stress() {
1922 for _ in 0..stress_factor() {
1923 let (tx, rx) = sync_channel::<i32>(0);
1924 let _t = thread::spawn(move|| {
1925 drop(rx);
1926 });
1927 drop(tx);
1928 }
1929 }
1930
1931 #[test]
1932 fn oneshot_multi_thread_send_close_stress() {
1933 for _ in 0..stress_factor() {
1934 let (tx, rx) = sync_channel::<i32>(0);
1935 let _t = thread::spawn(move|| {
1936 drop(rx);
1937 });
1938 let _ = thread::spawn(move || {
1939 tx.send(1).unwrap();
1940 }).join();
1941 }
1942 }
1943
1944 #[test]
1945 fn oneshot_multi_thread_recv_close_stress() {
1946 for _ in 0..stress_factor() {
1947 let (tx, rx) = sync_channel::<i32>(0);
1948 let _t = thread::spawn(move|| {
1949 let res = thread::spawn(move|| {
1950 rx.recv().unwrap();
1951 }).join();
1952 assert!(res.is_err());
1953 });
1954 let _t = thread::spawn(move|| {
1955 thread::spawn(move|| {
1956 drop(tx);
1957 });
1958 });
1959 }
1960 }
1961
1962 #[test]
1963 fn oneshot_multi_thread_send_recv_stress() {
1964 for _ in 0..stress_factor() {
1965 let (tx, rx) = sync_channel::<Box<i32>>(0);
1966 let _t = thread::spawn(move|| {
1967 tx.send(box 10).unwrap();
1968 });
1969 assert!(rx.recv().unwrap() == box 10);
1970 }
1971 }
1972
1973 #[test]
1974 fn stream_send_recv_stress() {
1975 for _ in 0..stress_factor() {
1976 let (tx, rx) = sync_channel::<Box<i32>>(0);
1977
1978 send(tx, 0);
1979 recv(rx, 0);
1980
1981 fn send(tx: SyncSender<Box<i32>>, i: i32) {
1982 if i == 10 { return }
1983
1984 thread::spawn(move|| {
1985 tx.send(box i).unwrap();
1986 send(tx, i + 1);
1987 });
1988 }
1989
1990 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1991 if i == 10 { return }
1992
1993 thread::spawn(move|| {
1994 assert!(rx.recv().unwrap() == box i);
1995 recv(rx, i + 1);
1996 });
1997 }
1998 }
1999 }
2000
2001 #[test]
2002 fn recv_a_lot() {
2003 // Regression test that we don't run out of stack in scheduler context
2004 let (tx, rx) = sync_channel(10000);
2005 for _ in 0..10000 { tx.send(()).unwrap(); }
2006 for _ in 0..10000 { rx.recv().unwrap(); }
2007 }
2008
2009 #[test]
2010 fn shared_chan_stress() {
2011 let (tx, rx) = sync_channel(0);
2012 let total = stress_factor() + 100;
2013 for _ in 0..total {
2014 let tx = tx.clone();
2015 thread::spawn(move|| {
2016 tx.send(()).unwrap();
2017 });
2018 }
2019
2020 for _ in 0..total {
2021 rx.recv().unwrap();
2022 }
2023 }
2024
2025 #[test]
2026 fn test_nested_recv_iter() {
2027 let (tx, rx) = sync_channel::<i32>(0);
2028 let (total_tx, total_rx) = sync_channel::<i32>(0);
2029
2030 let _t = thread::spawn(move|| {
2031 let mut acc = 0;
2032 for x in rx.iter() {
2033 acc += x;
2034 }
2035 total_tx.send(acc).unwrap();
2036 });
2037
2038 tx.send(3).unwrap();
2039 tx.send(1).unwrap();
2040 tx.send(2).unwrap();
2041 drop(tx);
2042 assert_eq!(total_rx.recv().unwrap(), 6);
2043 }
2044
2045 #[test]
2046 fn test_recv_iter_break() {
2047 let (tx, rx) = sync_channel::<i32>(0);
2048 let (count_tx, count_rx) = sync_channel(0);
2049
2050 let _t = thread::spawn(move|| {
2051 let mut count = 0;
2052 for x in rx.iter() {
2053 if count >= 3 {
2054 break;
2055 } else {
2056 count += x;
2057 }
2058 }
2059 count_tx.send(count).unwrap();
2060 });
2061
2062 tx.send(2).unwrap();
2063 tx.send(2).unwrap();
2064 tx.send(2).unwrap();
2065 let _ = tx.try_send(2);
2066 drop(tx);
2067 assert_eq!(count_rx.recv().unwrap(), 4);
2068 }
2069
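// Steps rx1 through each try_recv() state in turn -- Empty, Ok(1), and
// finally Disconnected -- using tx2/rx3 to synchronize with the helper thread
// between steps.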
2070 #[test]
2071 fn try_recv_states() {
2072 let (tx1, rx1) = sync_channel::<i32>(1);
2073 let (tx2, rx2) = sync_channel::<()>(1);
2074 let (tx3, rx3) = sync_channel::<()>(1);
2075 let _t = thread::spawn(move|| {
2076 rx2.recv().unwrap();
2077 tx1.send(1).unwrap();
2078 tx3.send(()).unwrap();
2079 rx2.recv().unwrap();
2080 drop(tx1);
2081 tx3.send(()).unwrap();
2082 });
2083
2084 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2085 tx2.send(()).unwrap();
2086 rx3.recv().unwrap();
2087 assert_eq!(rx1.try_recv(), Ok(1));
2088 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2089 tx2.send(()).unwrap();
2090 rx3.recv().unwrap();
2091 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2092 }
2093
2094 // This bug used to cause a livelock in the Receiver destructor because the
2095 // internal state of the Shared packet had been corrupted
2096 #[test]
2097 fn destroy_upgraded_shared_port_when_sender_still_active() {
2098 let (tx, rx) = sync_channel::<()>(0);
2099 let (tx2, rx2) = sync_channel::<()>(0);
2100 let _t = thread::spawn(move|| {
2101 rx.recv().unwrap(); // wait on a oneshot
2102 drop(rx); // destroy a shared
2103 tx2.send(()).unwrap();
2104 });
2105 // make sure the other thread has gone to sleep
2106 for _ in 0..5000 { thread::yield_now(); }
2107
2108 // upgrade to a shared chan and send a message
2109 let t = tx.clone();
2110 drop(tx);
2111 t.send(()).unwrap();
2112
2113 // wait for the child thread to exit before we exit
2114 rx2.recv().unwrap();
2115 }
2116
2117 #[test]
2118 fn send1() {
2119 let (tx, rx) = sync_channel::<i32>(0);
2120 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2121 assert_eq!(tx.send(1), Ok(()));
2122 }
2123
2124 #[test]
2125 fn send2() {
2126 let (tx, rx) = sync_channel::<i32>(0);
2127 let _t = thread::spawn(move|| { drop(rx); });
2128 assert!(tx.send(1).is_err());
2129 }
2130
2131 #[test]
2132 fn send3() {
2133 let (tx, rx) = sync_channel::<i32>(1);
2134 assert_eq!(tx.send(1), Ok(()));
2135 let _t = thread::spawn(move|| { drop(rx); });
2136 assert!(tx.send(1).is_err());
2137 }
2138
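// Once the receiver is dropped, both clones of the rendezvous sender must
// observe the send error.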
2139 #[test]
2140 fn send4() {
2141 let (tx, rx) = sync_channel::<i32>(0);
2142 let tx2 = tx.clone();
2143 let (done, donerx) = channel();
2144 let done2 = done.clone();
2145 let _t = thread::spawn(move|| {
2146 assert!(tx.send(1).is_err());
2147 done.send(()).unwrap();
2148 });
2149 let _t = thread::spawn(move|| {
2150 assert!(tx2.send(2).is_err());
2151 done2.send(()).unwrap();
2152 });
2153 drop(rx);
2154 donerx.recv().unwrap();
2155 donerx.recv().unwrap();
2156 }
2157
2158 #[test]
2159 fn try_send1() {
2160 let (tx, _rx) = sync_channel::<i32>(0);
2161 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2162 }
2163
2164 #[test]
2165 fn try_send2() {
2166 let (tx, _rx) = sync_channel::<i32>(1);
2167 assert_eq!(tx.try_send(1), Ok(()));
2168 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2169 }
2170
2171 #[test]
2172 fn try_send3() {
2173 let (tx, rx) = sync_channel::<i32>(1);
2174 assert_eq!(tx.try_send(1), Ok(()));
2175 drop(rx);
2176 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2177 }
2178
2179 #[test]
2180 fn issue_15761() {
2181 fn repro() {
2182 let (tx1, rx1) = sync_channel::<()>(3);
2183 let (tx2, rx2) = sync_channel::<()>(3);
2184
2185 let _t = thread::spawn(move|| {
2186 rx1.recv().unwrap();
2187 tx2.try_send(()).unwrap();
2188 });
2189
2190 tx1.try_send(()).unwrap();
2191 rx2.recv().unwrap();
2192 }
2193
2194 for _ in 0..100 {
2195 repro()
2196 }
2197 }
2198 }