]> git.proxmox.com Git - rustc.git/blob - src/libstd/sync/mpsc/mod.rs
Imported Upstream version 1.6.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
29 //! infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
37 //! a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
81 //! });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent thread to start receiving
111 //! tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115
116 #![stable(feature = "rust1", since = "1.0.0")]
117
118 // A description of how Rust's channel implementation works
119 //
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
123 //
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great length about channels of these flavors.
129 //
130 // ## Flavors of channels
131 //
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
135 // channels in play.
136 //
137 // * Flavor::Oneshots - these channels are highly optimized for the one-send use case.
138 // They contain as few atomics as possible and involve one and
139 // exactly one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 // use a different concurrent queue that is more tailored for this
142 // use case. The initial allocation of this flavor of channel is not
143 // optimized.
144 // * Shared - this is the most general form of channel that this module offers,
145 // a channel with multiple senders. This type is as optimized as it
146 // can be, but the previous two types mentioned are much faster for
147 // their use-cases.
148 //
149 // ## Concurrent queues
150 //
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
152 // recv() obviously blocks. This means that under the hood there must be some
153 // shared and concurrent queue holding all of the actual data.
154 //
155 // With two flavors of channels, two flavors of queues are also used. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
159 // shared channels.
160 //
161 // ### SPSC optimizations
162 //
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
167 // pop().
168 //
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
174 //
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to have very fast
177 // send() operations while not too large such that millions of channels can
178 // coexist at once.
179 //
180 // ### MPSC optimizations
181 //
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to earn its unboundedness, but I have not put
184 // forth much effort into having a cache of nodes similar to the SPSC queue.
185 //
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
189 //
190 // ## Overview of the Implementation
191 //
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
195 //
196 //
197 // send(t) recv()
198 // queue.push(t) return if queue.pop()
199 // if increment() == -1 deschedule {
200 // wakeup() if decrement() > 0
201 // cancel_deschedule()
202 // }
203 // queue.pop()
204 //
205 // As mentioned before, there are no locks in this implementation, only atomic
206 // instructions are used.
207 //
208 // ### The internal atomic counter
209 //
210 // Every channel has a shared counter with each half to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
213 //
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
218 //
219 // The recv() method has a beginning call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
225 //
226 // Instead, the "steal count" is kept track of separately (not atomically
227 // because it's only used by receivers), and then the decrement() call when
228 // descheduling will lump in all of the recent steals into one large decrement.
229 //
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
232 //
233 // ## Native Implementation
234 //
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which is obviously not available without the runtime).
238 //
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
244 //
245 // ## Select
246 //
247 // Being able to support selection over channels has greatly influenced this
248 // design, and not only does selection need to work inside the runtime, but also
249 // outside the runtime.
250 //
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as its woken up. The cancellation procedure
255 // involves an increment and swapping out of to_wake to acquire ownership of the
256 // thread to unblock.
257 //
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
262 //
263 // # Conclusion
264 //
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
267
268 use sync::Arc;
269 use error;
270 use fmt;
271 use mem;
272 use cell::UnsafeCell;
273 use marker::Reflect;
274
275 #[unstable(feature = "mpsc_select", issue = "27800")]
276 pub use self::select::{Select, Handle};
277 use self::select::StartResult;
278 use self::select::StartResult::*;
279 use self::blocking::SignalToken;
280
281 mod blocking;
282 mod oneshot;
283 mod select;
284 mod shared;
285 mod stream;
286 mod sync;
287 mod mpsc_queue;
288 mod spsc_queue;
289
290 /// The receiving-half of Rust's channel type. This half can only be owned by
291 /// one thread
292 #[stable(feature = "rust1", since = "1.0.0")]
293 pub struct Receiver<T> {
294 inner: UnsafeCell<Flavor<T>>,
295 }
296
297 // The receiver port can be sent from place to place, so long as it
298 // is not used to receive non-sendable things.
299 #[stable(feature = "rust1", since = "1.0.0")]
300 unsafe impl<T: Send> Send for Receiver<T> { }
301
302 /// An iterator over messages on a receiver, this iterator will block
303 /// whenever `next` is called, waiting for a new message, and `None` will be
304 /// returned when the corresponding channel has hung up.
305 #[stable(feature = "rust1", since = "1.0.0")]
306 pub struct Iter<'a, T: 'a> {
307 rx: &'a Receiver<T>
308 }
309
310 /// An owning iterator over messages on a receiver, this iterator will block
311 /// whenever `next` is called, waiting for a new message, and `None` will be
312 /// returned when the corresponding channel has hung up.
313 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
314 pub struct IntoIter<T> {
315 rx: Receiver<T>
316 }
317
318 /// The sending-half of Rust's asynchronous channel type. This half can only be
319 /// owned by one thread, but it can be cloned to send to other threads.
320 #[stable(feature = "rust1", since = "1.0.0")]
321 pub struct Sender<T> {
322 inner: UnsafeCell<Flavor<T>>,
323 }
324
325 // The send port can be sent from place to place, so long as it
326 // is not used to send non-sendable things.
327 #[stable(feature = "rust1", since = "1.0.0")]
328 unsafe impl<T: Send> Send for Sender<T> { }
329
330 /// The sending-half of Rust's synchronous channel type. This half can only be
331 /// owned by one thread, but it can be cloned to send to other threads.
332 #[stable(feature = "rust1", since = "1.0.0")]
333 pub struct SyncSender<T> {
334 inner: Arc<UnsafeCell<sync::Packet<T>>>,
335 }
336
337 #[stable(feature = "rust1", since = "1.0.0")]
338 unsafe impl<T: Send> Send for SyncSender<T> {}
339
340 #[stable(feature = "rust1", since = "1.0.0")]
341 impl<T> !Sync for SyncSender<T> {}
342
343 /// An error returned from the `send` function on channels.
344 ///
345 /// A `send` operation can only fail if the receiving end of a channel is
346 /// disconnected, implying that the data could never be received. The error
347 /// contains the data being sent as a payload so it can be recovered.
348 #[stable(feature = "rust1", since = "1.0.0")]
349 #[derive(PartialEq, Eq, Clone, Copy)]
350 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
351
352 /// An error returned from the `recv` function on a `Receiver`.
353 ///
354 /// The `recv` operation can only fail if the sending half of a channel is
355 /// disconnected, implying that no further messages will ever be received.
356 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
357 #[stable(feature = "rust1", since = "1.0.0")]
358 pub struct RecvError;
359
360 /// This enumeration is the list of the possible reasons that `try_recv` could
361 /// not return data when called.
362 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
363 #[stable(feature = "rust1", since = "1.0.0")]
364 pub enum TryRecvError {
365 /// This channel is currently empty, but the sender(s) have not yet
366 /// disconnected, so data may yet become available.
367 #[stable(feature = "rust1", since = "1.0.0")]
368 Empty,
369
370 /// This channel's sending half has become disconnected, and there will
371 /// never be any more data received on this channel
372 #[stable(feature = "rust1", since = "1.0.0")]
373 Disconnected,
374 }
375
376 /// This enumeration is the list of the possible error outcomes for the
377 /// `SyncSender::try_send` method.
378 #[stable(feature = "rust1", since = "1.0.0")]
379 #[derive(PartialEq, Eq, Clone, Copy)]
380 pub enum TrySendError<T> {
381 /// The data could not be sent on the channel because it would require that
382 /// the callee block to send the data.
383 ///
384 /// If this is a buffered channel, then the buffer is full at this time. If
385 /// this is not a buffered channel, then there is no receiver available to
386 /// acquire the data.
387 #[stable(feature = "rust1", since = "1.0.0")]
388 Full(T),
389
390 /// This channel's receiving half has disconnected, so the data could not be
391 /// sent. The data is returned back to the callee in this case.
392 #[stable(feature = "rust1", since = "1.0.0")]
393 Disconnected(T),
394 }
395
396 enum Flavor<T> {
397 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
398 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
399 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
400 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
401 }
402
403 #[doc(hidden)]
404 trait UnsafeFlavor<T> {
405 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
406 unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
407 &mut *self.inner_unsafe().get()
408 }
409 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
410 &*self.inner_unsafe().get()
411 }
412 }
413 impl<T> UnsafeFlavor<T> for Sender<T> {
414 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
415 &self.inner
416 }
417 }
418 impl<T> UnsafeFlavor<T> for Receiver<T> {
419 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
420 &self.inner
421 }
422 }
423
424 /// Creates a new asynchronous channel, returning the sender/receiver halves.
425 ///
426 /// All data sent on the sender will become available on the receiver, and no
427 /// send will block the calling thread (this channel has an "infinite buffer").
428 ///
429 /// # Examples
430 ///
431 /// ```
432 /// use std::sync::mpsc::channel;
433 /// use std::thread;
434 ///
435 /// // tx is the sending half (tx for transmission), and rx is the receiving
436 /// // half (rx for receiving).
437 /// let (tx, rx) = channel();
438 ///
439 /// // Spawn off an expensive computation
440 /// thread::spawn(move|| {
441 /// # fn expensive_computation() {}
442 /// tx.send(expensive_computation()).unwrap();
443 /// });
444 ///
445 /// // Do some useful work for awhile
446 ///
447 /// // Let's see what that answer was
448 /// println!("{:?}", rx.recv().unwrap());
449 /// ```
450 #[stable(feature = "rust1", since = "1.0.0")]
451 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
452 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
453 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
454 }
455
456 /// Creates a new synchronous, bounded channel.
457 ///
458 /// Like asynchronous channels, the `Receiver` will block until a message
459 /// becomes available. These channels differ greatly in the semantics of the
460 /// sender from asynchronous channels, however.
461 ///
462 /// This channel has an internal buffer on which messages will be queued. When
463 /// the internal buffer becomes full, future sends will *block* waiting for the
464 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
465 /// becomes "rendezvous channel" where each send will not return until a recv
466 /// is paired with it.
467 ///
468 /// As with asynchronous channels, all senders will panic in `send` if the
469 /// `Receiver` has been destroyed.
470 ///
471 /// # Examples
472 ///
473 /// ```
474 /// use std::sync::mpsc::sync_channel;
475 /// use std::thread;
476 ///
477 /// let (tx, rx) = sync_channel(1);
478 ///
479 /// // this returns immediately
480 /// tx.send(1).unwrap();
481 ///
482 /// thread::spawn(move|| {
483 /// // this will block until the previous message has been received
484 /// tx.send(2).unwrap();
485 /// });
486 ///
487 /// assert_eq!(rx.recv().unwrap(), 1);
488 /// assert_eq!(rx.recv().unwrap(), 2);
489 /// ```
490 #[stable(feature = "rust1", since = "1.0.0")]
491 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
492 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
493 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
494 }
495
496 ////////////////////////////////////////////////////////////////////////////////
497 // Sender
498 ////////////////////////////////////////////////////////////////////////////////
499
500 impl<T> Sender<T> {
501 fn new(inner: Flavor<T>) -> Sender<T> {
502 Sender {
503 inner: UnsafeCell::new(inner),
504 }
505 }
506
507 /// Attempts to send a value on this channel, returning it back if it could
508 /// not be sent.
509 ///
510 /// A successful send occurs when it is determined that the other end of
511 /// the channel has not hung up already. An unsuccessful send would be one
512 /// where the corresponding receiver has already been deallocated. Note
513 /// that a return value of `Err` means that the data will never be
514 /// received, but a return value of `Ok` does *not* mean that the data
515 /// will be received. It is possible for the corresponding receiver to
516 /// hang up immediately after this function returns `Ok`.
517 ///
518 /// This method will never block the current thread.
519 ///
520 /// # Examples
521 ///
522 /// ```
523 /// use std::sync::mpsc::channel;
524 ///
525 /// let (tx, rx) = channel();
526 ///
527 /// // This send is always successful
528 /// tx.send(1).unwrap();
529 ///
530 /// // This send will fail because the receiver is gone
531 /// drop(rx);
532 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
533 /// ```
534 #[stable(feature = "rust1", since = "1.0.0")]
535 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
536 let (new_inner, ret) = match *unsafe { self.inner() } {
537 Flavor::Oneshot(ref p) => {
538 unsafe {
539 let p = p.get();
540 if !(*p).sent() {
541 return (*p).send(t).map_err(SendError);
542 } else {
543 let a =
544 Arc::new(UnsafeCell::new(stream::Packet::new()));
545 let rx = Receiver::new(Flavor::Stream(a.clone()));
546 match (*p).upgrade(rx) {
547 oneshot::UpSuccess => {
548 let ret = (*a.get()).send(t);
549 (a, ret)
550 }
551 oneshot::UpDisconnected => (a, Err(t)),
552 oneshot::UpWoke(token) => {
553 // This send cannot panic because the thread is
554 // asleep (we're looking at it), so the receiver
555 // can't go away.
556 (*a.get()).send(t).ok().unwrap();
557 token.signal();
558 (a, Ok(()))
559 }
560 }
561 }
562 }
563 }
564 Flavor::Stream(ref p) => return unsafe {
565 (*p.get()).send(t).map_err(SendError)
566 },
567 Flavor::Shared(ref p) => return unsafe {
568 (*p.get()).send(t).map_err(SendError)
569 },
570 Flavor::Sync(..) => unreachable!(),
571 };
572
573 unsafe {
574 let tmp = Sender::new(Flavor::Stream(new_inner));
575 mem::swap(self.inner_mut(), tmp.inner_mut());
576 }
577 ret.map_err(SendError)
578 }
579 }
580
581 #[stable(feature = "rust1", since = "1.0.0")]
582 impl<T> Clone for Sender<T> {
583 fn clone(&self) -> Sender<T> {
584 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
585 Flavor::Oneshot(ref p) => {
586 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
587 unsafe {
588 let guard = (*a.get()).postinit_lock();
589 let rx = Receiver::new(Flavor::Shared(a.clone()));
590 match (*p.get()).upgrade(rx) {
591 oneshot::UpSuccess |
592 oneshot::UpDisconnected => (a, None, guard),
593 oneshot::UpWoke(task) => (a, Some(task), guard)
594 }
595 }
596 }
597 Flavor::Stream(ref p) => {
598 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
599 unsafe {
600 let guard = (*a.get()).postinit_lock();
601 let rx = Receiver::new(Flavor::Shared(a.clone()));
602 match (*p.get()).upgrade(rx) {
603 stream::UpSuccess |
604 stream::UpDisconnected => (a, None, guard),
605 stream::UpWoke(task) => (a, Some(task), guard),
606 }
607 }
608 }
609 Flavor::Shared(ref p) => {
610 unsafe { (*p.get()).clone_chan(); }
611 return Sender::new(Flavor::Shared(p.clone()));
612 }
613 Flavor::Sync(..) => unreachable!(),
614 };
615
616 unsafe {
617 (*packet.get()).inherit_blocker(sleeper, guard);
618
619 let tmp = Sender::new(Flavor::Shared(packet.clone()));
620 mem::swap(self.inner_mut(), tmp.inner_mut());
621 }
622 Sender::new(Flavor::Shared(packet))
623 }
624 }
625
626 #[stable(feature = "rust1", since = "1.0.0")]
627 impl<T> Drop for Sender<T> {
628 fn drop(&mut self) {
629 match *unsafe { self.inner_mut() } {
630 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
631 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
632 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
633 Flavor::Sync(..) => unreachable!(),
634 }
635 }
636 }
637
638 ////////////////////////////////////////////////////////////////////////////////
639 // SyncSender
640 ////////////////////////////////////////////////////////////////////////////////
641
642 impl<T> SyncSender<T> {
643 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
644 SyncSender { inner: inner }
645 }
646
647 /// Sends a value on this synchronous channel.
648 ///
649 /// This function will *block* until space in the internal buffer becomes
650 /// available or a receiver is available to hand off the message to.
651 ///
652 /// Note that a successful send does *not* guarantee that the receiver will
653 /// ever see the data if there is a buffer on this channel. Items may be
654 /// enqueued in the internal buffer for the receiver to receive at a later
655 /// time. If the buffer size is 0, however, it can be guaranteed that the
656 /// receiver has indeed received the data if this function returns success.
657 ///
658 /// This function will never panic, but it may return `Err` if the
659 /// `Receiver` has disconnected and is no longer able to receive
660 /// information.
661 #[stable(feature = "rust1", since = "1.0.0")]
662 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
663 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
664 }
665
666 /// Attempts to send a value on this channel without blocking.
667 ///
668 /// This method differs from `send` by returning immediately if the
669 /// channel's buffer is full or no receiver is waiting to acquire some
670 /// data. Compared with `send`, this function has two failure cases
671 /// instead of one (one for disconnection, one for a full buffer).
672 ///
673 /// See `SyncSender::send` for notes about guarantees of whether the
674 /// receiver has received the data or not if this function is successful.
675 #[stable(feature = "rust1", since = "1.0.0")]
676 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
677 unsafe { (*self.inner.get()).try_send(t) }
678 }
679 }
680
681 #[stable(feature = "rust1", since = "1.0.0")]
682 impl<T> Clone for SyncSender<T> {
683 fn clone(&self) -> SyncSender<T> {
684 unsafe { (*self.inner.get()).clone_chan(); }
685 SyncSender::new(self.inner.clone())
686 }
687 }
688
689 #[stable(feature = "rust1", since = "1.0.0")]
690 impl<T> Drop for SyncSender<T> {
691 fn drop(&mut self) {
692 unsafe { (*self.inner.get()).drop_chan(); }
693 }
694 }
695
696 ////////////////////////////////////////////////////////////////////////////////
697 // Receiver
698 ////////////////////////////////////////////////////////////////////////////////
699
700 impl<T> Receiver<T> {
701 fn new(inner: Flavor<T>) -> Receiver<T> {
702 Receiver { inner: UnsafeCell::new(inner) }
703 }
704
705 /// Attempts to return a pending value on this receiver without blocking
706 ///
707 /// This method will never block the caller in order to wait for data to
708 /// become available. Instead, this will always return immediately with a
709 /// possible option of pending data on the channel.
710 ///
711 /// This is useful for a flavor of "optimistic check" before deciding to
712 /// block on a receiver.
713 #[stable(feature = "rust1", since = "1.0.0")]
714 pub fn try_recv(&self) -> Result<T, TryRecvError> {
715 loop {
716 let new_port = match *unsafe { self.inner() } {
717 Flavor::Oneshot(ref p) => {
718 match unsafe { (*p.get()).try_recv() } {
719 Ok(t) => return Ok(t),
720 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
721 Err(oneshot::Disconnected) => {
722 return Err(TryRecvError::Disconnected)
723 }
724 Err(oneshot::Upgraded(rx)) => rx,
725 }
726 }
727 Flavor::Stream(ref p) => {
728 match unsafe { (*p.get()).try_recv() } {
729 Ok(t) => return Ok(t),
730 Err(stream::Empty) => return Err(TryRecvError::Empty),
731 Err(stream::Disconnected) => {
732 return Err(TryRecvError::Disconnected)
733 }
734 Err(stream::Upgraded(rx)) => rx,
735 }
736 }
737 Flavor::Shared(ref p) => {
738 match unsafe { (*p.get()).try_recv() } {
739 Ok(t) => return Ok(t),
740 Err(shared::Empty) => return Err(TryRecvError::Empty),
741 Err(shared::Disconnected) => {
742 return Err(TryRecvError::Disconnected)
743 }
744 }
745 }
746 Flavor::Sync(ref p) => {
747 match unsafe { (*p.get()).try_recv() } {
748 Ok(t) => return Ok(t),
749 Err(sync::Empty) => return Err(TryRecvError::Empty),
750 Err(sync::Disconnected) => {
751 return Err(TryRecvError::Disconnected)
752 }
753 }
754 }
755 };
756 unsafe {
757 mem::swap(self.inner_mut(),
758 new_port.inner_mut());
759 }
760 }
761 }
762
763 /// Attempts to wait for a value on this receiver, returning an error if the
764 /// corresponding channel has hung up.
765 ///
766 /// This function will always block the current thread if there is no data
767 /// available and it's possible for more data to be sent. Once a message is
768 /// sent to the corresponding `Sender`, then this receiver will wake up and
769 /// return that message.
770 ///
771 /// If the corresponding `Sender` has disconnected, or it disconnects while
772 /// this call is blocking, this call will wake up and return `Err` to
773 /// indicate that no more messages can ever be received on this channel.
774 /// However, since channels are buffered, messages sent before the disconnect
775 /// will still be properly received.
776 ///
777 /// # Examples
778 ///
779 /// ```
780 /// use std::sync::mpsc;
781 /// use std::thread;
782 ///
783 /// let (send, recv) = mpsc::channel();
784 /// let handle = thread::spawn(move || {
785 /// send.send(1u8).unwrap();
786 /// });
787 ///
788 /// handle.join().unwrap();
789 ///
790 /// assert_eq!(Ok(1), recv.recv());
791 /// ```
792 ///
793 /// Buffering behavior:
794 ///
795 /// ```
796 /// use std::sync::mpsc;
797 /// use std::thread;
798 /// use std::sync::mpsc::RecvError;
799 ///
800 /// let (send, recv) = mpsc::channel();
801 /// let handle = thread::spawn(move || {
802 /// send.send(1u8).unwrap();
803 /// send.send(2).unwrap();
804 /// send.send(3).unwrap();
805 /// drop(send);
806 /// });
807 ///
808 /// // wait for the thread to join so we ensure the sender is dropped
809 /// handle.join().unwrap();
810 ///
811 /// assert_eq!(Ok(1), recv.recv());
812 /// assert_eq!(Ok(2), recv.recv());
813 /// assert_eq!(Ok(3), recv.recv());
814 /// assert_eq!(Err(RecvError), recv.recv());
815 /// ```
816 #[stable(feature = "rust1", since = "1.0.0")]
817 pub fn recv(&self) -> Result<T, RecvError> {
818 loop {
819 let new_port = match *unsafe { self.inner() } {
820 Flavor::Oneshot(ref p) => {
821 match unsafe { (*p.get()).recv() } {
822 Ok(t) => return Ok(t),
823 Err(oneshot::Empty) => return unreachable!(),
824 Err(oneshot::Disconnected) => return Err(RecvError),
825 Err(oneshot::Upgraded(rx)) => rx,
826 }
827 }
828 Flavor::Stream(ref p) => {
829 match unsafe { (*p.get()).recv() } {
830 Ok(t) => return Ok(t),
831 Err(stream::Empty) => return unreachable!(),
832 Err(stream::Disconnected) => return Err(RecvError),
833 Err(stream::Upgraded(rx)) => rx,
834 }
835 }
836 Flavor::Shared(ref p) => {
837 match unsafe { (*p.get()).recv() } {
838 Ok(t) => return Ok(t),
839 Err(shared::Empty) => return unreachable!(),
840 Err(shared::Disconnected) => return Err(RecvError),
841 }
842 }
843 Flavor::Sync(ref p) => return unsafe {
844 (*p.get()).recv().map_err(|()| RecvError)
845 }
846 };
847 unsafe {
848 mem::swap(self.inner_mut(), new_port.inner_mut());
849 }
850 }
851 }
852
853 /// Returns an iterator that will block waiting for messages, but never
854 /// `panic!`. It will return `None` when the channel has hung up.
855 #[stable(feature = "rust1", since = "1.0.0")]
856 pub fn iter(&self) -> Iter<T> {
857 Iter { rx: self }
858 }
859 }
860
861 impl<T> select::Packet for Receiver<T> {
862 fn can_recv(&self) -> bool {
863 loop {
864 let new_port = match *unsafe { self.inner() } {
865 Flavor::Oneshot(ref p) => {
866 match unsafe { (*p.get()).can_recv() } {
867 Ok(ret) => return ret,
868 Err(upgrade) => upgrade,
869 }
870 }
871 Flavor::Stream(ref p) => {
872 match unsafe { (*p.get()).can_recv() } {
873 Ok(ret) => return ret,
874 Err(upgrade) => upgrade,
875 }
876 }
877 Flavor::Shared(ref p) => {
878 return unsafe { (*p.get()).can_recv() };
879 }
880 Flavor::Sync(ref p) => {
881 return unsafe { (*p.get()).can_recv() };
882 }
883 };
884 unsafe {
885 mem::swap(self.inner_mut(),
886 new_port.inner_mut());
887 }
888 }
889 }
890
891 fn start_selection(&self, mut token: SignalToken) -> StartResult {
892 loop {
893 let (t, new_port) = match *unsafe { self.inner() } {
894 Flavor::Oneshot(ref p) => {
895 match unsafe { (*p.get()).start_selection(token) } {
896 oneshot::SelSuccess => return Installed,
897 oneshot::SelCanceled => return Abort,
898 oneshot::SelUpgraded(t, rx) => (t, rx),
899 }
900 }
901 Flavor::Stream(ref p) => {
902 match unsafe { (*p.get()).start_selection(token) } {
903 stream::SelSuccess => return Installed,
904 stream::SelCanceled => return Abort,
905 stream::SelUpgraded(t, rx) => (t, rx),
906 }
907 }
908 Flavor::Shared(ref p) => {
909 return unsafe { (*p.get()).start_selection(token) };
910 }
911 Flavor::Sync(ref p) => {
912 return unsafe { (*p.get()).start_selection(token) };
913 }
914 };
915 token = t;
916 unsafe {
917 mem::swap(self.inner_mut(), new_port.inner_mut());
918 }
919 }
920 }
921
922 fn abort_selection(&self) -> bool {
923 let mut was_upgrade = false;
924 loop {
925 let result = match *unsafe { self.inner() } {
926 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
927 Flavor::Stream(ref p) => unsafe {
928 (*p.get()).abort_selection(was_upgrade)
929 },
930 Flavor::Shared(ref p) => return unsafe {
931 (*p.get()).abort_selection(was_upgrade)
932 },
933 Flavor::Sync(ref p) => return unsafe {
934 (*p.get()).abort_selection()
935 },
936 };
937 let new_port = match result { Ok(b) => return b, Err(p) => p };
938 was_upgrade = true;
939 unsafe {
940 mem::swap(self.inner_mut(),
941 new_port.inner_mut());
942 }
943 }
944 }
945 }
946
947 #[stable(feature = "rust1", since = "1.0.0")]
948 impl<'a, T> Iterator for Iter<'a, T> {
949 type Item = T;
950
951 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
952 }
953
954 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
955 impl<'a, T> IntoIterator for &'a Receiver<T> {
956 type Item = T;
957 type IntoIter = Iter<'a, T>;
958
959 fn into_iter(self) -> Iter<'a, T> { self.iter() }
960 }
961
962 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
963 impl<T> Iterator for IntoIter<T> {
964 type Item = T;
965 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
966 }
967
968 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
969 impl <T> IntoIterator for Receiver<T> {
970 type Item = T;
971 type IntoIter = IntoIter<T>;
972
973 fn into_iter(self) -> IntoIter<T> {
974 IntoIter { rx: self }
975 }
976 }
977
978 #[stable(feature = "rust1", since = "1.0.0")]
979 impl<T> Drop for Receiver<T> {
980 fn drop(&mut self) {
981 match *unsafe { self.inner_mut() } {
982 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
983 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
984 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
985 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
986 }
987 }
988 }
989
990 #[stable(feature = "rust1", since = "1.0.0")]
991 impl<T> fmt::Debug for SendError<T> {
992 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
993 "SendError(..)".fmt(f)
994 }
995 }
996
997 #[stable(feature = "rust1", since = "1.0.0")]
998 impl<T> fmt::Display for SendError<T> {
999 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1000 "sending on a closed channel".fmt(f)
1001 }
1002 }
1003
1004 #[stable(feature = "rust1", since = "1.0.0")]
1005 impl<T: Send + Reflect> error::Error for SendError<T> {
1006 fn description(&self) -> &str {
1007 "sending on a closed channel"
1008 }
1009
1010 fn cause(&self) -> Option<&error::Error> {
1011 None
1012 }
1013 }
1014
1015 #[stable(feature = "rust1", since = "1.0.0")]
1016 impl<T> fmt::Debug for TrySendError<T> {
1017 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1018 match *self {
1019 TrySendError::Full(..) => "Full(..)".fmt(f),
1020 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1021 }
1022 }
1023 }
1024
1025 #[stable(feature = "rust1", since = "1.0.0")]
1026 impl<T> fmt::Display for TrySendError<T> {
1027 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1028 match *self {
1029 TrySendError::Full(..) => {
1030 "sending on a full channel".fmt(f)
1031 }
1032 TrySendError::Disconnected(..) => {
1033 "sending on a closed channel".fmt(f)
1034 }
1035 }
1036 }
1037 }
1038
1039 #[stable(feature = "rust1", since = "1.0.0")]
1040 impl<T: Send + Reflect> error::Error for TrySendError<T> {
1041
1042 fn description(&self) -> &str {
1043 match *self {
1044 TrySendError::Full(..) => {
1045 "sending on a full channel"
1046 }
1047 TrySendError::Disconnected(..) => {
1048 "sending on a closed channel"
1049 }
1050 }
1051 }
1052
1053 fn cause(&self) -> Option<&error::Error> {
1054 None
1055 }
1056 }
1057
1058 #[stable(feature = "rust1", since = "1.0.0")]
1059 impl fmt::Display for RecvError {
1060 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1061 "receiving on a closed channel".fmt(f)
1062 }
1063 }
1064
1065 #[stable(feature = "rust1", since = "1.0.0")]
1066 impl error::Error for RecvError {
1067
1068 fn description(&self) -> &str {
1069 "receiving on a closed channel"
1070 }
1071
1072 fn cause(&self) -> Option<&error::Error> {
1073 None
1074 }
1075 }
1076
1077 #[stable(feature = "rust1", since = "1.0.0")]
1078 impl fmt::Display for TryRecvError {
1079 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1080 match *self {
1081 TryRecvError::Empty => {
1082 "receiving on an empty channel".fmt(f)
1083 }
1084 TryRecvError::Disconnected => {
1085 "receiving on a closed channel".fmt(f)
1086 }
1087 }
1088 }
1089 }
1090
1091 #[stable(feature = "rust1", since = "1.0.0")]
1092 impl error::Error for TryRecvError {
1093
1094 fn description(&self) -> &str {
1095 match *self {
1096 TryRecvError::Empty => {
1097 "receiving on an empty channel"
1098 }
1099 TryRecvError::Disconnected => {
1100 "receiving on a closed channel"
1101 }
1102 }
1103 }
1104
1105 fn cause(&self) -> Option<&error::Error> {
1106 None
1107 }
1108 }
1109
1110 #[cfg(test)]
1111 mod tests {
1112 use prelude::v1::*;
1113
1114 use env;
1115 use super::*;
1116 use thread;
1117
1118 pub fn stress_factor() -> usize {
1119 match env::var("RUST_TEST_STRESS") {
1120 Ok(val) => val.parse().unwrap(),
1121 Err(..) => 1,
1122 }
1123 }
1124
1125 #[test]
1126 fn smoke() {
1127 let (tx, rx) = channel::<i32>();
1128 tx.send(1).unwrap();
1129 assert_eq!(rx.recv().unwrap(), 1);
1130 }
1131
1132 #[test]
1133 fn drop_full() {
1134 let (tx, _rx) = channel::<Box<isize>>();
1135 tx.send(box 1).unwrap();
1136 }
1137
1138 #[test]
1139 fn drop_full_shared() {
1140 let (tx, _rx) = channel::<Box<isize>>();
1141 drop(tx.clone());
1142 drop(tx.clone());
1143 tx.send(box 1).unwrap();
1144 }
1145
1146 #[test]
1147 fn smoke_shared() {
1148 let (tx, rx) = channel::<i32>();
1149 tx.send(1).unwrap();
1150 assert_eq!(rx.recv().unwrap(), 1);
1151 let tx = tx.clone();
1152 tx.send(1).unwrap();
1153 assert_eq!(rx.recv().unwrap(), 1);
1154 }
1155
1156 #[test]
1157 fn smoke_threads() {
1158 let (tx, rx) = channel::<i32>();
1159 let _t = thread::spawn(move|| {
1160 tx.send(1).unwrap();
1161 });
1162 assert_eq!(rx.recv().unwrap(), 1);
1163 }
1164
1165 #[test]
1166 fn smoke_port_gone() {
1167 let (tx, rx) = channel::<i32>();
1168 drop(rx);
1169 assert!(tx.send(1).is_err());
1170 }
1171
1172 #[test]
1173 fn smoke_shared_port_gone() {
1174 let (tx, rx) = channel::<i32>();
1175 drop(rx);
1176 assert!(tx.send(1).is_err())
1177 }
1178
1179 #[test]
1180 fn smoke_shared_port_gone2() {
1181 let (tx, rx) = channel::<i32>();
1182 drop(rx);
1183 let tx2 = tx.clone();
1184 drop(tx);
1185 assert!(tx2.send(1).is_err());
1186 }
1187
1188 #[test]
1189 fn port_gone_concurrent() {
1190 let (tx, rx) = channel::<i32>();
1191 let _t = thread::spawn(move|| {
1192 rx.recv().unwrap();
1193 });
1194 while tx.send(1).is_ok() {}
1195 }
1196
1197 #[test]
1198 fn port_gone_concurrent_shared() {
1199 let (tx, rx) = channel::<i32>();
1200 let tx2 = tx.clone();
1201 let _t = thread::spawn(move|| {
1202 rx.recv().unwrap();
1203 });
1204 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1205 }
1206
1207 #[test]
1208 fn smoke_chan_gone() {
1209 let (tx, rx) = channel::<i32>();
1210 drop(tx);
1211 assert!(rx.recv().is_err());
1212 }
1213
1214 #[test]
1215 fn smoke_chan_gone_shared() {
1216 let (tx, rx) = channel::<()>();
1217 let tx2 = tx.clone();
1218 drop(tx);
1219 drop(tx2);
1220 assert!(rx.recv().is_err());
1221 }
1222
1223 #[test]
1224 fn chan_gone_concurrent() {
1225 let (tx, rx) = channel::<i32>();
1226 let _t = thread::spawn(move|| {
1227 tx.send(1).unwrap();
1228 tx.send(1).unwrap();
1229 });
1230 while rx.recv().is_ok() {}
1231 }
1232
1233 #[test]
1234 fn stress() {
1235 let (tx, rx) = channel::<i32>();
1236 let t = thread::spawn(move|| {
1237 for _ in 0..10000 { tx.send(1).unwrap(); }
1238 });
1239 for _ in 0..10000 {
1240 assert_eq!(rx.recv().unwrap(), 1);
1241 }
1242 t.join().ok().unwrap();
1243 }
1244
1245 #[test]
1246 fn stress_shared() {
1247 const AMT: u32 = 10000;
1248 const NTHREADS: u32 = 8;
1249 let (tx, rx) = channel::<i32>();
1250
1251 let t = thread::spawn(move|| {
1252 for _ in 0..AMT * NTHREADS {
1253 assert_eq!(rx.recv().unwrap(), 1);
1254 }
1255 match rx.try_recv() {
1256 Ok(..) => panic!(),
1257 _ => {}
1258 }
1259 });
1260
1261 for _ in 0..NTHREADS {
1262 let tx = tx.clone();
1263 thread::spawn(move|| {
1264 for _ in 0..AMT { tx.send(1).unwrap(); }
1265 });
1266 }
1267 drop(tx);
1268 t.join().ok().unwrap();
1269 }
1270
1271 #[test]
1272 fn send_from_outside_runtime() {
1273 let (tx1, rx1) = channel::<()>();
1274 let (tx2, rx2) = channel::<i32>();
1275 let t1 = thread::spawn(move|| {
1276 tx1.send(()).unwrap();
1277 for _ in 0..40 {
1278 assert_eq!(rx2.recv().unwrap(), 1);
1279 }
1280 });
1281 rx1.recv().unwrap();
1282 let t2 = thread::spawn(move|| {
1283 for _ in 0..40 {
1284 tx2.send(1).unwrap();
1285 }
1286 });
1287 t1.join().ok().unwrap();
1288 t2.join().ok().unwrap();
1289 }
1290
1291 #[test]
1292 fn recv_from_outside_runtime() {
1293 let (tx, rx) = channel::<i32>();
1294 let t = thread::spawn(move|| {
1295 for _ in 0..40 {
1296 assert_eq!(rx.recv().unwrap(), 1);
1297 }
1298 });
1299 for _ in 0..40 {
1300 tx.send(1).unwrap();
1301 }
1302 t.join().ok().unwrap();
1303 }
1304
1305 #[test]
1306 fn no_runtime() {
1307 let (tx1, rx1) = channel::<i32>();
1308 let (tx2, rx2) = channel::<i32>();
1309 let t1 = thread::spawn(move|| {
1310 assert_eq!(rx1.recv().unwrap(), 1);
1311 tx2.send(2).unwrap();
1312 });
1313 let t2 = thread::spawn(move|| {
1314 tx1.send(1).unwrap();
1315 assert_eq!(rx2.recv().unwrap(), 2);
1316 });
1317 t1.join().ok().unwrap();
1318 t2.join().ok().unwrap();
1319 }
1320
1321 #[test]
1322 fn oneshot_single_thread_close_port_first() {
1323 // Simple test of closing without sending
1324 let (_tx, rx) = channel::<i32>();
1325 drop(rx);
1326 }
1327
1328 #[test]
1329 fn oneshot_single_thread_close_chan_first() {
1330 // Simple test of closing without sending
1331 let (tx, _rx) = channel::<i32>();
1332 drop(tx);
1333 }
1334
1335 #[test]
1336 fn oneshot_single_thread_send_port_close() {
1337 // Testing that the sender cleans up the payload if receiver is closed
1338 let (tx, rx) = channel::<Box<i32>>();
1339 drop(rx);
1340 assert!(tx.send(box 0).is_err());
1341 }
1342
1343 #[test]
1344 fn oneshot_single_thread_recv_chan_close() {
1345 // Receiving on a closed chan will panic
1346 let res = thread::spawn(move|| {
1347 let (tx, rx) = channel::<i32>();
1348 drop(tx);
1349 rx.recv().unwrap();
1350 }).join();
1351 // What is our res?
1352 assert!(res.is_err());
1353 }
1354
1355 #[test]
1356 fn oneshot_single_thread_send_then_recv() {
1357 let (tx, rx) = channel::<Box<i32>>();
1358 tx.send(box 10).unwrap();
1359 assert!(rx.recv().unwrap() == box 10);
1360 }
1361
1362 #[test]
1363 fn oneshot_single_thread_try_send_open() {
1364 let (tx, rx) = channel::<i32>();
1365 assert!(tx.send(10).is_ok());
1366 assert!(rx.recv().unwrap() == 10);
1367 }
1368
1369 #[test]
1370 fn oneshot_single_thread_try_send_closed() {
1371 let (tx, rx) = channel::<i32>();
1372 drop(rx);
1373 assert!(tx.send(10).is_err());
1374 }
1375
1376 #[test]
1377 fn oneshot_single_thread_try_recv_open() {
1378 let (tx, rx) = channel::<i32>();
1379 tx.send(10).unwrap();
1380 assert!(rx.recv() == Ok(10));
1381 }
1382
1383 #[test]
1384 fn oneshot_single_thread_try_recv_closed() {
1385 let (tx, rx) = channel::<i32>();
1386 drop(tx);
1387 assert!(rx.recv().is_err());
1388 }
1389
1390 #[test]
1391 fn oneshot_single_thread_peek_data() {
1392 let (tx, rx) = channel::<i32>();
1393 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1394 tx.send(10).unwrap();
1395 assert_eq!(rx.try_recv(), Ok(10));
1396 }
1397
1398 #[test]
1399 fn oneshot_single_thread_peek_close() {
1400 let (tx, rx) = channel::<i32>();
1401 drop(tx);
1402 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1403 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1404 }
1405
1406 #[test]
1407 fn oneshot_single_thread_peek_open() {
1408 let (_tx, rx) = channel::<i32>();
1409 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1410 }
1411
1412 #[test]
1413 fn oneshot_multi_task_recv_then_send() {
1414 let (tx, rx) = channel::<Box<i32>>();
1415 let _t = thread::spawn(move|| {
1416 assert!(rx.recv().unwrap() == box 10);
1417 });
1418
1419 tx.send(box 10).unwrap();
1420 }
1421
1422 #[test]
1423 fn oneshot_multi_task_recv_then_close() {
1424 let (tx, rx) = channel::<Box<i32>>();
1425 let _t = thread::spawn(move|| {
1426 drop(tx);
1427 });
1428 let res = thread::spawn(move|| {
1429 assert!(rx.recv().unwrap() == box 10);
1430 }).join();
1431 assert!(res.is_err());
1432 }
1433
1434 #[test]
1435 fn oneshot_multi_thread_close_stress() {
1436 for _ in 0..stress_factor() {
1437 let (tx, rx) = channel::<i32>();
1438 let _t = thread::spawn(move|| {
1439 drop(rx);
1440 });
1441 drop(tx);
1442 }
1443 }
1444
1445 #[test]
1446 fn oneshot_multi_thread_send_close_stress() {
1447 for _ in 0..stress_factor() {
1448 let (tx, rx) = channel::<i32>();
1449 let _t = thread::spawn(move|| {
1450 drop(rx);
1451 });
1452 let _ = thread::spawn(move|| {
1453 tx.send(1).unwrap();
1454 }).join();
1455 }
1456 }
1457
1458 #[test]
1459 fn oneshot_multi_thread_recv_close_stress() {
1460 for _ in 0..stress_factor() {
1461 let (tx, rx) = channel::<i32>();
1462 thread::spawn(move|| {
1463 let res = thread::spawn(move|| {
1464 rx.recv().unwrap();
1465 }).join();
1466 assert!(res.is_err());
1467 });
1468 let _t = thread::spawn(move|| {
1469 thread::spawn(move|| {
1470 drop(tx);
1471 });
1472 });
1473 }
1474 }
1475
1476 #[test]
1477 fn oneshot_multi_thread_send_recv_stress() {
1478 for _ in 0..stress_factor() {
1479 let (tx, rx) = channel::<Box<isize>>();
1480 let _t = thread::spawn(move|| {
1481 tx.send(box 10).unwrap();
1482 });
1483 assert!(rx.recv().unwrap() == box 10);
1484 }
1485 }
1486
1487 #[test]
1488 fn stream_send_recv_stress() {
1489 for _ in 0..stress_factor() {
1490 let (tx, rx) = channel();
1491
1492 send(tx, 0);
1493 recv(rx, 0);
1494
1495 fn send(tx: Sender<Box<i32>>, i: i32) {
1496 if i == 10 { return }
1497
1498 thread::spawn(move|| {
1499 tx.send(box i).unwrap();
1500 send(tx, i + 1);
1501 });
1502 }
1503
1504 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1505 if i == 10 { return }
1506
1507 thread::spawn(move|| {
1508 assert!(rx.recv().unwrap() == box i);
1509 recv(rx, i + 1);
1510 });
1511 }
1512 }
1513 }
1514
1515 #[test]
1516 fn recv_a_lot() {
1517 // Regression test that we don't run out of stack in scheduler context
1518 let (tx, rx) = channel();
1519 for _ in 0..10000 { tx.send(()).unwrap(); }
1520 for _ in 0..10000 { rx.recv().unwrap(); }
1521 }
1522
1523 #[test]
1524 fn shared_chan_stress() {
1525 let (tx, rx) = channel();
1526 let total = stress_factor() + 100;
1527 for _ in 0..total {
1528 let tx = tx.clone();
1529 thread::spawn(move|| {
1530 tx.send(()).unwrap();
1531 });
1532 }
1533
1534 for _ in 0..total {
1535 rx.recv().unwrap();
1536 }
1537 }
1538
1539 #[test]
1540 fn test_nested_recv_iter() {
1541 let (tx, rx) = channel::<i32>();
1542 let (total_tx, total_rx) = channel::<i32>();
1543
1544 let _t = thread::spawn(move|| {
1545 let mut acc = 0;
1546 for x in rx.iter() {
1547 acc += x;
1548 }
1549 total_tx.send(acc).unwrap();
1550 });
1551
1552 tx.send(3).unwrap();
1553 tx.send(1).unwrap();
1554 tx.send(2).unwrap();
1555 drop(tx);
1556 assert_eq!(total_rx.recv().unwrap(), 6);
1557 }
1558
1559 #[test]
1560 fn test_recv_iter_break() {
1561 let (tx, rx) = channel::<i32>();
1562 let (count_tx, count_rx) = channel();
1563
1564 let _t = thread::spawn(move|| {
1565 let mut count = 0;
1566 for x in rx.iter() {
1567 if count >= 3 {
1568 break;
1569 } else {
1570 count += x;
1571 }
1572 }
1573 count_tx.send(count).unwrap();
1574 });
1575
1576 tx.send(2).unwrap();
1577 tx.send(2).unwrap();
1578 tx.send(2).unwrap();
1579 let _ = tx.send(2);
1580 drop(tx);
1581 assert_eq!(count_rx.recv().unwrap(), 4);
1582 }
1583
1584 #[test]
1585 fn test_recv_into_iter_owned() {
1586 let mut iter = {
1587 let (tx, rx) = channel::<i32>();
1588 tx.send(1).unwrap();
1589 tx.send(2).unwrap();
1590
1591 rx.into_iter()
1592 };
1593 assert_eq!(iter.next().unwrap(), 1);
1594 assert_eq!(iter.next().unwrap(), 2);
1595 assert_eq!(iter.next().is_none(), true);
1596 }
1597
1598 #[test]
1599 fn test_recv_into_iter_borrowed() {
1600 let (tx, rx) = channel::<i32>();
1601 tx.send(1).unwrap();
1602 tx.send(2).unwrap();
1603 drop(tx);
1604 let mut iter = (&rx).into_iter();
1605 assert_eq!(iter.next().unwrap(), 1);
1606 assert_eq!(iter.next().unwrap(), 2);
1607 assert_eq!(iter.next().is_none(), true);
1608 }
1609
1610 #[test]
1611 fn try_recv_states() {
1612 let (tx1, rx1) = channel::<i32>();
1613 let (tx2, rx2) = channel::<()>();
1614 let (tx3, rx3) = channel::<()>();
1615 let _t = thread::spawn(move|| {
1616 rx2.recv().unwrap();
1617 tx1.send(1).unwrap();
1618 tx3.send(()).unwrap();
1619 rx2.recv().unwrap();
1620 drop(tx1);
1621 tx3.send(()).unwrap();
1622 });
1623
1624 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1625 tx2.send(()).unwrap();
1626 rx3.recv().unwrap();
1627 assert_eq!(rx1.try_recv(), Ok(1));
1628 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1629 tx2.send(()).unwrap();
1630 rx3.recv().unwrap();
1631 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1632 }
1633
1634 // This bug used to end up in a livelock inside of the Receiver destructor
1635 // because the internal state of the Shared packet was corrupted
1636 #[test]
1637 fn destroy_upgraded_shared_port_when_sender_still_active() {
1638 let (tx, rx) = channel();
1639 let (tx2, rx2) = channel();
1640 let _t = thread::spawn(move|| {
1641 rx.recv().unwrap(); // wait on a oneshot
1642 drop(rx); // destroy a shared
1643 tx2.send(()).unwrap();
1644 });
1645 // make sure the other thread has gone to sleep
1646 for _ in 0..5000 { thread::yield_now(); }
1647
1648 // upgrade to a shared chan and send a message
1649 let t = tx.clone();
1650 drop(tx);
1651 t.send(()).unwrap();
1652
1653 // wait for the child thread to exit before we exit
1654 rx2.recv().unwrap();
1655 }
1656 }
1657
1658 #[cfg(test)]
1659 mod sync_tests {
1660 use prelude::v1::*;
1661
1662 use env;
1663 use thread;
1664 use super::*;
1665
1666 pub fn stress_factor() -> usize {
1667 match env::var("RUST_TEST_STRESS") {
1668 Ok(val) => val.parse().unwrap(),
1669 Err(..) => 1,
1670 }
1671 }
1672
1673 #[test]
1674 fn smoke() {
1675 let (tx, rx) = sync_channel::<i32>(1);
1676 tx.send(1).unwrap();
1677 assert_eq!(rx.recv().unwrap(), 1);
1678 }
1679
1680 #[test]
1681 fn drop_full() {
1682 let (tx, _rx) = sync_channel::<Box<isize>>(1);
1683 tx.send(box 1).unwrap();
1684 }
1685
1686 #[test]
1687 fn smoke_shared() {
1688 let (tx, rx) = sync_channel::<i32>(1);
1689 tx.send(1).unwrap();
1690 assert_eq!(rx.recv().unwrap(), 1);
1691 let tx = tx.clone();
1692 tx.send(1).unwrap();
1693 assert_eq!(rx.recv().unwrap(), 1);
1694 }
1695
1696 #[test]
1697 fn smoke_threads() {
1698 let (tx, rx) = sync_channel::<i32>(0);
1699 let _t = thread::spawn(move|| {
1700 tx.send(1).unwrap();
1701 });
1702 assert_eq!(rx.recv().unwrap(), 1);
1703 }
1704
1705 #[test]
1706 fn smoke_port_gone() {
1707 let (tx, rx) = sync_channel::<i32>(0);
1708 drop(rx);
1709 assert!(tx.send(1).is_err());
1710 }
1711
1712 #[test]
1713 fn smoke_shared_port_gone2() {
1714 let (tx, rx) = sync_channel::<i32>(0);
1715 drop(rx);
1716 let tx2 = tx.clone();
1717 drop(tx);
1718 assert!(tx2.send(1).is_err());
1719 }
1720
1721 #[test]
1722 fn port_gone_concurrent() {
1723 let (tx, rx) = sync_channel::<i32>(0);
1724 let _t = thread::spawn(move|| {
1725 rx.recv().unwrap();
1726 });
1727 while tx.send(1).is_ok() {}
1728 }
1729
1730 #[test]
1731 fn port_gone_concurrent_shared() {
1732 let (tx, rx) = sync_channel::<i32>(0);
1733 let tx2 = tx.clone();
1734 let _t = thread::spawn(move|| {
1735 rx.recv().unwrap();
1736 });
1737 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1738 }
1739
1740 #[test]
1741 fn smoke_chan_gone() {
1742 let (tx, rx) = sync_channel::<i32>(0);
1743 drop(tx);
1744 assert!(rx.recv().is_err());
1745 }
1746
1747 #[test]
1748 fn smoke_chan_gone_shared() {
1749 let (tx, rx) = sync_channel::<()>(0);
1750 let tx2 = tx.clone();
1751 drop(tx);
1752 drop(tx2);
1753 assert!(rx.recv().is_err());
1754 }
1755
1756 #[test]
1757 fn chan_gone_concurrent() {
1758 let (tx, rx) = sync_channel::<i32>(0);
1759 thread::spawn(move|| {
1760 tx.send(1).unwrap();
1761 tx.send(1).unwrap();
1762 });
1763 while rx.recv().is_ok() {}
1764 }
1765
1766 #[test]
1767 fn stress() {
1768 let (tx, rx) = sync_channel::<i32>(0);
1769 thread::spawn(move|| {
1770 for _ in 0..10000 { tx.send(1).unwrap(); }
1771 });
1772 for _ in 0..10000 {
1773 assert_eq!(rx.recv().unwrap(), 1);
1774 }
1775 }
1776
1777 #[test]
1778 fn stress_shared() {
1779 const AMT: u32 = 1000;
1780 const NTHREADS: u32 = 8;
1781 let (tx, rx) = sync_channel::<i32>(0);
1782 let (dtx, drx) = sync_channel::<()>(0);
1783
1784 thread::spawn(move|| {
1785 for _ in 0..AMT * NTHREADS {
1786 assert_eq!(rx.recv().unwrap(), 1);
1787 }
1788 match rx.try_recv() {
1789 Ok(..) => panic!(),
1790 _ => {}
1791 }
1792 dtx.send(()).unwrap();
1793 });
1794
1795 for _ in 0..NTHREADS {
1796 let tx = tx.clone();
1797 thread::spawn(move|| {
1798 for _ in 0..AMT { tx.send(1).unwrap(); }
1799 });
1800 }
1801 drop(tx);
1802 drx.recv().unwrap();
1803 }
1804
1805 #[test]
1806 fn oneshot_single_thread_close_port_first() {
1807 // Simple test of closing without sending
1808 let (_tx, rx) = sync_channel::<i32>(0);
1809 drop(rx);
1810 }
1811
1812 #[test]
1813 fn oneshot_single_thread_close_chan_first() {
1814 // Simple test of closing without sending
1815 let (tx, _rx) = sync_channel::<i32>(0);
1816 drop(tx);
1817 }
1818
1819 #[test]
1820 fn oneshot_single_thread_send_port_close() {
1821 // Testing that the sender cleans up the payload if receiver is closed
1822 let (tx, rx) = sync_channel::<Box<i32>>(0);
1823 drop(rx);
1824 assert!(tx.send(box 0).is_err());
1825 }
1826
1827 #[test]
1828 fn oneshot_single_thread_recv_chan_close() {
1829 // Receiving on a closed chan will panic
1830 let res = thread::spawn(move|| {
1831 let (tx, rx) = sync_channel::<i32>(0);
1832 drop(tx);
1833 rx.recv().unwrap();
1834 }).join();
1835 // What is our res?
1836 assert!(res.is_err());
1837 }
1838
1839 #[test]
1840 fn oneshot_single_thread_send_then_recv() {
1841 let (tx, rx) = sync_channel::<Box<i32>>(1);
1842 tx.send(box 10).unwrap();
1843 assert!(rx.recv().unwrap() == box 10);
1844 }
1845
1846 #[test]
1847 fn oneshot_single_thread_try_send_open() {
1848 let (tx, rx) = sync_channel::<i32>(1);
1849 assert_eq!(tx.try_send(10), Ok(()));
1850 assert!(rx.recv().unwrap() == 10);
1851 }
1852
1853 #[test]
1854 fn oneshot_single_thread_try_send_closed() {
1855 let (tx, rx) = sync_channel::<i32>(0);
1856 drop(rx);
1857 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1858 }
1859
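// The receiver is still alive here, but with a bound of 0 and nobody blocked in recv(),
// try_send() reports Full rather than Disconnected.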
1860 #[test]
1861 fn oneshot_single_thread_try_send_closed2() {
1862 let (tx, _rx) = sync_channel::<i32>(0);
1863 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1864 }
1865
1866 #[test]
1867 fn oneshot_single_thread_try_recv_open() {
1868 let (tx, rx) = sync_channel::<i32>(1);
1869 tx.send(10).unwrap();
1870 assert!(rx.recv() == Ok(10));
1871 }
1872
1873 #[test]
1874 fn oneshot_single_thread_try_recv_closed() {
1875 let (tx, rx) = sync_channel::<i32>(0);
1876 drop(tx);
1877 assert!(rx.recv().is_err());
1878 }
1879
1880 #[test]
1881 fn oneshot_single_thread_peek_data() {
1882 let (tx, rx) = sync_channel::<i32>(1);
1883 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1884 tx.send(10).unwrap();
1885 assert_eq!(rx.try_recv(), Ok(10));
1886 }
1887
1888 #[test]
1889 fn oneshot_single_thread_peek_close() {
1890 let (tx, rx) = sync_channel::<i32>(0);
1891 drop(tx);
1892 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1893 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1894 }
1895
1896 #[test]
1897 fn oneshot_single_thread_peek_open() {
1898 let (_tx, rx) = sync_channel::<i32>(0);
1899 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1900 }
1901
1902 #[test]
1903 fn oneshot_multi_task_recv_then_send() {
1904 let (tx, rx) = sync_channel::<Box<i32>>(0);
1905 let _t = thread::spawn(move|| {
1906 assert!(rx.recv().unwrap() == box 10);
1907 });
1908
1909 tx.send(box 10).unwrap();
1910 }
1911
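// The blocked recv() sees the sender hang up; unwrap() panics, so join() returns Err.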
1912 #[test]
1913 fn oneshot_multi_task_recv_then_close() {
1914 let (tx, rx) = sync_channel::<Box<i32>>(0);
1915 let _t = thread::spawn(move|| {
1916 drop(tx);
1917 });
1918 let res = thread::spawn(move|| {
1919 assert!(rx.recv().unwrap() == box 10);
1920 }).join();
1921 assert!(res.is_err());
1922 }
1923
1924 #[test]
1925 fn oneshot_multi_thread_close_stress() {
1926 for _ in 0..stress_factor() {
1927 let (tx, rx) = sync_channel::<i32>(0);
1928 let _t = thread::spawn(move|| {
1929 drop(rx);
1930 });
1931 drop(tx);
1932 }
1933 }
1934
1935 #[test]
1936 fn oneshot_multi_thread_send_close_stress() {
1937 for _ in 0..stress_factor() {
1938 let (tx, rx) = sync_channel::<i32>(0);
1939 let _t = thread::spawn(move|| {
1940 drop(rx);
1941 });
1942 let _ = thread::spawn(move || {
1943 tx.send(1).unwrap();
1944 }).join();
1945 }
1946 }
1947
1948 #[test]
1949 fn oneshot_multi_thread_recv_close_stress() {
1950 for _ in 0..stress_factor() {
1951 let (tx, rx) = sync_channel::<i32>(0);
1952 let _t = thread::spawn(move|| {
1953 let res = thread::spawn(move|| {
1954 rx.recv().unwrap();
1955 }).join();
1956 assert!(res.is_err());
1957 });
1958 let _t = thread::spawn(move|| {
1959 thread::spawn(move|| {
1960 drop(tx);
1961 });
1962 });
1963 }
1964 }
1965
1966 #[test]
1967 fn oneshot_multi_thread_send_recv_stress() {
1968 for _ in 0..stress_factor() {
1969 let (tx, rx) = sync_channel::<Box<i32>>(0);
1970 let _t = thread::spawn(move|| {
1971 tx.send(box 10).unwrap();
1972 });
1973 assert!(rx.recv().unwrap() == box 10);
1974 }
1975 }
1976
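// Builds two chains of threads, ten deep, where each link performs a single rendezvous send or recv.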
1977 #[test]
1978 fn stream_send_recv_stress() {
1979 for _ in 0..stress_factor() {
1980 let (tx, rx) = sync_channel::<Box<i32>>(0);
1981
1982 send(tx, 0);
1983 recv(rx, 0);
1984
1985 fn send(tx: SyncSender<Box<i32>>, i: i32) {
1986 if i == 10 { return }
1987
1988 thread::spawn(move|| {
1989 tx.send(box i).unwrap();
1990 send(tx, i + 1);
1991 });
1992 }
1993
1994 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1995 if i == 10 { return }
1996
1997 thread::spawn(move|| {
1998 assert!(rx.recv().unwrap() == box i);
1999 recv(rx, i + 1);
2000 });
2001 }
2002 }
2003 }
2004
2005 #[test]
2006 fn recv_a_lot() {
2007 // Regression test that we don't run out of stack in scheduler context
2008 let (tx, rx) = sync_channel(10000);
2009 for _ in 0..10000 { tx.send(()).unwrap(); }
2010 for _ in 0..10000 { rx.recv().unwrap(); }
2011 }
2012
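// Many short-lived sender clones, one message each; the single receiver must collect them all.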
2013 #[test]
2014 fn shared_chan_stress() {
2015 let (tx, rx) = sync_channel(0);
2016 let total = stress_factor() + 100;
2017 for _ in 0..total {
2018 let tx = tx.clone();
2019 thread::spawn(move|| {
2020 tx.send(()).unwrap();
2021 });
2022 }
2023
2024 for _ in 0..total {
2025 rx.recv().unwrap();
2026 }
2027 }
2028
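// rx.iter() yields every value that was sent and terminates once the sender is dropped.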
2029 #[test]
2030 fn test_nested_recv_iter() {
2031 let (tx, rx) = sync_channel::<i32>(0);
2032 let (total_tx, total_rx) = sync_channel::<i32>(0);
2033
2034 let _t = thread::spawn(move|| {
2035 let mut acc = 0;
2036 for x in rx.iter() {
2037 acc += x;
2038 }
2039 total_tx.send(acc).unwrap();
2040 });
2041
2042 tx.send(3).unwrap();
2043 tx.send(1).unwrap();
2044 tx.send(2).unwrap();
2045 drop(tx);
2046 assert_eq!(total_rx.recv().unwrap(), 6);
2047 }
2048
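// The receiver breaks out of rx.iter() early; the trailing try_send is allowed to fail.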
2049 #[test]
2050 fn test_recv_iter_break() {
2051 let (tx, rx) = sync_channel::<i32>(0);
2052 let (count_tx, count_rx) = sync_channel(0);
2053
2054 let _t = thread::spawn(move|| {
2055 let mut count = 0;
2056 for x in rx.iter() {
2057 if count >= 3 {
2058 break;
2059 } else {
2060 count += x;
2061 }
2062 }
2063 count_tx.send(count).unwrap();
2064 });
2065
2066 tx.send(2).unwrap();
2067 tx.send(2).unwrap();
2068 tx.send(2).unwrap();
2069 let _ = tx.try_send(2);
2070 drop(tx);
2071 assert_eq!(count_rx.recv().unwrap(), 4);
2072 }
2073
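// Drives rx1 through the try_recv() states in order: Empty, Ok(1), Empty again, and
// finally Disconnected once tx1 is dropped; tx2/rx3 only sequence the two threads.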
2074 #[test]
2075 fn try_recv_states() {
2076 let (tx1, rx1) = sync_channel::<i32>(1);
2077 let (tx2, rx2) = sync_channel::<()>(1);
2078 let (tx3, rx3) = sync_channel::<()>(1);
2079 let _t = thread::spawn(move|| {
2080 rx2.recv().unwrap();
2081 tx1.send(1).unwrap();
2082 tx3.send(()).unwrap();
2083 rx2.recv().unwrap();
2084 drop(tx1);
2085 tx3.send(()).unwrap();
2086 });
2087
2088 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2089 tx2.send(()).unwrap();
2090 rx3.recv().unwrap();
2091 assert_eq!(rx1.try_recv(), Ok(1));
2092 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2093 tx2.send(()).unwrap();
2094 rx3.recv().unwrap();
2095 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2096 }
2097
2098 // This bug used to result in a livelock inside the Receiver destructor
2099 // because the internal state of the Shared packet had been corrupted.
2100 #[test]
2101 fn destroy_upgraded_shared_port_when_sender_still_active() {
2102 let (tx, rx) = sync_channel::<()>(0);
2103 let (tx2, rx2) = sync_channel::<()>(0);
2104 let _t = thread::spawn(move|| {
2105 rx.recv().unwrap(); // wait on a oneshot
2106 drop(rx); // destroy a shared
2107 tx2.send(()).unwrap();
2108 });
2109 // make sure the other thread has gone to sleep
2110 for _ in 0..5000 { thread::yield_now(); }
2111
2112 // upgrade to a shared chan and send a message
2113 let t = tx.clone();
2114 drop(tx);
2115 t.send(()).unwrap();
2116
2117 // wait for the child thread to exit before we exit
2118 rx2.recv().unwrap();
2119 }
2120
2121 #[test]
2122 fn send1() {
2123 let (tx, rx) = sync_channel::<i32>(0);
2124 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2125 assert_eq!(tx.send(1), Ok(()));
2126 }
2127
2128 #[test]
2129 fn send2() {
2130 let (tx, rx) = sync_channel::<i32>(0);
2131 let _t = thread::spawn(move|| { drop(rx); });
2132 assert!(tx.send(1).is_err());
2133 }
2134
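// One message fits in the buffer; the second send fails once the receiver is gone.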
2135 #[test]
2136 fn send3() {
2137 let (tx, rx) = sync_channel::<i32>(1);
2138 assert_eq!(tx.send(1), Ok(()));
2139 let _t = thread::spawn(move|| { drop(rx); });
2140 assert!(tx.send(1).is_err());
2141 }
2142
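// Both sender clones are blocked in send(); dropping the receiver must wake each with an error.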
2143 #[test]
2144 fn send4() {
2145 let (tx, rx) = sync_channel::<i32>(0);
2146 let tx2 = tx.clone();
2147 let (done, donerx) = channel();
2148 let done2 = done.clone();
2149 let _t = thread::spawn(move|| {
2150 assert!(tx.send(1).is_err());
2151 done.send(()).unwrap();
2152 });
2153 let _t = thread::spawn(move|| {
2154 assert!(tx2.send(2).is_err());
2155 done2.send(()).unwrap();
2156 });
2157 drop(rx);
2158 donerx.recv().unwrap();
2159 donerx.recv().unwrap();
2160 }
2161
2162 #[test]
2163 fn try_send1() {
2164 let (tx, _rx) = sync_channel::<i32>(0);
2165 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2166 }
2167
2168 #[test]
2169 fn try_send2() {
2170 let (tx, _rx) = sync_channel::<i32>(1);
2171 assert_eq!(tx.try_send(1), Ok(()));
2172 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2173 }
2174
2175 #[test]
2176 fn try_send3() {
2177 let (tx, rx) = sync_channel::<i32>(1);
2178 assert_eq!(tx.try_send(1), Ok(()));
2179 drop(rx);
2180 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2181 }
2182
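// Regression test for Rust issue #15761; repro() is run repeatedly to make the original
// (presumably intermittent) failure show up.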
2183 #[test]
2184 fn issue_15761() {
2185 fn repro() {
2186 let (tx1, rx1) = sync_channel::<()>(3);
2187 let (tx2, rx2) = sync_channel::<()>(3);
2188
2189 let _t = thread::spawn(move|| {
2190 rx1.recv().unwrap();
2191 tx2.try_send(()).unwrap();
2192 });
2193
2194 tx1.try_send(()).unwrap();
2195 rx2.recv().unwrap();
2196 }
2197
2198 for _ in 0..100 {
2199 repro()
2200 }
2201 }
2202 }