1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
29 //! infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
37 //! a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
81 //! });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent thread to start receiving
111 //! tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115
116 #![stable(feature = "rust1", since = "1.0.0")]
117
118 // A description of how Rust's channel implementation works
119 //
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
123 //
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great detail about channels of these flavors.
129 //
130 // ## Flavors of channels
131 //
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
135 // channels in play.
136 //
137 // * Oneshots - these channels are highly optimized for the one-send use case.
138 // They contain as few atomics as possible and involve one and
139 // only one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 // use a different concurrent queue that is more tailored for this
142 // use case. The initial allocation of this flavor of channel is not
143 // optimized.
144 // * Shared - this is the most general form of channel that this module offers,
145 // a channel with multiple senders. This type is as optimized as it
146 // can be, but the previous two types mentioned are much faster for
147 // their use-cases.
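//
// As a rough illustration (not code from this module) of how ordinary usage
// drives these flavors, a channel is upgraded lazily as it is used:
//
//     let (tx, rx) = channel();   // starts out as an oneshot channel
//     tx.send(1).unwrap();        // the first send stays on the oneshot packet
//     tx.send(2).unwrap();        // a second send upgrades the channel to a stream
//     let tx2 = tx.clone();       // cloning a sender upgrades the channel to shared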
148 //
149 // ## Concurrent queues
150 //
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
152 // recv() obviously blocks. This means that under the hood there must be some
153 // shared and concurrent queue holding all of the actual data.
154 //
155 // The stream and shared channels are backed by two flavors of queues. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
159 // shared channels.
160 //
161 // ### SPSC optimizations
162 //
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
167 // pop().
168 //
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
174 //
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to keep send()
177 // operations very fast while remaining small enough that millions of channels
178 // can coexist at once.
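//
// A rough sketch (illustrative only, not the actual spsc_queue code) of the
// bounded node cache on the consumer side:
//
//     fn reclaim(&mut self, node: Box<Node<T>>) {
//         if self.cache_len < self.cache_bound {
//             // keep the node around so a future push() can reuse it
//             self.cache.push(node);
//             self.cache_len += 1;
//         }
//         // otherwise let the node drop, freeing its memory
//     }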
179 //
180 // ### MPSC optimizations
181 //
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to achieve its unboundedness, but I have not put
184 // much effort into adding a cache of nodes similar to the SPSC queue.
185 //
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
189 //
190 // ## Overview of the Implementation
191 //
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
195 //
196 //
197 //      send(t)                             recv()
198 //        queue.push(t)                       return if queue.pop()
199 //        if increment() == -1                deschedule {
200 //          wakeup()                            if decrement() > 0
201 //                                                cancel_deschedule()
202 //                                            }
203 //                                            queue.pop()
204 //
205 // As mentioned before, there are no locks in this implementation, only atomic
206 // instructions are used.
207 //
208 // ### The internal atomic counter
209 //
210 // Every channel has a counter shared by both halves to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
213 //
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
218 //
219 // The recv() method begins with a call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
225 //
226 // Instead, the "steal count" is tracked separately (not atomically, because
227 // it's only used by receivers), and the decrement() call made when descheduling
228 // lumps all of the recent steals into one large decrement.
229 //
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
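//
// A small worked trace of this protocol (purely illustrative):
//
//     count = 0                  // queue empty, receiver running
//     recv(): queue.pop() fails, so deschedule: decrement() leaves count = -1, block
//     send(t): queue.push(t); increment() observes -1, so wakeup() the receiver
//     recv(): wakes up, queue.pop() succeeds, and the receiver bumps its local
//             "steal count" rather than the shared counter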
232 //
233 // ## Native Implementation
234 //
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which are obviously not available without the runtime).
238 //
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
244 //
245 // ## Select
246 //
247 // Being able to support selection over channels has greatly influenced this
248 // design; selection needs to work not only inside the runtime but also outside
249 // of it.
250 //
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as it's woken up. The cancellation procedure
255 // involves an increment and swapping out to_wake to acquire ownership of the
256 // thread to unblock.
257 //
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
262 //
263 // # Conclusion
264 //
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
267
268 use sync::Arc;
269 use error;
270 use fmt;
271 use mem;
272 use cell::UnsafeCell;
273 use marker::Reflect;
274
275 #[unstable(feature = "mpsc_select", issue = "27800")]
276 pub use self::select::{Select, Handle};
277 use self::select::StartResult;
278 use self::select::StartResult::*;
279 use self::blocking::SignalToken;
280
281 mod blocking;
282 mod oneshot;
283 mod select;
284 mod shared;
285 mod stream;
286 mod sync;
287 mod mpsc_queue;
288 mod spsc_queue;
289
290 /// The receiving-half of Rust's channel type. This half can only be owned by
291 /// one thread.
292 #[stable(feature = "rust1", since = "1.0.0")]
293 pub struct Receiver<T> {
294 inner: UnsafeCell<Flavor<T>>,
295 }
296
297 // The receiver port can be sent from place to place, so long as it
298 // is not used to receive non-sendable things.
299 #[stable(feature = "rust1", since = "1.0.0")]
300 unsafe impl<T: Send> Send for Receiver<T> { }
301
302 #[stable(feature = "rust1", since = "1.0.0")]
303 impl<T> !Sync for Receiver<T> { }
304
305 /// An iterator over messages on a receiver. This iterator will block
306 /// whenever `next` is called, waiting for a new message, and `None` will be
307 /// returned when the corresponding channel has hung up.
308 #[stable(feature = "rust1", since = "1.0.0")]
309 pub struct Iter<'a, T: 'a> {
310 rx: &'a Receiver<T>
311 }
312
313 /// An owning iterator over messages on a receiver. This iterator will block
314 /// whenever `next` is called, waiting for a new message, and `None` will be
315 /// returned when the corresponding channel has hung up.
316 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
317 pub struct IntoIter<T> {
318 rx: Receiver<T>
319 }
320
321 /// The sending-half of Rust's asynchronous channel type. This half can only be
322 /// owned by one thread, but it can be cloned to send to other threads.
323 #[stable(feature = "rust1", since = "1.0.0")]
324 pub struct Sender<T> {
325 inner: UnsafeCell<Flavor<T>>,
326 }
327
328 // The send port can be sent from place to place, so long as it
329 // is not used to send non-sendable things.
330 #[stable(feature = "rust1", since = "1.0.0")]
331 unsafe impl<T: Send> Send for Sender<T> { }
332
333 #[stable(feature = "rust1", since = "1.0.0")]
334 impl<T> !Sync for Sender<T> { }
335
336 /// The sending-half of Rust's synchronous channel type. This half can only be
337 /// owned by one thread, but it can be cloned to send to other threads.
338 #[stable(feature = "rust1", since = "1.0.0")]
339 pub struct SyncSender<T> {
340 inner: Arc<UnsafeCell<sync::Packet<T>>>,
341 }
342
343 #[stable(feature = "rust1", since = "1.0.0")]
344 unsafe impl<T: Send> Send for SyncSender<T> {}
345
346 #[stable(feature = "rust1", since = "1.0.0")]
347 impl<T> !Sync for SyncSender<T> {}
348
349 /// An error returned from the `send` function on channels.
350 ///
351 /// A `send` operation can only fail if the receiving end of a channel is
352 /// disconnected, implying that the data could never be received. The error
353 /// contains the data being sent as a payload so it can be recovered.
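///
/// # Examples
///
/// A minimal example of recovering the payload from a failed send:
///
/// ```
/// use std::sync::mpsc::{channel, SendError};
///
/// let (tx, rx) = channel();
/// drop(rx);
///
/// // The receiver is gone, so the send fails and hands the value back.
/// let SendError(value) = tx.send(7).unwrap_err();
/// assert_eq!(value, 7);
/// ```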
354 #[stable(feature = "rust1", since = "1.0.0")]
355 #[derive(PartialEq, Eq, Clone, Copy)]
356 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
357
358 /// An error returned from the `recv` function on a `Receiver`.
359 ///
360 /// The `recv` operation can only fail if the sending half of a channel is
361 /// disconnected, implying that no further messages will ever be received.
362 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
363 #[stable(feature = "rust1", since = "1.0.0")]
364 pub struct RecvError;
365
366 /// This enumeration is the list of the possible reasons that `try_recv` could
367 /// not return data when called.
368 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
369 #[stable(feature = "rust1", since = "1.0.0")]
370 pub enum TryRecvError {
371 /// This channel is currently empty, but the sender(s) have not yet
372 /// disconnected, so data may yet become available.
373 #[stable(feature = "rust1", since = "1.0.0")]
374 Empty,
375
376 /// This channel's sending half has become disconnected, and there will
377 /// never be any more data received on this channel
378 #[stable(feature = "rust1", since = "1.0.0")]
379 Disconnected,
380 }
381
382 /// This enumeration is the list of the possible error outcomes for the
383 /// `SyncSender::try_send` method.
384 #[stable(feature = "rust1", since = "1.0.0")]
385 #[derive(PartialEq, Eq, Clone, Copy)]
386 pub enum TrySendError<T> {
387 /// The data could not be sent on the channel because it would require that
388 /// the caller block to send the data.
389 ///
390 /// If this is a buffered channel, then the buffer is full at this time. If
391 /// this is not a buffered channel, then there is no receiver available to
392 /// acquire the data.
393 #[stable(feature = "rust1", since = "1.0.0")]
394 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
395
396 /// This channel's receiving half has disconnected, so the data could not be
397 /// sent. The data is returned to the caller in this case.
398 #[stable(feature = "rust1", since = "1.0.0")]
399 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
400 }
401
402 enum Flavor<T> {
403 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
404 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
405 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
406 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
407 }
408
409 #[doc(hidden)]
410 trait UnsafeFlavor<T> {
411 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
412 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
413 &mut *self.inner_unsafe().get()
414 }
415 unsafe fn inner(&self) -> &Flavor<T> {
416 &*self.inner_unsafe().get()
417 }
418 }
419 impl<T> UnsafeFlavor<T> for Sender<T> {
420 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
421 &self.inner
422 }
423 }
424 impl<T> UnsafeFlavor<T> for Receiver<T> {
425 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
426 &self.inner
427 }
428 }
429
430 /// Creates a new asynchronous channel, returning the sender/receiver halves.
431 ///
432 /// All data sent on the sender will become available on the receiver, and no
433 /// send will block the calling thread (this channel has an "infinite buffer").
434 ///
435 /// # Examples
436 ///
437 /// ```
438 /// use std::sync::mpsc::channel;
439 /// use std::thread;
440 ///
441 /// // tx is the sending half (tx for transmission), and rx is the receiving
442 /// // half (rx for receiving).
443 /// let (tx, rx) = channel();
444 ///
445 /// // Spawn off an expensive computation
446 /// thread::spawn(move|| {
447 /// # fn expensive_computation() {}
448 /// tx.send(expensive_computation()).unwrap();
449 /// });
450 ///
451 /// // Do some useful work for awhile
452 ///
453 /// // Let's see what that answer was
454 /// println!("{:?}", rx.recv().unwrap());
455 /// ```
456 #[stable(feature = "rust1", since = "1.0.0")]
457 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
458 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
459 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
460 }
461
462 /// Creates a new synchronous, bounded channel.
463 ///
464 /// Like asynchronous channels, the `Receiver` will block until a message
465 /// becomes available. However, these channels differ greatly from asynchronous
466 /// channels in the semantics of the sender.
467 ///
468 /// This channel has an internal buffer on which messages will be queued. When
469 /// the internal buffer becomes full, future sends will *block* waiting for the
470 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
471 /// becomes a "rendezvous" channel where each send will not return until a recv
472 /// is paired with it.
473 ///
474 /// As with asynchronous channels, `send` will return an error if the
475 /// `Receiver` has been destroyed.
476 ///
477 /// # Examples
478 ///
479 /// ```
480 /// use std::sync::mpsc::sync_channel;
481 /// use std::thread;
482 ///
483 /// let (tx, rx) = sync_channel(1);
484 ///
485 /// // this returns immediately
486 /// tx.send(1).unwrap();
487 ///
488 /// thread::spawn(move|| {
489 /// // this will block until the previous message has been received
490 /// tx.send(2).unwrap();
491 /// });
492 ///
493 /// assert_eq!(rx.recv().unwrap(), 1);
494 /// assert_eq!(rx.recv().unwrap(), 2);
495 /// ```
496 #[stable(feature = "rust1", since = "1.0.0")]
497 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
498 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
499 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
500 }
501
502 ////////////////////////////////////////////////////////////////////////////////
503 // Sender
504 ////////////////////////////////////////////////////////////////////////////////
505
506 impl<T> Sender<T> {
507 fn new(inner: Flavor<T>) -> Sender<T> {
508 Sender {
509 inner: UnsafeCell::new(inner),
510 }
511 }
512
513 /// Attempts to send a value on this channel, returning it back if it could
514 /// not be sent.
515 ///
516 /// A successful send occurs when it is determined that the other end of
517 /// the channel has not hung up already. An unsuccessful send would be one
518 /// where the corresponding receiver has already been deallocated. Note
519 /// that a return value of `Err` means that the data will never be
520 /// received, but a return value of `Ok` does *not* mean that the data
521 /// will be received. It is possible for the corresponding receiver to
522 /// hang up immediately after this function returns `Ok`.
523 ///
524 /// This method will never block the current thread.
525 ///
526 /// # Examples
527 ///
528 /// ```
529 /// use std::sync::mpsc::channel;
530 ///
531 /// let (tx, rx) = channel();
532 ///
533 /// // This send is always successful
534 /// tx.send(1).unwrap();
535 ///
536 /// // This send will fail because the receiver is gone
537 /// drop(rx);
538 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
539 /// ```
540 #[stable(feature = "rust1", since = "1.0.0")]
541 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
542 let (new_inner, ret) = match *unsafe { self.inner() } {
543 Flavor::Oneshot(ref p) => {
544 unsafe {
545 let p = p.get();
546 if !(*p).sent() {
547 return (*p).send(t).map_err(SendError);
548 } else {
549 let a =
550 Arc::new(UnsafeCell::new(stream::Packet::new()));
551 let rx = Receiver::new(Flavor::Stream(a.clone()));
552 match (*p).upgrade(rx) {
553 oneshot::UpSuccess => {
554 let ret = (*a.get()).send(t);
555 (a, ret)
556 }
557 oneshot::UpDisconnected => (a, Err(t)),
558 oneshot::UpWoke(token) => {
559 // This send cannot panic because the thread is
560 // asleep (we're looking at it), so the receiver
561 // can't go away.
562 (*a.get()).send(t).ok().unwrap();
563 token.signal();
564 (a, Ok(()))
565 }
566 }
567 }
568 }
569 }
570 Flavor::Stream(ref p) => return unsafe {
571 (*p.get()).send(t).map_err(SendError)
572 },
573 Flavor::Shared(ref p) => return unsafe {
574 (*p.get()).send(t).map_err(SendError)
575 },
576 Flavor::Sync(..) => unreachable!(),
577 };
578
579 unsafe {
580 let tmp = Sender::new(Flavor::Stream(new_inner));
581 mem::swap(self.inner_mut(), tmp.inner_mut());
582 }
583 ret.map_err(SendError)
584 }
585 }
586
587 #[stable(feature = "rust1", since = "1.0.0")]
588 impl<T> Clone for Sender<T> {
589 fn clone(&self) -> Sender<T> {
590 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
591 Flavor::Oneshot(ref p) => {
592 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
593 unsafe {
594 let guard = (*a.get()).postinit_lock();
595 let rx = Receiver::new(Flavor::Shared(a.clone()));
596 match (*p.get()).upgrade(rx) {
597 oneshot::UpSuccess |
598 oneshot::UpDisconnected => (a, None, guard),
599 oneshot::UpWoke(task) => (a, Some(task), guard)
600 }
601 }
602 }
603 Flavor::Stream(ref p) => {
604 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
605 unsafe {
606 let guard = (*a.get()).postinit_lock();
607 let rx = Receiver::new(Flavor::Shared(a.clone()));
608 match (*p.get()).upgrade(rx) {
609 stream::UpSuccess |
610 stream::UpDisconnected => (a, None, guard),
611 stream::UpWoke(task) => (a, Some(task), guard),
612 }
613 }
614 }
615 Flavor::Shared(ref p) => {
616 unsafe { (*p.get()).clone_chan(); }
617 return Sender::new(Flavor::Shared(p.clone()));
618 }
619 Flavor::Sync(..) => unreachable!(),
620 };
621
622 unsafe {
623 (*packet.get()).inherit_blocker(sleeper, guard);
624
625 let tmp = Sender::new(Flavor::Shared(packet.clone()));
626 mem::swap(self.inner_mut(), tmp.inner_mut());
627 }
628 Sender::new(Flavor::Shared(packet))
629 }
630 }
631
632 #[stable(feature = "rust1", since = "1.0.0")]
633 impl<T> Drop for Sender<T> {
634 fn drop(&mut self) {
635 match *unsafe { self.inner_mut() } {
636 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
637 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
638 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
639 Flavor::Sync(..) => unreachable!(),
640 }
641 }
642 }
643
644 #[stable(feature = "mpsc_debug", since = "1.7.0")]
645 impl<T> fmt::Debug for Sender<T> {
646 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
647 write!(f, "Sender {{ .. }}")
648 }
649 }
650
651 ////////////////////////////////////////////////////////////////////////////////
652 // SyncSender
653 ////////////////////////////////////////////////////////////////////////////////
654
655 impl<T> SyncSender<T> {
656 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
657 SyncSender { inner: inner }
658 }
659
660 /// Sends a value on this synchronous channel.
661 ///
662 /// This function will *block* until space in the internal buffer becomes
663 /// available or a receiver is available to hand off the message to.
664 ///
665 /// Note that a successful send does *not* guarantee that the receiver will
666 /// ever see the data if there is a buffer on this channel. Items may be
667 /// enqueued in the internal buffer for the receiver to receive at a later
668 /// time. If the buffer size is 0, however, it can be guaranteed that the
669 /// receiver has indeed received the data if this function returns success.
670 ///
671 /// This function will never panic, but it may return `Err` if the
672 /// `Receiver` has disconnected and is no longer able to receive
673 /// information.
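///
/// # Examples
///
/// A minimal example (mirroring the `sync_channel` example above) of a
/// buffered send followed by a blocking send:
///
/// ```
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
/// let (tx, rx) = sync_channel(1);
///
/// // The buffer has room for one message, so this returns immediately.
/// tx.send(1).unwrap();
///
/// thread::spawn(move|| {
///     // The buffer is now full, so this send blocks until `1` is received.
///     tx.send(2).unwrap();
/// });
///
/// assert_eq!(rx.recv().unwrap(), 1);
/// assert_eq!(rx.recv().unwrap(), 2);
/// ```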
674 #[stable(feature = "rust1", since = "1.0.0")]
675 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
676 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
677 }
678
679 /// Attempts to send a value on this channel without blocking.
680 ///
681 /// This method differs from `send` by returning immediately if the
682 /// channel's buffer is full or no receiver is waiting to acquire some
683 /// data. Compared with `send`, this function has two failure cases
684 /// instead of one (one for disconnection, one for a full buffer).
685 ///
686 /// See `SyncSender::send` for notes about guarantees of whether the
687 /// receiver has received the data or not if this function is successful.
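///
/// # Examples
///
/// A short example of both failure cases:
///
/// ```
/// use std::sync::mpsc::{sync_channel, TrySendError};
///
/// let (tx, rx) = sync_channel(1);
/// assert_eq!(tx.try_send(1), Ok(()));
///
/// // The buffer is now full, so another try_send fails with `Full`.
/// assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
///
/// // Once the receiver is gone, sending fails with `Disconnected`.
/// drop(rx);
/// assert_eq!(tx.try_send(3), Err(TrySendError::Disconnected(3)));
/// ```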
688 #[stable(feature = "rust1", since = "1.0.0")]
689 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
690 unsafe { (*self.inner.get()).try_send(t) }
691 }
692 }
693
694 #[stable(feature = "rust1", since = "1.0.0")]
695 impl<T> Clone for SyncSender<T> {
696 fn clone(&self) -> SyncSender<T> {
697 unsafe { (*self.inner.get()).clone_chan(); }
698 SyncSender::new(self.inner.clone())
699 }
700 }
701
702 #[stable(feature = "rust1", since = "1.0.0")]
703 impl<T> Drop for SyncSender<T> {
704 fn drop(&mut self) {
705 unsafe { (*self.inner.get()).drop_chan(); }
706 }
707 }
708
709 #[stable(feature = "mpsc_debug", since = "1.7.0")]
710 impl<T> fmt::Debug for SyncSender<T> {
711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712 write!(f, "SyncSender {{ .. }}")
713 }
714 }
715
716 ////////////////////////////////////////////////////////////////////////////////
717 // Receiver
718 ////////////////////////////////////////////////////////////////////////////////
719
720 impl<T> Receiver<T> {
721 fn new(inner: Flavor<T>) -> Receiver<T> {
722 Receiver { inner: UnsafeCell::new(inner) }
723 }
724
725 /// Attempts to return a pending value on this receiver without blocking.
726 ///
727 /// This method will never block the caller in order to wait for data to
728 /// become available. Instead, this will always return immediately, either with
729 /// pending data from the channel or an error describing why none is available.
730 ///
731 /// This is useful for a flavor of "optimistic check" before deciding to
732 /// block on a receiver.
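///
/// # Examples
///
/// A minimal example of checking for data without blocking:
///
/// ```
/// use std::sync::mpsc::{channel, TryRecvError};
///
/// let (tx, rx) = channel();
///
/// // Nothing has been sent yet, so the channel is empty but still open.
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// tx.send(5).unwrap();
/// assert_eq!(rx.try_recv(), Ok(5));
///
/// // Once all senders are gone, the channel reports disconnection.
/// drop(tx);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
/// ```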
733 #[stable(feature = "rust1", since = "1.0.0")]
734 pub fn try_recv(&self) -> Result<T, TryRecvError> {
735 loop {
736 let new_port = match *unsafe { self.inner() } {
737 Flavor::Oneshot(ref p) => {
738 match unsafe { (*p.get()).try_recv() } {
739 Ok(t) => return Ok(t),
740 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
741 Err(oneshot::Disconnected) => {
742 return Err(TryRecvError::Disconnected)
743 }
744 Err(oneshot::Upgraded(rx)) => rx,
745 }
746 }
747 Flavor::Stream(ref p) => {
748 match unsafe { (*p.get()).try_recv() } {
749 Ok(t) => return Ok(t),
750 Err(stream::Empty) => return Err(TryRecvError::Empty),
751 Err(stream::Disconnected) => {
752 return Err(TryRecvError::Disconnected)
753 }
754 Err(stream::Upgraded(rx)) => rx,
755 }
756 }
757 Flavor::Shared(ref p) => {
758 match unsafe { (*p.get()).try_recv() } {
759 Ok(t) => return Ok(t),
760 Err(shared::Empty) => return Err(TryRecvError::Empty),
761 Err(shared::Disconnected) => {
762 return Err(TryRecvError::Disconnected)
763 }
764 }
765 }
766 Flavor::Sync(ref p) => {
767 match unsafe { (*p.get()).try_recv() } {
768 Ok(t) => return Ok(t),
769 Err(sync::Empty) => return Err(TryRecvError::Empty),
770 Err(sync::Disconnected) => {
771 return Err(TryRecvError::Disconnected)
772 }
773 }
774 }
775 };
776 unsafe {
777 mem::swap(self.inner_mut(),
778 new_port.inner_mut());
779 }
780 }
781 }
782
783 /// Attempts to wait for a value on this receiver, returning an error if the
784 /// corresponding channel has hung up.
785 ///
786 /// This function will always block the current thread if there is no data
787 /// available and it's possible for more data to be sent. Once a message is
788 /// sent to the corresponding `Sender`, then this receiver will wake up and
789 /// return that message.
790 ///
791 /// If the corresponding `Sender` has disconnected, or it disconnects while
792 /// this call is blocking, this call will wake up and return `Err` to
793 /// indicate that no more messages can ever be received on this channel.
794 /// However, since channels are buffered, messages sent before the disconnect
795 /// will still be properly received.
796 ///
797 /// # Examples
798 ///
799 /// ```
800 /// use std::sync::mpsc;
801 /// use std::thread;
802 ///
803 /// let (send, recv) = mpsc::channel();
804 /// let handle = thread::spawn(move || {
805 /// send.send(1u8).unwrap();
806 /// });
807 ///
808 /// handle.join().unwrap();
809 ///
810 /// assert_eq!(Ok(1), recv.recv());
811 /// ```
812 ///
813 /// Buffering behavior:
814 ///
815 /// ```
816 /// use std::sync::mpsc;
817 /// use std::thread;
818 /// use std::sync::mpsc::RecvError;
819 ///
820 /// let (send, recv) = mpsc::channel();
821 /// let handle = thread::spawn(move || {
822 /// send.send(1u8).unwrap();
823 /// send.send(2).unwrap();
824 /// send.send(3).unwrap();
825 /// drop(send);
826 /// });
827 ///
828 /// // wait for the thread to join so we ensure the sender is dropped
829 /// handle.join().unwrap();
830 ///
831 /// assert_eq!(Ok(1), recv.recv());
832 /// assert_eq!(Ok(2), recv.recv());
833 /// assert_eq!(Ok(3), recv.recv());
834 /// assert_eq!(Err(RecvError), recv.recv());
835 /// ```
836 #[stable(feature = "rust1", since = "1.0.0")]
837 pub fn recv(&self) -> Result<T, RecvError> {
838 loop {
839 let new_port = match *unsafe { self.inner() } {
840 Flavor::Oneshot(ref p) => {
841 match unsafe { (*p.get()).recv() } {
842 Ok(t) => return Ok(t),
843 Err(oneshot::Empty) => return unreachable!(),
844 Err(oneshot::Disconnected) => return Err(RecvError),
845 Err(oneshot::Upgraded(rx)) => rx,
846 }
847 }
848 Flavor::Stream(ref p) => {
849 match unsafe { (*p.get()).recv() } {
850 Ok(t) => return Ok(t),
851 Err(stream::Empty) => return unreachable!(),
852 Err(stream::Disconnected) => return Err(RecvError),
853 Err(stream::Upgraded(rx)) => rx,
854 }
855 }
856 Flavor::Shared(ref p) => {
857 match unsafe { (*p.get()).recv() } {
858 Ok(t) => return Ok(t),
859 Err(shared::Empty) => return unreachable!(),
860 Err(shared::Disconnected) => return Err(RecvError),
861 }
862 }
863 Flavor::Sync(ref p) => return unsafe {
864 (*p.get()).recv().map_err(|()| RecvError)
865 }
866 };
867 unsafe {
868 mem::swap(self.inner_mut(), new_port.inner_mut());
869 }
870 }
871 }
872
873 /// Returns an iterator that will block waiting for messages, but never
874 /// `panic!`. It will return `None` when the channel has hung up.
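///
/// # Examples
///
/// A small example that drains a channel once every sender is gone:
///
/// ```
/// use std::sync::mpsc::channel;
///
/// let (tx, rx) = channel();
/// tx.send(1).unwrap();
/// tx.send(2).unwrap();
/// tx.send(3).unwrap();
///
/// // Hang up the sending side so the iterator terminates.
/// drop(tx);
///
/// let received: Vec<i32> = rx.iter().collect();
/// assert_eq!(received, vec![1, 2, 3]);
/// ```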
875 #[stable(feature = "rust1", since = "1.0.0")]
876 pub fn iter(&self) -> Iter<T> {
877 Iter { rx: self }
878 }
879 }
880
881 impl<T> select::Packet for Receiver<T> {
882 fn can_recv(&self) -> bool {
883 loop {
884 let new_port = match *unsafe { self.inner() } {
885 Flavor::Oneshot(ref p) => {
886 match unsafe { (*p.get()).can_recv() } {
887 Ok(ret) => return ret,
888 Err(upgrade) => upgrade,
889 }
890 }
891 Flavor::Stream(ref p) => {
892 match unsafe { (*p.get()).can_recv() } {
893 Ok(ret) => return ret,
894 Err(upgrade) => upgrade,
895 }
896 }
897 Flavor::Shared(ref p) => {
898 return unsafe { (*p.get()).can_recv() };
899 }
900 Flavor::Sync(ref p) => {
901 return unsafe { (*p.get()).can_recv() };
902 }
903 };
904 unsafe {
905 mem::swap(self.inner_mut(),
906 new_port.inner_mut());
907 }
908 }
909 }
910
911 fn start_selection(&self, mut token: SignalToken) -> StartResult {
912 loop {
913 let (t, new_port) = match *unsafe { self.inner() } {
914 Flavor::Oneshot(ref p) => {
915 match unsafe { (*p.get()).start_selection(token) } {
916 oneshot::SelSuccess => return Installed,
917 oneshot::SelCanceled => return Abort,
918 oneshot::SelUpgraded(t, rx) => (t, rx),
919 }
920 }
921 Flavor::Stream(ref p) => {
922 match unsafe { (*p.get()).start_selection(token) } {
923 stream::SelSuccess => return Installed,
924 stream::SelCanceled => return Abort,
925 stream::SelUpgraded(t, rx) => (t, rx),
926 }
927 }
928 Flavor::Shared(ref p) => {
929 return unsafe { (*p.get()).start_selection(token) };
930 }
931 Flavor::Sync(ref p) => {
932 return unsafe { (*p.get()).start_selection(token) };
933 }
934 };
935 token = t;
936 unsafe {
937 mem::swap(self.inner_mut(), new_port.inner_mut());
938 }
939 }
940 }
941
942 fn abort_selection(&self) -> bool {
943 let mut was_upgrade = false;
944 loop {
945 let result = match *unsafe { self.inner() } {
946 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
947 Flavor::Stream(ref p) => unsafe {
948 (*p.get()).abort_selection(was_upgrade)
949 },
950 Flavor::Shared(ref p) => return unsafe {
951 (*p.get()).abort_selection(was_upgrade)
952 },
953 Flavor::Sync(ref p) => return unsafe {
954 (*p.get()).abort_selection()
955 },
956 };
957 let new_port = match result { Ok(b) => return b, Err(p) => p };
958 was_upgrade = true;
959 unsafe {
960 mem::swap(self.inner_mut(),
961 new_port.inner_mut());
962 }
963 }
964 }
965 }
966
967 #[stable(feature = "rust1", since = "1.0.0")]
968 impl<'a, T> Iterator for Iter<'a, T> {
969 type Item = T;
970
971 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
972 }
973
974 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
975 impl<'a, T> IntoIterator for &'a Receiver<T> {
976 type Item = T;
977 type IntoIter = Iter<'a, T>;
978
979 fn into_iter(self) -> Iter<'a, T> { self.iter() }
980 }
981
982 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
983 impl<T> Iterator for IntoIter<T> {
984 type Item = T;
985 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
986 }
987
988 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
989 impl <T> IntoIterator for Receiver<T> {
990 type Item = T;
991 type IntoIter = IntoIter<T>;
992
993 fn into_iter(self) -> IntoIter<T> {
994 IntoIter { rx: self }
995 }
996 }
997
998 #[stable(feature = "rust1", since = "1.0.0")]
999 impl<T> Drop for Receiver<T> {
1000 fn drop(&mut self) {
1001 match *unsafe { self.inner_mut() } {
1002 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
1003 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
1004 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
1005 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
1006 }
1007 }
1008 }
1009
1010 #[stable(feature = "mpsc_debug", since = "1.7.0")]
1011 impl<T> fmt::Debug for Receiver<T> {
1012 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1013 write!(f, "Receiver {{ .. }}")
1014 }
1015 }
1016
1017 #[stable(feature = "rust1", since = "1.0.0")]
1018 impl<T> fmt::Debug for SendError<T> {
1019 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1020 "SendError(..)".fmt(f)
1021 }
1022 }
1023
1024 #[stable(feature = "rust1", since = "1.0.0")]
1025 impl<T> fmt::Display for SendError<T> {
1026 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1027 "sending on a closed channel".fmt(f)
1028 }
1029 }
1030
1031 #[stable(feature = "rust1", since = "1.0.0")]
1032 impl<T: Send + Reflect> error::Error for SendError<T> {
1033 fn description(&self) -> &str {
1034 "sending on a closed channel"
1035 }
1036
1037 fn cause(&self) -> Option<&error::Error> {
1038 None
1039 }
1040 }
1041
1042 #[stable(feature = "rust1", since = "1.0.0")]
1043 impl<T> fmt::Debug for TrySendError<T> {
1044 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1045 match *self {
1046 TrySendError::Full(..) => "Full(..)".fmt(f),
1047 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1048 }
1049 }
1050 }
1051
1052 #[stable(feature = "rust1", since = "1.0.0")]
1053 impl<T> fmt::Display for TrySendError<T> {
1054 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1055 match *self {
1056 TrySendError::Full(..) => {
1057 "sending on a full channel".fmt(f)
1058 }
1059 TrySendError::Disconnected(..) => {
1060 "sending on a closed channel".fmt(f)
1061 }
1062 }
1063 }
1064 }
1065
1066 #[stable(feature = "rust1", since = "1.0.0")]
1067 impl<T: Send + Reflect> error::Error for TrySendError<T> {
1068
1069 fn description(&self) -> &str {
1070 match *self {
1071 TrySendError::Full(..) => {
1072 "sending on a full channel"
1073 }
1074 TrySendError::Disconnected(..) => {
1075 "sending on a closed channel"
1076 }
1077 }
1078 }
1079
1080 fn cause(&self) -> Option<&error::Error> {
1081 None
1082 }
1083 }
1084
1085 #[stable(feature = "rust1", since = "1.0.0")]
1086 impl fmt::Display for RecvError {
1087 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1088 "receiving on a closed channel".fmt(f)
1089 }
1090 }
1091
1092 #[stable(feature = "rust1", since = "1.0.0")]
1093 impl error::Error for RecvError {
1094
1095 fn description(&self) -> &str {
1096 "receiving on a closed channel"
1097 }
1098
1099 fn cause(&self) -> Option<&error::Error> {
1100 None
1101 }
1102 }
1103
1104 #[stable(feature = "rust1", since = "1.0.0")]
1105 impl fmt::Display for TryRecvError {
1106 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1107 match *self {
1108 TryRecvError::Empty => {
1109 "receiving on an empty channel".fmt(f)
1110 }
1111 TryRecvError::Disconnected => {
1112 "receiving on a closed channel".fmt(f)
1113 }
1114 }
1115 }
1116 }
1117
1118 #[stable(feature = "rust1", since = "1.0.0")]
1119 impl error::Error for TryRecvError {
1120
1121 fn description(&self) -> &str {
1122 match *self {
1123 TryRecvError::Empty => {
1124 "receiving on an empty channel"
1125 }
1126 TryRecvError::Disconnected => {
1127 "receiving on a closed channel"
1128 }
1129 }
1130 }
1131
1132 fn cause(&self) -> Option<&error::Error> {
1133 None
1134 }
1135 }
1136
1137 #[cfg(test)]
1138 mod tests {
1139 use prelude::v1::*;
1140
1141 use env;
1142 use super::*;
1143 use thread;
1144
1145 pub fn stress_factor() -> usize {
1146 match env::var("RUST_TEST_STRESS") {
1147 Ok(val) => val.parse().unwrap(),
1148 Err(..) => 1,
1149 }
1150 }
1151
1152 #[test]
1153 fn smoke() {
1154 let (tx, rx) = channel::<i32>();
1155 tx.send(1).unwrap();
1156 assert_eq!(rx.recv().unwrap(), 1);
1157 }
1158
1159 #[test]
1160 fn drop_full() {
1161 let (tx, _rx) = channel::<Box<isize>>();
1162 tx.send(box 1).unwrap();
1163 }
1164
1165 #[test]
1166 fn drop_full_shared() {
1167 let (tx, _rx) = channel::<Box<isize>>();
1168 drop(tx.clone());
1169 drop(tx.clone());
1170 tx.send(box 1).unwrap();
1171 }
1172
1173 #[test]
1174 fn smoke_shared() {
1175 let (tx, rx) = channel::<i32>();
1176 tx.send(1).unwrap();
1177 assert_eq!(rx.recv().unwrap(), 1);
1178 let tx = tx.clone();
1179 tx.send(1).unwrap();
1180 assert_eq!(rx.recv().unwrap(), 1);
1181 }
1182
1183 #[test]
1184 fn smoke_threads() {
1185 let (tx, rx) = channel::<i32>();
1186 let _t = thread::spawn(move|| {
1187 tx.send(1).unwrap();
1188 });
1189 assert_eq!(rx.recv().unwrap(), 1);
1190 }
1191
1192 #[test]
1193 fn smoke_port_gone() {
1194 let (tx, rx) = channel::<i32>();
1195 drop(rx);
1196 assert!(tx.send(1).is_err());
1197 }
1198
1199 #[test]
1200 fn smoke_shared_port_gone() {
1201 let (tx, rx) = channel::<i32>();
1202 drop(rx);
1203 assert!(tx.send(1).is_err())
1204 }
1205
1206 #[test]
1207 fn smoke_shared_port_gone2() {
1208 let (tx, rx) = channel::<i32>();
1209 drop(rx);
1210 let tx2 = tx.clone();
1211 drop(tx);
1212 assert!(tx2.send(1).is_err());
1213 }
1214
1215 #[test]
1216 fn port_gone_concurrent() {
1217 let (tx, rx) = channel::<i32>();
1218 let _t = thread::spawn(move|| {
1219 rx.recv().unwrap();
1220 });
1221 while tx.send(1).is_ok() {}
1222 }
1223
1224 #[test]
1225 fn port_gone_concurrent_shared() {
1226 let (tx, rx) = channel::<i32>();
1227 let tx2 = tx.clone();
1228 let _t = thread::spawn(move|| {
1229 rx.recv().unwrap();
1230 });
1231 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1232 }
1233
1234 #[test]
1235 fn smoke_chan_gone() {
1236 let (tx, rx) = channel::<i32>();
1237 drop(tx);
1238 assert!(rx.recv().is_err());
1239 }
1240
1241 #[test]
1242 fn smoke_chan_gone_shared() {
1243 let (tx, rx) = channel::<()>();
1244 let tx2 = tx.clone();
1245 drop(tx);
1246 drop(tx2);
1247 assert!(rx.recv().is_err());
1248 }
1249
1250 #[test]
1251 fn chan_gone_concurrent() {
1252 let (tx, rx) = channel::<i32>();
1253 let _t = thread::spawn(move|| {
1254 tx.send(1).unwrap();
1255 tx.send(1).unwrap();
1256 });
1257 while rx.recv().is_ok() {}
1258 }
1259
1260 #[test]
1261 fn stress() {
1262 let (tx, rx) = channel::<i32>();
1263 let t = thread::spawn(move|| {
1264 for _ in 0..10000 { tx.send(1).unwrap(); }
1265 });
1266 for _ in 0..10000 {
1267 assert_eq!(rx.recv().unwrap(), 1);
1268 }
1269 t.join().ok().unwrap();
1270 }
1271
1272 #[test]
1273 fn stress_shared() {
1274 const AMT: u32 = 10000;
1275 const NTHREADS: u32 = 8;
1276 let (tx, rx) = channel::<i32>();
1277
1278 let t = thread::spawn(move|| {
1279 for _ in 0..AMT * NTHREADS {
1280 assert_eq!(rx.recv().unwrap(), 1);
1281 }
1282 match rx.try_recv() {
1283 Ok(..) => panic!(),
1284 _ => {}
1285 }
1286 });
1287
1288 for _ in 0..NTHREADS {
1289 let tx = tx.clone();
1290 thread::spawn(move|| {
1291 for _ in 0..AMT { tx.send(1).unwrap(); }
1292 });
1293 }
1294 drop(tx);
1295 t.join().ok().unwrap();
1296 }
1297
1298 #[test]
1299 fn send_from_outside_runtime() {
1300 let (tx1, rx1) = channel::<()>();
1301 let (tx2, rx2) = channel::<i32>();
1302 let t1 = thread::spawn(move|| {
1303 tx1.send(()).unwrap();
1304 for _ in 0..40 {
1305 assert_eq!(rx2.recv().unwrap(), 1);
1306 }
1307 });
1308 rx1.recv().unwrap();
1309 let t2 = thread::spawn(move|| {
1310 for _ in 0..40 {
1311 tx2.send(1).unwrap();
1312 }
1313 });
1314 t1.join().ok().unwrap();
1315 t2.join().ok().unwrap();
1316 }
1317
1318 #[test]
1319 fn recv_from_outside_runtime() {
1320 let (tx, rx) = channel::<i32>();
1321 let t = thread::spawn(move|| {
1322 for _ in 0..40 {
1323 assert_eq!(rx.recv().unwrap(), 1);
1324 }
1325 });
1326 for _ in 0..40 {
1327 tx.send(1).unwrap();
1328 }
1329 t.join().ok().unwrap();
1330 }
1331
1332 #[test]
1333 fn no_runtime() {
1334 let (tx1, rx1) = channel::<i32>();
1335 let (tx2, rx2) = channel::<i32>();
1336 let t1 = thread::spawn(move|| {
1337 assert_eq!(rx1.recv().unwrap(), 1);
1338 tx2.send(2).unwrap();
1339 });
1340 let t2 = thread::spawn(move|| {
1341 tx1.send(1).unwrap();
1342 assert_eq!(rx2.recv().unwrap(), 2);
1343 });
1344 t1.join().ok().unwrap();
1345 t2.join().ok().unwrap();
1346 }
1347
1348 #[test]
1349 fn oneshot_single_thread_close_port_first() {
1350 // Simple test of closing without sending
1351 let (_tx, rx) = channel::<i32>();
1352 drop(rx);
1353 }
1354
1355 #[test]
1356 fn oneshot_single_thread_close_chan_first() {
1357 // Simple test of closing without sending
1358 let (tx, _rx) = channel::<i32>();
1359 drop(tx);
1360 }
1361
1362 #[test]
1363 fn oneshot_single_thread_send_port_close() {
1364 // Testing that the sender cleans up the payload if receiver is closed
1365 let (tx, rx) = channel::<Box<i32>>();
1366 drop(rx);
1367 assert!(tx.send(box 0).is_err());
1368 }
1369
1370 #[test]
1371 fn oneshot_single_thread_recv_chan_close() {
1372 // Receiving on a closed chan will panic
1373 let res = thread::spawn(move|| {
1374 let (tx, rx) = channel::<i32>();
1375 drop(tx);
1376 rx.recv().unwrap();
1377 }).join();
1378 // What is our res?
1379 assert!(res.is_err());
1380 }
1381
1382 #[test]
1383 fn oneshot_single_thread_send_then_recv() {
1384 let (tx, rx) = channel::<Box<i32>>();
1385 tx.send(box 10).unwrap();
1386 assert!(rx.recv().unwrap() == box 10);
1387 }
1388
1389 #[test]
1390 fn oneshot_single_thread_try_send_open() {
1391 let (tx, rx) = channel::<i32>();
1392 assert!(tx.send(10).is_ok());
1393 assert!(rx.recv().unwrap() == 10);
1394 }
1395
1396 #[test]
1397 fn oneshot_single_thread_try_send_closed() {
1398 let (tx, rx) = channel::<i32>();
1399 drop(rx);
1400 assert!(tx.send(10).is_err());
1401 }
1402
1403 #[test]
1404 fn oneshot_single_thread_try_recv_open() {
1405 let (tx, rx) = channel::<i32>();
1406 tx.send(10).unwrap();
1407 assert!(rx.recv() == Ok(10));
1408 }
1409
1410 #[test]
1411 fn oneshot_single_thread_try_recv_closed() {
1412 let (tx, rx) = channel::<i32>();
1413 drop(tx);
1414 assert!(rx.recv().is_err());
1415 }
1416
1417 #[test]
1418 fn oneshot_single_thread_peek_data() {
1419 let (tx, rx) = channel::<i32>();
1420 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1421 tx.send(10).unwrap();
1422 assert_eq!(rx.try_recv(), Ok(10));
1423 }
1424
1425 #[test]
1426 fn oneshot_single_thread_peek_close() {
1427 let (tx, rx) = channel::<i32>();
1428 drop(tx);
1429 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1430 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1431 }
1432
1433 #[test]
1434 fn oneshot_single_thread_peek_open() {
1435 let (_tx, rx) = channel::<i32>();
1436 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1437 }
1438
1439 #[test]
1440 fn oneshot_multi_task_recv_then_send() {
1441 let (tx, rx) = channel::<Box<i32>>();
1442 let _t = thread::spawn(move|| {
1443 assert!(rx.recv().unwrap() == box 10);
1444 });
1445
1446 tx.send(box 10).unwrap();
1447 }
1448
1449 #[test]
1450 fn oneshot_multi_task_recv_then_close() {
1451 let (tx, rx) = channel::<Box<i32>>();
1452 let _t = thread::spawn(move|| {
1453 drop(tx);
1454 });
1455 let res = thread::spawn(move|| {
1456 assert!(rx.recv().unwrap() == box 10);
1457 }).join();
1458 assert!(res.is_err());
1459 }
1460
1461 #[test]
1462 fn oneshot_multi_thread_close_stress() {
1463 for _ in 0..stress_factor() {
1464 let (tx, rx) = channel::<i32>();
1465 let _t = thread::spawn(move|| {
1466 drop(rx);
1467 });
1468 drop(tx);
1469 }
1470 }
1471
1472 #[test]
1473 fn oneshot_multi_thread_send_close_stress() {
1474 for _ in 0..stress_factor() {
1475 let (tx, rx) = channel::<i32>();
1476 let _t = thread::spawn(move|| {
1477 drop(rx);
1478 });
1479 let _ = thread::spawn(move|| {
1480 tx.send(1).unwrap();
1481 }).join();
1482 }
1483 }
1484
1485 #[test]
1486 fn oneshot_multi_thread_recv_close_stress() {
1487 for _ in 0..stress_factor() {
1488 let (tx, rx) = channel::<i32>();
1489 thread::spawn(move|| {
1490 let res = thread::spawn(move|| {
1491 rx.recv().unwrap();
1492 }).join();
1493 assert!(res.is_err());
1494 });
1495 let _t = thread::spawn(move|| {
1496 thread::spawn(move|| {
1497 drop(tx);
1498 });
1499 });
1500 }
1501 }
1502
1503 #[test]
1504 fn oneshot_multi_thread_send_recv_stress() {
1505 for _ in 0..stress_factor() {
1506 let (tx, rx) = channel::<Box<isize>>();
1507 let _t = thread::spawn(move|| {
1508 tx.send(box 10).unwrap();
1509 });
1510 assert!(rx.recv().unwrap() == box 10);
1511 }
1512 }
1513
1514 #[test]
1515 fn stream_send_recv_stress() {
1516 for _ in 0..stress_factor() {
1517 let (tx, rx) = channel();
1518
1519 send(tx, 0);
1520 recv(rx, 0);
1521
1522 fn send(tx: Sender<Box<i32>>, i: i32) {
1523 if i == 10 { return }
1524
1525 thread::spawn(move|| {
1526 tx.send(box i).unwrap();
1527 send(tx, i + 1);
1528 });
1529 }
1530
1531 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1532 if i == 10 { return }
1533
1534 thread::spawn(move|| {
1535 assert!(rx.recv().unwrap() == box i);
1536 recv(rx, i + 1);
1537 });
1538 }
1539 }
1540 }
1541
1542 #[test]
1543 fn recv_a_lot() {
1544 // Regression test that we don't run out of stack in scheduler context
1545 let (tx, rx) = channel();
1546 for _ in 0..10000 { tx.send(()).unwrap(); }
1547 for _ in 0..10000 { rx.recv().unwrap(); }
1548 }
1549
1550 #[test]
1551 fn shared_chan_stress() {
1552 let (tx, rx) = channel();
1553 let total = stress_factor() + 100;
1554 for _ in 0..total {
1555 let tx = tx.clone();
1556 thread::spawn(move|| {
1557 tx.send(()).unwrap();
1558 });
1559 }
1560
1561 for _ in 0..total {
1562 rx.recv().unwrap();
1563 }
1564 }
1565
1566 #[test]
1567 fn test_nested_recv_iter() {
1568 let (tx, rx) = channel::<i32>();
1569 let (total_tx, total_rx) = channel::<i32>();
1570
1571 let _t = thread::spawn(move|| {
1572 let mut acc = 0;
1573 for x in rx.iter() {
1574 acc += x;
1575 }
1576 total_tx.send(acc).unwrap();
1577 });
1578
1579 tx.send(3).unwrap();
1580 tx.send(1).unwrap();
1581 tx.send(2).unwrap();
1582 drop(tx);
1583 assert_eq!(total_rx.recv().unwrap(), 6);
1584 }
1585
1586 #[test]
1587 fn test_recv_iter_break() {
1588 let (tx, rx) = channel::<i32>();
1589 let (count_tx, count_rx) = channel();
1590
1591 let _t = thread::spawn(move|| {
1592 let mut count = 0;
1593 for x in rx.iter() {
1594 if count >= 3 {
1595 break;
1596 } else {
1597 count += x;
1598 }
1599 }
1600 count_tx.send(count).unwrap();
1601 });
1602
1603 tx.send(2).unwrap();
1604 tx.send(2).unwrap();
1605 tx.send(2).unwrap();
1606 let _ = tx.send(2);
1607 drop(tx);
1608 assert_eq!(count_rx.recv().unwrap(), 4);
1609 }
1610
1611 #[test]
1612 fn test_recv_into_iter_owned() {
1613 let mut iter = {
1614 let (tx, rx) = channel::<i32>();
1615 tx.send(1).unwrap();
1616 tx.send(2).unwrap();
1617
1618 rx.into_iter()
1619 };
1620 assert_eq!(iter.next().unwrap(), 1);
1621 assert_eq!(iter.next().unwrap(), 2);
1622 assert_eq!(iter.next().is_none(), true);
1623 }
1624
1625 #[test]
1626 fn test_recv_into_iter_borrowed() {
1627 let (tx, rx) = channel::<i32>();
1628 tx.send(1).unwrap();
1629 tx.send(2).unwrap();
1630 drop(tx);
1631 let mut iter = (&rx).into_iter();
1632 assert_eq!(iter.next().unwrap(), 1);
1633 assert_eq!(iter.next().unwrap(), 2);
1634 assert_eq!(iter.next().is_none(), true);
1635 }
1636
1637 #[test]
1638 fn try_recv_states() {
1639 let (tx1, rx1) = channel::<i32>();
1640 let (tx2, rx2) = channel::<()>();
1641 let (tx3, rx3) = channel::<()>();
1642 let _t = thread::spawn(move|| {
1643 rx2.recv().unwrap();
1644 tx1.send(1).unwrap();
1645 tx3.send(()).unwrap();
1646 rx2.recv().unwrap();
1647 drop(tx1);
1648 tx3.send(()).unwrap();
1649 });
1650
1651 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1652 tx2.send(()).unwrap();
1653 rx3.recv().unwrap();
1654 assert_eq!(rx1.try_recv(), Ok(1));
1655 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1656 tx2.send(()).unwrap();
1657 rx3.recv().unwrap();
1658 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1659 }
1660
1661 // This bug used to cause a livelock inside of the Receiver destructor
1662 // because the internal state of the Shared packet was corrupted
1663 #[test]
1664 fn destroy_upgraded_shared_port_when_sender_still_active() {
1665 let (tx, rx) = channel();
1666 let (tx2, rx2) = channel();
1667 let _t = thread::spawn(move|| {
1668 rx.recv().unwrap(); // wait on a oneshot
1669 drop(rx); // destroy a shared
1670 tx2.send(()).unwrap();
1671 });
1672 // make sure the other thread has gone to sleep
1673 for _ in 0..5000 { thread::yield_now(); }
1674
1675 // upgrade to a shared chan and send a message
1676 let t = tx.clone();
1677 drop(tx);
1678 t.send(()).unwrap();
1679
1680 // wait for the child thread to exit before we exit
1681 rx2.recv().unwrap();
1682 }
1683 }
1684
1685 #[cfg(test)]
1686 mod sync_tests {
1687 use prelude::v1::*;
1688
1689 use env;
1690 use thread;
1691 use super::*;
1692
1693 pub fn stress_factor() -> usize {
1694 match env::var("RUST_TEST_STRESS") {
1695 Ok(val) => val.parse().unwrap(),
1696 Err(..) => 1,
1697 }
1698 }
1699
1700 #[test]
1701 fn smoke() {
1702 let (tx, rx) = sync_channel::<i32>(1);
1703 tx.send(1).unwrap();
1704 assert_eq!(rx.recv().unwrap(), 1);
1705 }
1706
1707 #[test]
1708 fn drop_full() {
1709 let (tx, _rx) = sync_channel::<Box<isize>>(1);
1710 tx.send(box 1).unwrap();
1711 }
1712
1713 #[test]
1714 fn smoke_shared() {
1715 let (tx, rx) = sync_channel::<i32>(1);
1716 tx.send(1).unwrap();
1717 assert_eq!(rx.recv().unwrap(), 1);
1718 let tx = tx.clone();
1719 tx.send(1).unwrap();
1720 assert_eq!(rx.recv().unwrap(), 1);
1721 }
1722
1723 #[test]
1724 fn smoke_threads() {
1725 let (tx, rx) = sync_channel::<i32>(0);
1726 let _t = thread::spawn(move|| {
1727 tx.send(1).unwrap();
1728 });
1729 assert_eq!(rx.recv().unwrap(), 1);
1730 }
1731
1732 #[test]
1733 fn smoke_port_gone() {
1734 let (tx, rx) = sync_channel::<i32>(0);
1735 drop(rx);
1736 assert!(tx.send(1).is_err());
1737 }
1738
1739 #[test]
1740 fn smoke_shared_port_gone2() {
1741 let (tx, rx) = sync_channel::<i32>(0);
1742 drop(rx);
1743 let tx2 = tx.clone();
1744 drop(tx);
1745 assert!(tx2.send(1).is_err());
1746 }
1747
1748 #[test]
1749 fn port_gone_concurrent() {
1750 let (tx, rx) = sync_channel::<i32>(0);
1751 let _t = thread::spawn(move|| {
1752 rx.recv().unwrap();
1753 });
1754 while tx.send(1).is_ok() {}
1755 }
1756
1757 #[test]
1758 fn port_gone_concurrent_shared() {
1759 let (tx, rx) = sync_channel::<i32>(0);
1760 let tx2 = tx.clone();
1761 let _t = thread::spawn(move|| {
1762 rx.recv().unwrap();
1763 });
1764 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1765 }
1766
1767 #[test]
1768 fn smoke_chan_gone() {
1769 let (tx, rx) = sync_channel::<i32>(0);
1770 drop(tx);
1771 assert!(rx.recv().is_err());
1772 }
1773
1774 #[test]
1775 fn smoke_chan_gone_shared() {
1776 let (tx, rx) = sync_channel::<()>(0);
1777 let tx2 = tx.clone();
1778 drop(tx);
1779 drop(tx2);
1780 assert!(rx.recv().is_err());
1781 }
1782
1783 #[test]
1784 fn chan_gone_concurrent() {
1785 let (tx, rx) = sync_channel::<i32>(0);
1786 thread::spawn(move|| {
1787 tx.send(1).unwrap();
1788 tx.send(1).unwrap();
1789 });
1790 while rx.recv().is_ok() {}
1791 }
1792
1793 #[test]
1794 fn stress() {
1795 let (tx, rx) = sync_channel::<i32>(0);
1796 thread::spawn(move|| {
1797 for _ in 0..10000 { tx.send(1).unwrap(); }
1798 });
1799 for _ in 0..10000 {
1800 assert_eq!(rx.recv().unwrap(), 1);
1801 }
1802 }
1803
1804 #[test]
1805 fn stress_shared() {
1806 const AMT: u32 = 1000;
1807 const NTHREADS: u32 = 8;
1808 let (tx, rx) = sync_channel::<i32>(0);
1809 let (dtx, drx) = sync_channel::<()>(0);
1810
1811 thread::spawn(move|| {
1812 for _ in 0..AMT * NTHREADS {
1813 assert_eq!(rx.recv().unwrap(), 1);
1814 }
1815 match rx.try_recv() {
1816 Ok(..) => panic!(),
1817 _ => {}
1818 }
1819 dtx.send(()).unwrap();
1820 });
1821
1822 for _ in 0..NTHREADS {
1823 let tx = tx.clone();
1824 thread::spawn(move|| {
1825 for _ in 0..AMT { tx.send(1).unwrap(); }
1826 });
1827 }
1828 drop(tx);
1829 drx.recv().unwrap();
1830 }
1831
1832 #[test]
1833 fn oneshot_single_thread_close_port_first() {
1834 // Simple test of closing without sending
1835 let (_tx, rx) = sync_channel::<i32>(0);
1836 drop(rx);
1837 }
1838
1839 #[test]
1840 fn oneshot_single_thread_close_chan_first() {
1841 // Simple test of closing without sending
1842 let (tx, _rx) = sync_channel::<i32>(0);
1843 drop(tx);
1844 }
1845
1846 #[test]
1847 fn oneshot_single_thread_send_port_close() {
1848 // Testing that the sender cleans up the payload if receiver is closed
1849 let (tx, rx) = sync_channel::<Box<i32>>(0);
1850 drop(rx);
1851 assert!(tx.send(box 0).is_err());
1852 }
1853
1854 #[test]
1855 fn oneshot_single_thread_recv_chan_close() {
1856 // Receiving on a closed channel returns Err, so the unwrap panics
1857 let res = thread::spawn(move|| {
1858 let (tx, rx) = sync_channel::<i32>(0);
1859 drop(tx);
1860 rx.recv().unwrap();
1861 }).join();
1862 // The spawned thread should have panicked, so join() reports an error
1863 assert!(res.is_err());
1864 }
1865
1866 #[test]
1867 fn oneshot_single_thread_send_then_recv() {
1868 let (tx, rx) = sync_channel::<Box<i32>>(1);
1869 tx.send(box 10).unwrap();
1870 assert!(rx.recv().unwrap() == box 10);
1871 }
1872
1873 #[test]
1874 fn oneshot_single_thread_try_send_open() {
1875 let (tx, rx) = sync_channel::<i32>(1);
1876 assert_eq!(tx.try_send(10), Ok(()));
1877 assert!(rx.recv().unwrap() == 10);
1878 }
1879
1880 #[test]
1881 fn oneshot_single_thread_try_send_closed() {
1882 let (tx, rx) = sync_channel::<i32>(0);
1883 drop(rx);
1884 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1885 }
1886
1887 #[test]
1888 fn oneshot_single_thread_try_send_closed2() {
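// The receiver is still alive here, but with a bound of 0 and no receiver
// waiting, try_send reports Full rather than Disconnected.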
1889 let (tx, _rx) = sync_channel::<i32>(0);
1890 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1891 }
1892
1893 #[test]
1894 fn oneshot_single_thread_try_recv_open() {
1895 let (tx, rx) = sync_channel::<i32>(1);
1896 tx.send(10).unwrap();
1897 assert!(rx.recv() == Ok(10));
1898 }
1899
1900 #[test]
1901 fn oneshot_single_thread_try_recv_closed() {
1902 let (tx, rx) = sync_channel::<i32>(0);
1903 drop(tx);
1904 assert!(rx.recv().is_err());
1905 }
1906
1907 #[test]
1908 fn oneshot_single_thread_peek_data() {
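// "Peek" via try_recv: Empty before the send, Ok(10) afterwards.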
1909 let (tx, rx) = sync_channel::<i32>(1);
1910 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1911 tx.send(10).unwrap();
1912 assert_eq!(rx.try_recv(), Ok(10));
1913 }
1914
1915 #[test]
1916 fn oneshot_single_thread_peek_close() {
1917 let (tx, rx) = sync_channel::<i32>(0);
1918 drop(tx);
1919 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1920 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1921 }
1922
1923 #[test]
1924 fn oneshot_single_thread_peek_open() {
1925 let (_tx, rx) = sync_channel::<i32>(0);
1926 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1927 }
1928
1929 #[test]
1930 fn oneshot_multi_task_recv_then_send() {
1931 let (tx, rx) = sync_channel::<Box<i32>>(0);
1932 let _t = thread::spawn(move|| {
1933 assert!(rx.recv().unwrap() == box 10);
1934 });
1935
1936 tx.send(box 10).unwrap();
1937 }
1938
1939 #[test]
1940 fn oneshot_multi_task_recv_then_close() {
1941 let (tx, rx) = sync_channel::<Box<i32>>(0);
1942 let _t = thread::spawn(move|| {
1943 drop(tx);
1944 });
1945 let res = thread::spawn(move|| {
1946 assert!(rx.recv().unwrap() == box 10);
1947 }).join();
1948 assert!(res.is_err());
1949 }
1950
1951 #[test]
1952 fn oneshot_multi_thread_close_stress() {
1953 for _ in 0..stress_factor() {
1954 let (tx, rx) = sync_channel::<i32>(0);
1955 let _t = thread::spawn(move|| {
1956 drop(rx);
1957 });
1958 drop(tx);
1959 }
1960 }
1961
1962 #[test]
1963 fn oneshot_multi_thread_send_close_stress() {
1964 for _ in 0..stress_factor() {
1965 let (tx, rx) = sync_channel::<i32>(0);
1966 let _t = thread::spawn(move|| {
1967 drop(rx);
1968 });
1969 let _ = thread::spawn(move || {
1970 tx.send(1).unwrap();
1971 }).join();
1972 }
1973 }
1974
1975 #[test]
1976 fn oneshot_multi_thread_recv_close_stress() {
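// Race a blocking recv (expected to panic on disconnect) against the sender
// being dropped from another thread.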
1977 for _ in 0..stress_factor() {
1978 let (tx, rx) = sync_channel::<i32>(0);
1979 let _t = thread::spawn(move|| {
1980 let res = thread::spawn(move|| {
1981 rx.recv().unwrap();
1982 }).join();
1983 assert!(res.is_err());
1984 });
1985 let _t = thread::spawn(move|| {
1986 thread::spawn(move|| {
1987 drop(tx);
1988 });
1989 });
1990 }
1991 }
1992
1993 #[test]
1994 fn oneshot_multi_thread_send_recv_stress() {
1995 for _ in 0..stress_factor() {
1996 let (tx, rx) = sync_channel::<Box<i32>>(0);
1997 let _t = thread::spawn(move|| {
1998 tx.send(box 10).unwrap();
1999 });
2000 assert!(rx.recv().unwrap() == box 10);
2001 }
2002 }
2003
2004 #[test]
2005 fn stream_send_recv_stress() {
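// Ping-pong ten boxed values across a rendezvous channel, spawning a fresh
// thread for every send and every recv.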
2006 for _ in 0..stress_factor() {
2007 let (tx, rx) = sync_channel::<Box<i32>>(0);
2008
2009 send(tx, 0);
2010 recv(rx, 0);
2011
2012 fn send(tx: SyncSender<Box<i32>>, i: i32) {
2013 if i == 10 { return }
2014
2015 thread::spawn(move|| {
2016 tx.send(box i).unwrap();
2017 send(tx, i + 1);
2018 });
2019 }
2020
2021 fn recv(rx: Receiver<Box<i32>>, i: i32) {
2022 if i == 10 { return }
2023
2024 thread::spawn(move|| {
2025 assert!(rx.recv().unwrap() == box i);
2026 recv(rx, i + 1);
2027 });
2028 }
2029 }
2030 }
2031
2032 #[test]
2033 fn recv_a_lot() {
2034 // Regression test that we don't run out of stack in scheduler context
2035 let (tx, rx) = sync_channel(10000);
2036 for _ in 0..10000 { tx.send(()).unwrap(); }
2037 for _ in 0..10000 { rx.recv().unwrap(); }
2038 }
2039
2040 #[test]
2041 fn shared_chan_stress() {
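// Spawn one short-lived sender thread per message over a cloned rendezvous
// channel, then drain every message on the receiver.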
2042 let (tx, rx) = sync_channel(0);
2043 let total = stress_factor() + 100;
2044 for _ in 0..total {
2045 let tx = tx.clone();
2046 thread::spawn(move|| {
2047 tx.send(()).unwrap();
2048 });
2049 }
2050
2051 for _ in 0..total {
2052 rx.recv().unwrap();
2053 }
2054 }
2055
2056 #[test]
2057 fn test_nested_recv_iter() {
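// Sum everything received through the blocking iterator, then report the total
// over a second channel.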
2058 let (tx, rx) = sync_channel::<i32>(0);
2059 let (total_tx, total_rx) = sync_channel::<i32>(0);
2060
2061 let _t = thread::spawn(move|| {
2062 let mut acc = 0;
2063 for x in rx.iter() {
2064 acc += x;
2065 }
2066 total_tx.send(acc).unwrap();
2067 });
2068
2069 tx.send(3).unwrap();
2070 tx.send(1).unwrap();
2071 tx.send(2).unwrap();
2072 drop(tx);
2073 assert_eq!(total_rx.recv().unwrap(), 6);
2074 }
2075
2076 #[test]
2077 fn test_recv_iter_break() {
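// Break out of the receive iterator once the running count reaches 3; the third
// message is received but not added to the count.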
2078 let (tx, rx) = sync_channel::<i32>(0);
2079 let (count_tx, count_rx) = sync_channel(0);
2080
2081 let _t = thread::spawn(move|| {
2082 let mut count = 0;
2083 for x in rx.iter() {
2084 if count >= 3 {
2085 break;
2086 } else {
2087 count += x;
2088 }
2089 }
2090 count_tx.send(count).unwrap();
2091 });
2092
2093 tx.send(2).unwrap();
2094 tx.send(2).unwrap();
2095 tx.send(2).unwrap();
2096 let _ = tx.try_send(2);
2097 drop(tx);
2098 assert_eq!(count_rx.recv().unwrap(), 4);
2099 }
2100
2101 #[test]
2102 fn try_recv_states() {
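// Step rx1 through Empty -> Ok(1) -> Empty -> Disconnected, using tx2/rx3 to
// synchronize with the helper thread between each state.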
2103 let (tx1, rx1) = sync_channel::<i32>(1);
2104 let (tx2, rx2) = sync_channel::<()>(1);
2105 let (tx3, rx3) = sync_channel::<()>(1);
2106 let _t = thread::spawn(move|| {
2107 rx2.recv().unwrap();
2108 tx1.send(1).unwrap();
2109 tx3.send(()).unwrap();
2110 rx2.recv().unwrap();
2111 drop(tx1);
2112 tx3.send(()).unwrap();
2113 });
2114
2115 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2116 tx2.send(()).unwrap();
2117 rx3.recv().unwrap();
2118 assert_eq!(rx1.try_recv(), Ok(1));
2119 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2120 tx2.send(()).unwrap();
2121 rx3.recv().unwrap();
2122 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2123 }
2124
2125 // This bug used to result in a livelock inside the Receiver destructor
2126 // because the internal state of the Shared packet was corrupted
2127 #[test]
2128 fn destroy_upgraded_shared_port_when_sender_still_active() {
2129 let (tx, rx) = sync_channel::<()>(0);
2130 let (tx2, rx2) = sync_channel::<()>(0);
2131 let _t = thread::spawn(move|| {
2132 rx.recv().unwrap(); // wait on a oneshot
2133 drop(rx); // destroy a shared
2134 tx2.send(()).unwrap();
2135 });
2136 // make sure the other thread has gone to sleep
2137 for _ in 0..5000 { thread::yield_now(); }
2138
2139 // upgrade to a shared chan and send a message
2140 let t = tx.clone();
2141 drop(tx);
2142 t.send(()).unwrap();
2143
2144 // wait for the child thread to exit before we exit
2145 rx2.recv().unwrap();
2146 }
2147
2148 #[test]
2149 fn send1() {
2150 let (tx, rx) = sync_channel::<i32>(0);
2151 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2152 assert_eq!(tx.send(1), Ok(()));
2153 }
2154
2155 #[test]
2156 fn send2() {
2157 let (tx, rx) = sync_channel::<i32>(0);
2158 let _t = thread::spawn(move|| { drop(rx); });
2159 assert!(tx.send(1).is_err());
2160 }
2161
2162 #[test]
2163 fn send3() {
2164 let (tx, rx) = sync_channel::<i32>(1);
2165 assert_eq!(tx.send(1), Ok(()));
2166 let _t = thread::spawn(move|| { drop(rx); });
2167 assert!(tx.send(1).is_err());
2168 }
2169
2170 #[test]
2171 fn send4() {
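// Both the original and the cloned sender block on a rendezvous send and must
// return an error once rx is dropped.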
2172 let (tx, rx) = sync_channel::<i32>(0);
2173 let tx2 = tx.clone();
2174 let (done, donerx) = channel();
2175 let done2 = done.clone();
2176 let _t = thread::spawn(move|| {
2177 assert!(tx.send(1).is_err());
2178 done.send(()).unwrap();
2179 });
2180 let _t = thread::spawn(move|| {
2181 assert!(tx2.send(2).is_err());
2182 done2.send(()).unwrap();
2183 });
2184 drop(rx);
2185 donerx.recv().unwrap();
2186 donerx.recv().unwrap();
2187 }
2188
2189 #[test]
2190 fn try_send1() {
2191 let (tx, _rx) = sync_channel::<i32>(0);
2192 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2193 }
2194
2195 #[test]
2196 fn try_send2() {
2197 let (tx, _rx) = sync_channel::<i32>(1);
2198 assert_eq!(tx.try_send(1), Ok(()));
2199 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2200 }
2201
2202 #[test]
2203 fn try_send3() {
2204 let (tx, rx) = sync_channel::<i32>(1);
2205 assert_eq!(tx.try_send(1), Ok(()));
2206 drop(rx);
2207 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2208 }
2209
2210 #[test]
2211 fn issue_15761() {
2212 fn repro() {
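// Both try_sends have buffer space available and must succeed; the outer loop
// repeats the scenario to catch intermittent failures.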
2213 let (tx1, rx1) = sync_channel::<()>(3);
2214 let (tx2, rx2) = sync_channel::<()>(3);
2215
2216 let _t = thread::spawn(move|| {
2217 rx1.recv().unwrap();
2218 tx2.try_send(()).unwrap();
2219 });
2220
2221 tx1.try_send(()).unwrap();
2222 rx2.recv().unwrap();
2223 }
2224
2225 for _ in 0..100 {
2226 repro()
2227 }
2228 }
2229
2230 #[test]
2231 fn fmt_debug_sender() {
2232 let (tx, _) = channel::<i32>();
2233 assert_eq!(format!("{:?}", tx), "Sender { .. }");
2234 }
2235
2236 #[test]
2237 fn fmt_debug_recv() {
2238 let (_, rx) = channel::<i32>();
2239 assert_eq!(format!("{:?}", rx), "Receiver { .. }");
2240 }
2241
2242 #[test]
2243 fn fmt_debug_sync_sender() {
2244 let (tx, _) = sync_channel::<i32>(1);
2245 assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
2246 }
2247 }