1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
20 //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
29 //! infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
37 //! a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
46 //! Once half of a channel has been deallocated, most operations can no longer
47 //! continue to make progress, so `Err` will be returned. Many applications will
48 //! continue to `unwrap()` the results returned from this module, instigating a
49 //! propagation of failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
81 //! });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<i32>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<i32>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent thread to start receiving
111 //! tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115
116 #![stable(feature = "rust1", since = "1.0.0")]
117
118 // A description of how Rust's channel implementation works
119 //
120 // Channels are supposed to be the basic building block for all other
121 // concurrent primitives that are used in Rust. As a result, the channel type
122 // needs to be highly optimized, flexible, and broad enough for use everywhere.
123 //
124 // The choice of implementation of all channels is to be built on lock-free data
125 // structures. The channels themselves are then consequently also lock-free data
126 // structures. As always with lock-free code, this is a very "here be dragons"
127 // territory, especially because I'm unaware of any academic papers that have
128 // gone into great depth about channels of these flavors.
129 //
130 // ## Flavors of channels
131 //
132 // From the perspective of a consumer of this library, there is only one flavor
133 // of channel. This channel can be used as a stream and cloned to allow multiple
134 // senders. Under the hood, however, there are actually three flavors of
135 // channels in play.
136 //
137 // * Oneshots - these channels are highly optimized for the one-send use case.
138 // They contain as few atomics as possible and involve one and
139 // exactly one allocation.
140 // * Streams - these channels are optimized for the non-shared use case. They
141 // use a different concurrent queue that is more tailored for this
142 // use case. The initial allocation of this flavor of channel is not
143 // optimized.
144 // * Shared - this is the most general form of channel that this module offers,
145 // a channel with multiple senders. This type is as optimized as it
146 // can be, but the previous two types mentioned are much faster for
147 // their use-cases.
148 //
149 // ## Concurrent queues
150 //
151 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
152 // recv() obviously blocks. This means that under the hood there must be some
153 // shared and concurrent queue holding all of the actual data.
154 //
155 // With two flavors of channels, two flavors of queues are also used. We have
156 // chosen to use queues from a well-known author that are abbreviated as SPSC
157 // and MPSC (single producer, single consumer and multiple producer, single
158 // consumer). SPSC queues are used for streams while MPSC queues are used for
159 // shared channels.
160 //
161 // ### SPSC optimizations
162 //
163 // The SPSC queue found online is essentially a linked list of nodes where one
164 // half of the nodes are the "queue of data" and the other half of nodes are a
165 // cache of unused nodes. The unused nodes are used such that an allocation is
166 // not required on every push() and a free doesn't need to happen on every
167 // pop().
168 //
169 // As found online, however, the cache of nodes is of an infinite size. This
170 // means that if a channel at one point in its life had 50k items in the queue,
171 // then the queue will always have the capacity for 50k items. I believed that
172 // this was an unnecessary limitation of the implementation, so I have altered
173 // the queue to optionally have a bound on the cache size.
174 //
175 // By default, streams will have an unbounded SPSC queue with a small-ish cache
176 // size. The hope is that the cache is still large enough to have very fast
177 // send() operations while not too large such that millions of channels can
178 // coexist at once.
179 //
180 // ### MPSC optimizations
181 //
182 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
183 // a linked list under the hood to earn its unboundedness, but I have not put
184 // forth much effort into having a cache of nodes similar to the SPSC queue.
185 //
186 // For now, I believe that this is "ok" because shared channels are not the most
187 // common type, but soon we may wish to revisit this queue choice and determine
188 // another candidate for backend storage of shared channels.
189 //
190 // ## Overview of the Implementation
191 //
192 // Now that there's a little background on the concurrent queues used, it's
193 // worth going into much more detail about the channels themselves. The basic
194 // pseudocode for a send/recv are:
195 //
196 //
197 //          send(t)                             recv()
198 //            queue.push(t)                       return if queue.pop()
199 //            if increment() == -1                deschedule {
200 //              wakeup()                            if decrement() > 0
201 //                                                    cancel_deschedule()
202 //                                                }
203 //                                                queue.pop()
204 //
205 // As mentioned before, there are no locks in this implementation; only atomic
206 // instructions are used.
207 //
208 // ### The internal atomic counter
209 //
210 // Every channel has a shared counter with each half to keep track of the size
211 // of the queue. This counter is used to abort descheduling by the receiver and
212 // to know when to wake up on the sending side.
213 //
214 // As seen in the pseudocode, senders will increment this count and receivers
215 // will decrement the count. The theory behind this is that if a sender sees a
216 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
217 // then it doesn't need to block.
218 //
219 // The recv() method has a beginning call to pop(), and if successful, it needs
220 // to decrement the count. It is a crucial implementation detail that this
221 // decrement does *not* happen to the shared counter. If this were the case,
222 // then it would be possible for the counter to be very negative when there were
223 // no receivers waiting, in which case the senders would have to determine when
224 // it was actually appropriate to wake up a receiver.
225 //
226 // Instead, the "steal count" is kept track of separately (not atomically
227 // because it's only used by receivers), and then the decrement() call when
228 // descheduling will lump in all of the recent steals into one large decrement.
229 //
230 // The implication of this is that if a sender sees a -1 count, then there's
231 // guaranteed to be a waiter waiting!
232 //
233 // ## Native Implementation
234 //
235 // A major goal of these channels is to work seamlessly on and off the runtime.
236 // All of the previous race conditions have been worded in terms of
237 // scheduler-isms (which are obviously not available without the runtime).
238 //
239 // For now, native usage of channels (off the runtime) will fall back onto
240 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
241 // is still entirely lock-free; the "deschedule" blocks above are surrounded by
242 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
243 // condition variable.
244 //
245 // ## Select
246 //
247 // Being able to support selection over channels has greatly influenced this
248 // design, and not only does selection need to work inside the runtime, but also
249 // outside the runtime.
250 //
251 // The implementation is fairly straightforward. The goal of select() is not to
252 // return some data, but only to return which channel can receive data without
253 // blocking. The implementation is essentially the entire blocking procedure
254 // followed by an increment as soon as it's woken up. The cancellation procedure
255 // involves an increment and swapping out of to_wake to acquire ownership of the
256 // thread to unblock.
257 //
258 // Sadly this current implementation requires multiple allocations, so I have
259 // seen the throughput of select() be much worse than it should be. I do not
260 // believe that there is anything fundamental that needs to change about these
261 // channels, however, in order to support a more efficient select().
262 //
263 // # Conclusion
264 //
265 // And now that you've seen all the races that I found and attempted to fix,
266 // here's the code for you to find some more!
267
268 use sync::Arc;
269 use error;
270 use fmt;
271 use mem;
272 use cell::UnsafeCell;
273 use marker::Reflect;
274
275 pub use self::select::{Select, Handle};
276 use self::select::StartResult;
277 use self::select::StartResult::*;
278 use self::blocking::SignalToken;
279
280 mod blocking;
281 mod oneshot;
282 mod select;
283 mod shared;
284 mod stream;
285 mod sync;
286 mod mpsc_queue;
287 mod spsc_queue;
288
289 /// The receiving-half of Rust's channel type. This half can only be owned by
290 /// one thread.
291 #[stable(feature = "rust1", since = "1.0.0")]
292 pub struct Receiver<T> {
293 inner: UnsafeCell<Flavor<T>>,
294 }
295
296 // The receiver port can be sent from place to place, so long as it
297 // is not used to receive non-sendable things.
298 unsafe impl<T: Send> Send for Receiver<T> { }
299
300 /// An iterator over messages on a receiver. This iterator will block
301 /// whenever `next` is called, waiting for a new message, and `None` will be
302 /// returned when the corresponding channel has hung up.
303 #[stable(feature = "rust1", since = "1.0.0")]
304 pub struct Iter<'a, T: 'a> {
305 rx: &'a Receiver<T>
306 }
307
308 /// An owning iterator over messages on a receiver. This iterator will block
309 /// whenever `next` is called, waiting for a new message, and `None` will be
310 /// returned when the corresponding channel has hung up.
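///
/// # Examples
///
/// A minimal example of iterating by value; the `Receiver` is moved into the
/// iterator:
///
/// ```
/// use std::sync::mpsc::channel;
///
/// let (tx, rx) = channel::<i32>();
/// tx.send(1).unwrap();
/// tx.send(2).unwrap();
/// drop(tx);
///
/// // Iteration ends once the channel is empty and every sender is gone.
/// let received: Vec<i32> = rx.into_iter().collect();
/// assert_eq!(received, vec![1, 2]);
/// ```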
311 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
312 pub struct IntoIter<T> {
313 rx: Receiver<T>
314 }
315
316 /// The sending-half of Rust's asynchronous channel type. This half can only be
317 /// owned by one thread, but it can be cloned to send to other threads.
318 #[stable(feature = "rust1", since = "1.0.0")]
319 pub struct Sender<T> {
320 inner: UnsafeCell<Flavor<T>>,
321 }
322
323 // The send port can be sent from place to place, so long as it
324 // is not used to send non-sendable things.
325 unsafe impl<T: Send> Send for Sender<T> { }
326
327 /// The sending-half of Rust's synchronous channel type. This half can only be
328 /// owned by one thread, but it can be cloned to send to other threads.
329 #[stable(feature = "rust1", since = "1.0.0")]
330 pub struct SyncSender<T> {
331 inner: Arc<UnsafeCell<sync::Packet<T>>>,
332 }
333
334 unsafe impl<T: Send> Send for SyncSender<T> {}
335
336 impl<T> !Sync for SyncSender<T> {}
337
338 /// An error returned from the `send` function on channels.
339 ///
340 /// A `send` operation can only fail if the receiving end of a channel is
341 /// disconnected, implying that the data could never be received. The error
342 /// contains the data being sent as a payload so it can be recovered.
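///
/// # Examples
///
/// A small sketch of recovering the payload from a failed send:
///
/// ```
/// use std::sync::mpsc::channel;
///
/// let (tx, rx) = channel();
/// drop(rx);
///
/// // The receiver is gone, so the send fails and hands the value back.
/// let err = tx.send("hello").unwrap_err();
/// assert_eq!(err.0, "hello");
/// ```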
343 #[stable(feature = "rust1", since = "1.0.0")]
344 #[derive(PartialEq, Eq, Clone, Copy)]
345 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
346
347 /// An error returned from the `recv` function on a `Receiver`.
348 ///
349 /// The `recv` operation can only fail if the sending half of a channel is
350 /// disconnected, implying that no further messages will ever be received.
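///
/// # Examples
///
/// A minimal example of the disconnection case:
///
/// ```
/// use std::sync::mpsc::{channel, RecvError};
///
/// let (tx, rx) = channel::<i32>();
/// drop(tx);
///
/// // All senders are gone, so recv() reports the disconnection.
/// assert_eq!(rx.recv(), Err(RecvError));
/// ```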
351 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
352 #[stable(feature = "rust1", since = "1.0.0")]
353 pub struct RecvError;
354
355 /// This enumeration is the list of the possible reasons that `try_recv` could
356 /// not return data when called.
357 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
358 #[stable(feature = "rust1", since = "1.0.0")]
359 pub enum TryRecvError {
360 /// This channel is currently empty, but the sender(s) have not yet
361 /// disconnected, so data may yet become available.
362 #[stable(feature = "rust1", since = "1.0.0")]
363 Empty,
364
365 /// This channel's sending half has become disconnected, and there will
366 /// never be any more data received on this channel.
367 #[stable(feature = "rust1", since = "1.0.0")]
368 Disconnected,
369 }
370
371 /// This enumeration is the list of the possible error outcomes for the
372 /// `SyncSender::try_send` method.
373 #[stable(feature = "rust1", since = "1.0.0")]
374 #[derive(PartialEq, Eq, Clone, Copy)]
375 pub enum TrySendError<T> {
376 /// The data could not be sent on the channel because it would require that
377 /// the caller block to send the data.
378 ///
379 /// If this is a buffered channel, then the buffer is full at this time. If
380 /// this is not a buffered channel, then there is no receiver available to
381 /// acquire the data.
382 #[stable(feature = "rust1", since = "1.0.0")]
383 Full(T),
384
385 /// This channel's receiving half has disconnected, so the data could not be
386 /// sent. The data is returned back to the caller in this case.
387 #[stable(feature = "rust1", since = "1.0.0")]
388 Disconnected(T),
389 }
390
391 enum Flavor<T> {
392 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
393 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
394 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
395 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
396 }
397
398 #[doc(hidden)]
399 trait UnsafeFlavor<T> {
400 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
401 unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
402 &mut *self.inner_unsafe().get()
403 }
404 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
405 &*self.inner_unsafe().get()
406 }
407 }
408 impl<T> UnsafeFlavor<T> for Sender<T> {
409 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
410 &self.inner
411 }
412 }
413 impl<T> UnsafeFlavor<T> for Receiver<T> {
414 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
415 &self.inner
416 }
417 }
418
419 /// Creates a new asynchronous channel, returning the sender/receiver halves.
420 ///
421 /// All data sent on the sender will become available on the receiver, and no
422 /// send will block the calling thread (this channel has an "infinite buffer").
423 ///
424 /// # Examples
425 ///
426 /// ```
427 /// use std::sync::mpsc::channel;
428 /// use std::thread;
429 ///
430 /// // tx is the sending half (tx for transmission), and rx is the receiving
431 /// // half (rx for receiving).
432 /// let (tx, rx) = channel();
433 ///
434 /// // Spawn off an expensive computation
435 /// thread::spawn(move|| {
436 /// # fn expensive_computation() {}
437 /// tx.send(expensive_computation()).unwrap();
438 /// });
439 ///
440 /// // Do some useful work for a while
441 ///
442 /// // Let's see what that answer was
443 /// println!("{:?}", rx.recv().unwrap());
444 /// ```
445 #[stable(feature = "rust1", since = "1.0.0")]
446 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
447 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
448 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
449 }
450
451 /// Creates a new synchronous, bounded channel.
452 ///
453 /// Like asynchronous channels, the `Receiver` will block until a message
454 /// becomes available. These channels differ greatly from asynchronous
455 /// channels, however, in the semantics of the sender.
456 ///
457 /// This channel has an internal buffer on which messages will be queued. When
458 /// the internal buffer becomes full, future sends will *block* waiting for the
459 /// buffer to open up. Note that a buffer size of 0 is valid, in which case this
460 /// becomes "rendezvous channel" where each send will not return until a recv
461 /// is paired with it.
462 ///
463 /// As with asynchronous channels, `send` will return an error if the
464 /// `Receiver` has been destroyed.
465 ///
466 /// # Examples
467 ///
468 /// ```
469 /// use std::sync::mpsc::sync_channel;
470 /// use std::thread;
471 ///
472 /// let (tx, rx) = sync_channel(1);
473 ///
474 /// // this returns immediately
475 /// tx.send(1).unwrap();
476 ///
477 /// thread::spawn(move|| {
478 /// // this will block until the previous message has been received
479 /// tx.send(2).unwrap();
480 /// });
481 ///
482 /// assert_eq!(rx.recv().unwrap(), 1);
483 /// assert_eq!(rx.recv().unwrap(), 2);
484 /// ```
485 #[stable(feature = "rust1", since = "1.0.0")]
486 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
487 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
488 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
489 }
490
491 ////////////////////////////////////////////////////////////////////////////////
492 // Sender
493 ////////////////////////////////////////////////////////////////////////////////
494
495 impl<T> Sender<T> {
496 fn new(inner: Flavor<T>) -> Sender<T> {
497 Sender {
498 inner: UnsafeCell::new(inner),
499 }
500 }
501
502 /// Attempts to send a value on this channel, returning it back if it could
503 /// not be sent.
504 ///
505 /// A successful send occurs when it is determined that the other end of
506 /// the channel has not hung up already. An unsuccessful send would be one
507 /// where the corresponding receiver has already been deallocated. Note
508 /// that a return value of `Err` means that the data will never be
509 /// received, but a return value of `Ok` does *not* mean that the data
510 /// will be received. It is possible for the corresponding receiver to
511 /// hang up immediately after this function returns `Ok`.
512 ///
513 /// This method will never block the current thread.
514 ///
515 /// # Examples
516 ///
517 /// ```
518 /// use std::sync::mpsc::channel;
519 ///
520 /// let (tx, rx) = channel();
521 ///
522 /// // This send is always successful
523 /// tx.send(1).unwrap();
524 ///
525 /// // This send will fail because the receiver is gone
526 /// drop(rx);
527 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
528 /// ```
529 #[stable(feature = "rust1", since = "1.0.0")]
530 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
531 let (new_inner, ret) = match *unsafe { self.inner() } {
532 Flavor::Oneshot(ref p) => {
533 unsafe {
534 let p = p.get();
535 if !(*p).sent() {
536 return (*p).send(t).map_err(SendError);
537 } else {
538 let a =
539 Arc::new(UnsafeCell::new(stream::Packet::new()));
540 let rx = Receiver::new(Flavor::Stream(a.clone()));
541 match (*p).upgrade(rx) {
542 oneshot::UpSuccess => {
543 let ret = (*a.get()).send(t);
544 (a, ret)
545 }
546 oneshot::UpDisconnected => (a, Err(t)),
547 oneshot::UpWoke(token) => {
548 // This send cannot panic because the thread is
549 // asleep (we're looking at it), so the receiver
550 // can't go away.
551 (*a.get()).send(t).ok().unwrap();
552 token.signal();
553 (a, Ok(()))
554 }
555 }
556 }
557 }
558 }
559 Flavor::Stream(ref p) => return unsafe {
560 (*p.get()).send(t).map_err(SendError)
561 },
562 Flavor::Shared(ref p) => return unsafe {
563 (*p.get()).send(t).map_err(SendError)
564 },
565 Flavor::Sync(..) => unreachable!(),
566 };
567
568 unsafe {
569 let tmp = Sender::new(Flavor::Stream(new_inner));
570 mem::swap(self.inner_mut(), tmp.inner_mut());
571 }
572 ret.map_err(SendError)
573 }
574 }
575
576 #[stable(feature = "rust1", since = "1.0.0")]
577 impl<T> Clone for Sender<T> {
578 fn clone(&self) -> Sender<T> {
579 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
580 Flavor::Oneshot(ref p) => {
581 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
582 unsafe {
583 let guard = (*a.get()).postinit_lock();
584 let rx = Receiver::new(Flavor::Shared(a.clone()));
585 match (*p.get()).upgrade(rx) {
586 oneshot::UpSuccess |
587 oneshot::UpDisconnected => (a, None, guard),
588 oneshot::UpWoke(task) => (a, Some(task), guard)
589 }
590 }
591 }
592 Flavor::Stream(ref p) => {
593 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
594 unsafe {
595 let guard = (*a.get()).postinit_lock();
596 let rx = Receiver::new(Flavor::Shared(a.clone()));
597 match (*p.get()).upgrade(rx) {
598 stream::UpSuccess |
599 stream::UpDisconnected => (a, None, guard),
600 stream::UpWoke(task) => (a, Some(task), guard),
601 }
602 }
603 }
604 Flavor::Shared(ref p) => {
605 unsafe { (*p.get()).clone_chan(); }
606 return Sender::new(Flavor::Shared(p.clone()));
607 }
608 Flavor::Sync(..) => unreachable!(),
609 };
610
611 unsafe {
612 (*packet.get()).inherit_blocker(sleeper, guard);
613
614 let tmp = Sender::new(Flavor::Shared(packet.clone()));
615 mem::swap(self.inner_mut(), tmp.inner_mut());
616 }
617 Sender::new(Flavor::Shared(packet))
618 }
619 }
620
621 #[stable(feature = "rust1", since = "1.0.0")]
622 impl<T> Drop for Sender<T> {
623 fn drop(&mut self) {
624 match *unsafe { self.inner_mut() } {
625 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
626 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
627 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
628 Flavor::Sync(..) => unreachable!(),
629 }
630 }
631 }
632
633 ////////////////////////////////////////////////////////////////////////////////
634 // SyncSender
635 ////////////////////////////////////////////////////////////////////////////////
636
637 impl<T> SyncSender<T> {
638 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
639 SyncSender { inner: inner }
640 }
641
642 /// Sends a value on this synchronous channel.
643 ///
644 /// This function will *block* until space in the internal buffer becomes
645 /// available or a receiver is available to hand off the message to.
646 ///
647 /// Note that a successful send does *not* guarantee that the receiver will
648 /// ever see the data if there is a buffer on this channel. Items may be
649 /// enqueued in the internal buffer for the receiver to receive at a later
650 /// time. If the buffer size is 0, however, it can be guaranteed that the
651 /// receiver has indeed received the data if this function returns success.
652 ///
653 /// This function will never panic, but it may return `Err` if the
654 /// `Receiver` has disconnected and is no longer able to receive
655 /// information.
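///
/// # Examples
///
/// A minimal sketch of a rendezvous send, where a bound of 0 means every
/// `send` waits for a matching `recv`:
///
/// ```
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
/// let (tx, rx) = sync_channel::<i32>(0);
/// let handle = thread::spawn(move|| {
///     // Blocks until the parent thread calls recv() below.
///     tx.send(42).unwrap();
/// });
///
/// assert_eq!(rx.recv().unwrap(), 42);
/// handle.join().unwrap();
/// ```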
656 #[stable(feature = "rust1", since = "1.0.0")]
657 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
658 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
659 }
660
661 /// Attempts to send a value on this channel without blocking.
662 ///
663 /// This method differs from `send` by returning immediately if the
664 /// channel's buffer is full or no receiver is waiting to acquire some
665 /// data. Compared with `send`, this function has two failure cases
666 /// instead of one (one for disconnection, one for a full buffer).
667 ///
668 /// See `SyncSender::send` for notes about guarantees of whether the
669 /// receiver has received the data or not if this function is successful.
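///
/// # Examples
///
/// A minimal sketch of the two failure cases:
///
/// ```
/// use std::sync::mpsc::{sync_channel, TrySendError};
///
/// let (tx, rx) = sync_channel(1);
///
/// // The buffer has room for one message, so this succeeds immediately.
/// assert_eq!(tx.try_send(1), Ok(()));
///
/// // The buffer is now full, so another try_send fails without blocking.
/// assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
///
/// // Drain the buffer, then drop the receiver; further sends now report
/// // disconnection rather than a full buffer.
/// assert_eq!(rx.recv(), Ok(1));
/// drop(rx);
/// assert_eq!(tx.try_send(3), Err(TrySendError::Disconnected(3)));
/// ```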
670 #[stable(feature = "rust1", since = "1.0.0")]
671 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
672 unsafe { (*self.inner.get()).try_send(t) }
673 }
674 }
675
676 #[stable(feature = "rust1", since = "1.0.0")]
677 impl<T> Clone for SyncSender<T> {
678 fn clone(&self) -> SyncSender<T> {
679 unsafe { (*self.inner.get()).clone_chan(); }
680 SyncSender::new(self.inner.clone())
681 }
682 }
683
684 #[stable(feature = "rust1", since = "1.0.0")]
685 impl<T> Drop for SyncSender<T> {
686 fn drop(&mut self) {
687 unsafe { (*self.inner.get()).drop_chan(); }
688 }
689 }
690
691 ////////////////////////////////////////////////////////////////////////////////
692 // Receiver
693 ////////////////////////////////////////////////////////////////////////////////
694
695 impl<T> Receiver<T> {
696 fn new(inner: Flavor<T>) -> Receiver<T> {
697 Receiver { inner: UnsafeCell::new(inner) }
698 }
699
700 /// Attempts to return a pending value on this receiver without blocking.
701 ///
702 /// This method will never block the caller in order to wait for data to
703 /// become available. Instead, it always returns immediately, either with
704 /// pending data from the channel or with an error.
705 ///
706 /// This is useful for a flavor of "optimistic check" before deciding to
707 /// block on a receiver.
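///
/// # Examples
///
/// A minimal sketch of the three possible outcomes:
///
/// ```
/// use std::sync::mpsc::{channel, TryRecvError};
///
/// let (tx, rx) = channel::<i32>();
///
/// // Nothing has been sent yet, so the channel reports itself as empty.
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// // A pending message is returned without blocking.
/// tx.send(1).unwrap();
/// assert_eq!(rx.try_recv(), Ok(1));
///
/// // Once every sender is gone, the channel reports disconnection.
/// drop(tx);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
/// ```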
708 #[stable(feature = "rust1", since = "1.0.0")]
709 pub fn try_recv(&self) -> Result<T, TryRecvError> {
710 loop {
711 let new_port = match *unsafe { self.inner() } {
712 Flavor::Oneshot(ref p) => {
713 match unsafe { (*p.get()).try_recv() } {
714 Ok(t) => return Ok(t),
715 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
716 Err(oneshot::Disconnected) => {
717 return Err(TryRecvError::Disconnected)
718 }
719 Err(oneshot::Upgraded(rx)) => rx,
720 }
721 }
722 Flavor::Stream(ref p) => {
723 match unsafe { (*p.get()).try_recv() } {
724 Ok(t) => return Ok(t),
725 Err(stream::Empty) => return Err(TryRecvError::Empty),
726 Err(stream::Disconnected) => {
727 return Err(TryRecvError::Disconnected)
728 }
729 Err(stream::Upgraded(rx)) => rx,
730 }
731 }
732 Flavor::Shared(ref p) => {
733 match unsafe { (*p.get()).try_recv() } {
734 Ok(t) => return Ok(t),
735 Err(shared::Empty) => return Err(TryRecvError::Empty),
736 Err(shared::Disconnected) => {
737 return Err(TryRecvError::Disconnected)
738 }
739 }
740 }
741 Flavor::Sync(ref p) => {
742 match unsafe { (*p.get()).try_recv() } {
743 Ok(t) => return Ok(t),
744 Err(sync::Empty) => return Err(TryRecvError::Empty),
745 Err(sync::Disconnected) => {
746 return Err(TryRecvError::Disconnected)
747 }
748 }
749 }
750 };
751 unsafe {
752 mem::swap(self.inner_mut(),
753 new_port.inner_mut());
754 }
755 }
756 }
757
758 /// Attempts to wait for a value on this receiver, returning an error if the
759 /// corresponding channel has hung up.
760 ///
761 /// This function will always block the current thread if there is no data
762 /// available and it's possible for more data to be sent. Once a message is
763 /// sent to the corresponding `Sender`, then this receiver will wake up and
764 /// return that message.
765 ///
766 /// If the corresponding `Sender` has disconnected, or it disconnects while
767 /// this call is blocking, this call will wake up and return `Err` to
768 /// indicate that no more messages can ever be received on this channel.
769 /// However, since channels are buffered, messages sent before the disconnect
770 /// will still be properly received.
771 ///
772 /// # Examples
773 ///
774 /// ```
775 /// use std::sync::mpsc;
776 /// use std::thread;
777 ///
778 /// let (send, recv) = mpsc::channel();
779 /// let handle = thread::spawn(move || {
780 /// send.send(1u8).unwrap();
781 /// });
782 ///
783 /// handle.join().unwrap();
784 ///
785 /// assert_eq!(Ok(1), recv.recv());
786 /// ```
787 ///
788 /// Buffering behavior:
789 ///
790 /// ```
791 /// use std::sync::mpsc;
792 /// use std::thread;
793 /// use std::sync::mpsc::RecvError;
794 ///
795 /// let (send, recv) = mpsc::channel();
796 /// let handle = thread::spawn(move || {
797 /// send.send(1u8).unwrap();
798 /// send.send(2).unwrap();
799 /// send.send(3).unwrap();
800 /// drop(send);
801 /// });
802 ///
803 /// // wait for the thread to join so we ensure the sender is dropped
804 /// handle.join().unwrap();
805 ///
806 /// assert_eq!(Ok(1), recv.recv());
807 /// assert_eq!(Ok(2), recv.recv());
808 /// assert_eq!(Ok(3), recv.recv());
809 /// assert_eq!(Err(RecvError), recv.recv());
810 /// ```
811 #[stable(feature = "rust1", since = "1.0.0")]
812 pub fn recv(&self) -> Result<T, RecvError> {
813 loop {
814 let new_port = match *unsafe { self.inner() } {
815 Flavor::Oneshot(ref p) => {
816 match unsafe { (*p.get()).recv() } {
817 Ok(t) => return Ok(t),
818 Err(oneshot::Empty) => return unreachable!(),
819 Err(oneshot::Disconnected) => return Err(RecvError),
820 Err(oneshot::Upgraded(rx)) => rx,
821 }
822 }
823 Flavor::Stream(ref p) => {
824 match unsafe { (*p.get()).recv() } {
825 Ok(t) => return Ok(t),
826 Err(stream::Empty) => return unreachable!(),
827 Err(stream::Disconnected) => return Err(RecvError),
828 Err(stream::Upgraded(rx)) => rx,
829 }
830 }
831 Flavor::Shared(ref p) => {
832 match unsafe { (*p.get()).recv() } {
833 Ok(t) => return Ok(t),
834 Err(shared::Empty) => return unreachable!(),
835 Err(shared::Disconnected) => return Err(RecvError),
836 }
837 }
838 Flavor::Sync(ref p) => return unsafe {
839 (*p.get()).recv().map_err(|()| RecvError)
840 }
841 };
842 unsafe {
843 mem::swap(self.inner_mut(), new_port.inner_mut());
844 }
845 }
846 }
847
848 /// Returns an iterator that will block waiting for messages, but never
849 /// `panic!`. It will return `None` when the channel has hung up.
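///
/// # Examples
///
/// A small sketch of draining a channel with the blocking iterator:
///
/// ```
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (tx, rx) = channel::<i32>();
/// thread::spawn(move|| {
///     tx.send(1).unwrap();
///     tx.send(2).unwrap();
///     tx.send(3).unwrap();
///     // Dropping `tx` at the end of this closure ends the iteration below.
/// });
///
/// let received: Vec<i32> = rx.iter().collect();
/// assert_eq!(received, vec![1, 2, 3]);
/// ```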
850 #[stable(feature = "rust1", since = "1.0.0")]
851 pub fn iter(&self) -> Iter<T> {
852 Iter { rx: self }
853 }
854 }
855
856 impl<T> select::Packet for Receiver<T> {
857 fn can_recv(&self) -> bool {
858 loop {
859 let new_port = match *unsafe { self.inner() } {
860 Flavor::Oneshot(ref p) => {
861 match unsafe { (*p.get()).can_recv() } {
862 Ok(ret) => return ret,
863 Err(upgrade) => upgrade,
864 }
865 }
866 Flavor::Stream(ref p) => {
867 match unsafe { (*p.get()).can_recv() } {
868 Ok(ret) => return ret,
869 Err(upgrade) => upgrade,
870 }
871 }
872 Flavor::Shared(ref p) => {
873 return unsafe { (*p.get()).can_recv() };
874 }
875 Flavor::Sync(ref p) => {
876 return unsafe { (*p.get()).can_recv() };
877 }
878 };
879 unsafe {
880 mem::swap(self.inner_mut(),
881 new_port.inner_mut());
882 }
883 }
884 }
885
886 fn start_selection(&self, mut token: SignalToken) -> StartResult {
887 loop {
888 let (t, new_port) = match *unsafe { self.inner() } {
889 Flavor::Oneshot(ref p) => {
890 match unsafe { (*p.get()).start_selection(token) } {
891 oneshot::SelSuccess => return Installed,
892 oneshot::SelCanceled => return Abort,
893 oneshot::SelUpgraded(t, rx) => (t, rx),
894 }
895 }
896 Flavor::Stream(ref p) => {
897 match unsafe { (*p.get()).start_selection(token) } {
898 stream::SelSuccess => return Installed,
899 stream::SelCanceled => return Abort,
900 stream::SelUpgraded(t, rx) => (t, rx),
901 }
902 }
903 Flavor::Shared(ref p) => {
904 return unsafe { (*p.get()).start_selection(token) };
905 }
906 Flavor::Sync(ref p) => {
907 return unsafe { (*p.get()).start_selection(token) };
908 }
909 };
910 token = t;
911 unsafe {
912 mem::swap(self.inner_mut(), new_port.inner_mut());
913 }
914 }
915 }
916
917 fn abort_selection(&self) -> bool {
918 let mut was_upgrade = false;
919 loop {
920 let result = match *unsafe { self.inner() } {
921 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
922 Flavor::Stream(ref p) => unsafe {
923 (*p.get()).abort_selection(was_upgrade)
924 },
925 Flavor::Shared(ref p) => return unsafe {
926 (*p.get()).abort_selection(was_upgrade)
927 },
928 Flavor::Sync(ref p) => return unsafe {
929 (*p.get()).abort_selection()
930 },
931 };
932 let new_port = match result { Ok(b) => return b, Err(p) => p };
933 was_upgrade = true;
934 unsafe {
935 mem::swap(self.inner_mut(),
936 new_port.inner_mut());
937 }
938 }
939 }
940 }
941
942 #[stable(feature = "rust1", since = "1.0.0")]
943 impl<'a, T> Iterator for Iter<'a, T> {
944 type Item = T;
945
946 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
947 }
948
949 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
950 impl<'a, T> IntoIterator for &'a Receiver<T> {
951 type Item = T;
952 type IntoIter = Iter<'a, T>;
953
954 fn into_iter(self) -> Iter<'a, T> { self.iter() }
955 }
956
957 impl<T> Iterator for IntoIter<T> {
958 type Item = T;
959 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
960 }
961
962 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
963 impl <T> IntoIterator for Receiver<T> {
964 type Item = T;
965 type IntoIter = IntoIter<T>;
966
967 fn into_iter(self) -> IntoIter<T> {
968 IntoIter { rx: self }
969 }
970 }
971
972 #[stable(feature = "rust1", since = "1.0.0")]
973 impl<T> Drop for Receiver<T> {
974 fn drop(&mut self) {
975 match *unsafe { self.inner_mut() } {
976 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
977 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
978 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
979 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
980 }
981 }
982 }
983
984 #[stable(feature = "rust1", since = "1.0.0")]
985 impl<T> fmt::Debug for SendError<T> {
986 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
987 "SendError(..)".fmt(f)
988 }
989 }
990
991 #[stable(feature = "rust1", since = "1.0.0")]
992 impl<T> fmt::Display for SendError<T> {
993 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
994 "sending on a closed channel".fmt(f)
995 }
996 }
997
998 #[stable(feature = "rust1", since = "1.0.0")]
999 impl<T: Send + Reflect> error::Error for SendError<T> {
1000 fn description(&self) -> &str {
1001 "sending on a closed channel"
1002 }
1003
1004 fn cause(&self) -> Option<&error::Error> {
1005 None
1006 }
1007 }
1008
1009 #[stable(feature = "rust1", since = "1.0.0")]
1010 impl<T> fmt::Debug for TrySendError<T> {
1011 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1012 match *self {
1013 TrySendError::Full(..) => "Full(..)".fmt(f),
1014 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1015 }
1016 }
1017 }
1018
1019 #[stable(feature = "rust1", since = "1.0.0")]
1020 impl<T> fmt::Display for TrySendError<T> {
1021 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1022 match *self {
1023 TrySendError::Full(..) => {
1024 "sending on a full channel".fmt(f)
1025 }
1026 TrySendError::Disconnected(..) => {
1027 "sending on a closed channel".fmt(f)
1028 }
1029 }
1030 }
1031 }
1032
1033 #[stable(feature = "rust1", since = "1.0.0")]
1034 impl<T: Send + Reflect> error::Error for TrySendError<T> {
1035
1036 fn description(&self) -> &str {
1037 match *self {
1038 TrySendError::Full(..) => {
1039 "sending on a full channel"
1040 }
1041 TrySendError::Disconnected(..) => {
1042 "sending on a closed channel"
1043 }
1044 }
1045 }
1046
1047 fn cause(&self) -> Option<&error::Error> {
1048 None
1049 }
1050 }
1051
1052 #[stable(feature = "rust1", since = "1.0.0")]
1053 impl fmt::Display for RecvError {
1054 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1055 "receiving on a closed channel".fmt(f)
1056 }
1057 }
1058
1059 #[stable(feature = "rust1", since = "1.0.0")]
1060 impl error::Error for RecvError {
1061
1062 fn description(&self) -> &str {
1063 "receiving on a closed channel"
1064 }
1065
1066 fn cause(&self) -> Option<&error::Error> {
1067 None
1068 }
1069 }
1070
1071 #[stable(feature = "rust1", since = "1.0.0")]
1072 impl fmt::Display for TryRecvError {
1073 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1074 match *self {
1075 TryRecvError::Empty => {
1076 "receiving on an empty channel".fmt(f)
1077 }
1078 TryRecvError::Disconnected => {
1079 "receiving on a closed channel".fmt(f)
1080 }
1081 }
1082 }
1083 }
1084
1085 #[stable(feature = "rust1", since = "1.0.0")]
1086 impl error::Error for TryRecvError {
1087
1088 fn description(&self) -> &str {
1089 match *self {
1090 TryRecvError::Empty => {
1091 "receiving on an empty channel"
1092 }
1093 TryRecvError::Disconnected => {
1094 "receiving on a closed channel"
1095 }
1096 }
1097 }
1098
1099 fn cause(&self) -> Option<&error::Error> {
1100 None
1101 }
1102 }
1103
1104 #[cfg(test)]
1105 mod tests {
1106 use prelude::v1::*;
1107
1108 use env;
1109 use super::*;
1110 use thread;
1111
1112 pub fn stress_factor() -> usize {
1113 match env::var("RUST_TEST_STRESS") {
1114 Ok(val) => val.parse().unwrap(),
1115 Err(..) => 1,
1116 }
1117 }
1118
1119 #[test]
1120 fn smoke() {
1121 let (tx, rx) = channel::<i32>();
1122 tx.send(1).unwrap();
1123 assert_eq!(rx.recv().unwrap(), 1);
1124 }
1125
1126 #[test]
1127 fn drop_full() {
1128 let (tx, _rx) = channel::<Box<isize>>();
1129 tx.send(box 1).unwrap();
1130 }
1131
1132 #[test]
1133 fn drop_full_shared() {
1134 let (tx, _rx) = channel::<Box<isize>>();
1135 drop(tx.clone());
1136 drop(tx.clone());
1137 tx.send(box 1).unwrap();
1138 }
1139
1140 #[test]
1141 fn smoke_shared() {
1142 let (tx, rx) = channel::<i32>();
1143 tx.send(1).unwrap();
1144 assert_eq!(rx.recv().unwrap(), 1);
1145 let tx = tx.clone();
1146 tx.send(1).unwrap();
1147 assert_eq!(rx.recv().unwrap(), 1);
1148 }
1149
1150 #[test]
1151 fn smoke_threads() {
1152 let (tx, rx) = channel::<i32>();
1153 let _t = thread::spawn(move|| {
1154 tx.send(1).unwrap();
1155 });
1156 assert_eq!(rx.recv().unwrap(), 1);
1157 }
1158
1159 #[test]
1160 fn smoke_port_gone() {
1161 let (tx, rx) = channel::<i32>();
1162 drop(rx);
1163 assert!(tx.send(1).is_err());
1164 }
1165
1166 #[test]
1167 fn smoke_shared_port_gone() {
1168 let (tx, rx) = channel::<i32>();
1169 drop(rx);
1170 assert!(tx.send(1).is_err())
1171 }
1172
1173 #[test]
1174 fn smoke_shared_port_gone2() {
1175 let (tx, rx) = channel::<i32>();
1176 drop(rx);
1177 let tx2 = tx.clone();
1178 drop(tx);
1179 assert!(tx2.send(1).is_err());
1180 }
1181
1182 #[test]
1183 fn port_gone_concurrent() {
1184 let (tx, rx) = channel::<i32>();
1185 let _t = thread::spawn(move|| {
1186 rx.recv().unwrap();
1187 });
1188 while tx.send(1).is_ok() {}
1189 }
1190
1191 #[test]
1192 fn port_gone_concurrent_shared() {
1193 let (tx, rx) = channel::<i32>();
1194 let tx2 = tx.clone();
1195 let _t = thread::spawn(move|| {
1196 rx.recv().unwrap();
1197 });
1198 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1199 }
1200
1201 #[test]
1202 fn smoke_chan_gone() {
1203 let (tx, rx) = channel::<i32>();
1204 drop(tx);
1205 assert!(rx.recv().is_err());
1206 }
1207
1208 #[test]
1209 fn smoke_chan_gone_shared() {
1210 let (tx, rx) = channel::<()>();
1211 let tx2 = tx.clone();
1212 drop(tx);
1213 drop(tx2);
1214 assert!(rx.recv().is_err());
1215 }
1216
1217 #[test]
1218 fn chan_gone_concurrent() {
1219 let (tx, rx) = channel::<i32>();
1220 let _t = thread::spawn(move|| {
1221 tx.send(1).unwrap();
1222 tx.send(1).unwrap();
1223 });
1224 while rx.recv().is_ok() {}
1225 }
1226
1227 #[test]
1228 fn stress() {
1229 let (tx, rx) = channel::<i32>();
1230 let t = thread::spawn(move|| {
1231 for _ in 0..10000 { tx.send(1).unwrap(); }
1232 });
1233 for _ in 0..10000 {
1234 assert_eq!(rx.recv().unwrap(), 1);
1235 }
1236 t.join().ok().unwrap();
1237 }
1238
1239 #[test]
1240 fn stress_shared() {
1241 const AMT: u32 = 10000;
1242 const NTHREADS: u32 = 8;
1243 let (tx, rx) = channel::<i32>();
1244
1245 let t = thread::spawn(move|| {
1246 for _ in 0..AMT * NTHREADS {
1247 assert_eq!(rx.recv().unwrap(), 1);
1248 }
1249 match rx.try_recv() {
1250 Ok(..) => panic!(),
1251 _ => {}
1252 }
1253 });
1254
1255 for _ in 0..NTHREADS {
1256 let tx = tx.clone();
1257 thread::spawn(move|| {
1258 for _ in 0..AMT { tx.send(1).unwrap(); }
1259 });
1260 }
1261 drop(tx);
1262 t.join().ok().unwrap();
1263 }
1264
1265 #[test]
1266 fn send_from_outside_runtime() {
1267 let (tx1, rx1) = channel::<()>();
1268 let (tx2, rx2) = channel::<i32>();
1269 let t1 = thread::spawn(move|| {
1270 tx1.send(()).unwrap();
1271 for _ in 0..40 {
1272 assert_eq!(rx2.recv().unwrap(), 1);
1273 }
1274 });
1275 rx1.recv().unwrap();
1276 let t2 = thread::spawn(move|| {
1277 for _ in 0..40 {
1278 tx2.send(1).unwrap();
1279 }
1280 });
1281 t1.join().ok().unwrap();
1282 t2.join().ok().unwrap();
1283 }
1284
1285 #[test]
1286 fn recv_from_outside_runtime() {
1287 let (tx, rx) = channel::<i32>();
1288 let t = thread::spawn(move|| {
1289 for _ in 0..40 {
1290 assert_eq!(rx.recv().unwrap(), 1);
1291 }
1292 });
1293 for _ in 0..40 {
1294 tx.send(1).unwrap();
1295 }
1296 t.join().ok().unwrap();
1297 }
1298
1299 #[test]
1300 fn no_runtime() {
1301 let (tx1, rx1) = channel::<i32>();
1302 let (tx2, rx2) = channel::<i32>();
1303 let t1 = thread::spawn(move|| {
1304 assert_eq!(rx1.recv().unwrap(), 1);
1305 tx2.send(2).unwrap();
1306 });
1307 let t2 = thread::spawn(move|| {
1308 tx1.send(1).unwrap();
1309 assert_eq!(rx2.recv().unwrap(), 2);
1310 });
1311 t1.join().ok().unwrap();
1312 t2.join().ok().unwrap();
1313 }
1314
1315 #[test]
1316 fn oneshot_single_thread_close_port_first() {
1317 // Simple test of closing without sending
1318 let (_tx, rx) = channel::<i32>();
1319 drop(rx);
1320 }
1321
1322 #[test]
1323 fn oneshot_single_thread_close_chan_first() {
1324 // Simple test of closing without sending
1325 let (tx, _rx) = channel::<i32>();
1326 drop(tx);
1327 }
1328
1329 #[test]
1330 fn oneshot_single_thread_send_port_close() {
1331 // Testing that the sender cleans up the payload if receiver is closed
1332 let (tx, rx) = channel::<Box<i32>>();
1333 drop(rx);
1334 assert!(tx.send(box 0).is_err());
1335 }
1336
1337 #[test]
1338 fn oneshot_single_thread_recv_chan_close() {
1339 // Receiving on a closed chan will panic
1340 let res = thread::spawn(move|| {
1341 let (tx, rx) = channel::<i32>();
1342 drop(tx);
1343 rx.recv().unwrap();
1344 }).join();
1345 // What is our res?
1346 assert!(res.is_err());
1347 }
1348
1349 #[test]
1350 fn oneshot_single_thread_send_then_recv() {
1351 let (tx, rx) = channel::<Box<i32>>();
1352 tx.send(box 10).unwrap();
1353 assert!(rx.recv().unwrap() == box 10);
1354 }
1355
1356 #[test]
1357 fn oneshot_single_thread_try_send_open() {
1358 let (tx, rx) = channel::<i32>();
1359 assert!(tx.send(10).is_ok());
1360 assert!(rx.recv().unwrap() == 10);
1361 }
1362
1363 #[test]
1364 fn oneshot_single_thread_try_send_closed() {
1365 let (tx, rx) = channel::<i32>();
1366 drop(rx);
1367 assert!(tx.send(10).is_err());
1368 }
1369
1370 #[test]
1371 fn oneshot_single_thread_try_recv_open() {
1372 let (tx, rx) = channel::<i32>();
1373 tx.send(10).unwrap();
1374 assert!(rx.recv() == Ok(10));
1375 }
1376
1377 #[test]
1378 fn oneshot_single_thread_try_recv_closed() {
1379 let (tx, rx) = channel::<i32>();
1380 drop(tx);
1381 assert!(rx.recv().is_err());
1382 }
1383
1384 #[test]
1385 fn oneshot_single_thread_peek_data() {
1386 let (tx, rx) = channel::<i32>();
1387 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1388 tx.send(10).unwrap();
1389 assert_eq!(rx.try_recv(), Ok(10));
1390 }
1391
1392 #[test]
1393 fn oneshot_single_thread_peek_close() {
1394 let (tx, rx) = channel::<i32>();
1395 drop(tx);
1396 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1397 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1398 }
1399
1400 #[test]
1401 fn oneshot_single_thread_peek_open() {
1402 let (_tx, rx) = channel::<i32>();
1403 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1404 }
1405
1406 #[test]
1407 fn oneshot_multi_task_recv_then_send() {
1408 let (tx, rx) = channel::<Box<i32>>();
1409 let _t = thread::spawn(move|| {
1410 assert!(rx.recv().unwrap() == box 10);
1411 });
1412
1413 tx.send(box 10).unwrap();
1414 }
1415
1416 #[test]
1417 fn oneshot_multi_task_recv_then_close() {
1418 let (tx, rx) = channel::<Box<i32>>();
1419 let _t = thread::spawn(move|| {
1420 drop(tx);
1421 });
1422 let res = thread::spawn(move|| {
1423 assert!(rx.recv().unwrap() == box 10);
1424 }).join();
1425 assert!(res.is_err());
1426 }
1427
1428 #[test]
1429 fn oneshot_multi_thread_close_stress() {
1430 for _ in 0..stress_factor() {
1431 let (tx, rx) = channel::<i32>();
1432 let _t = thread::spawn(move|| {
1433 drop(rx);
1434 });
1435 drop(tx);
1436 }
1437 }
1438
1439 #[test]
1440 fn oneshot_multi_thread_send_close_stress() {
1441 for _ in 0..stress_factor() {
1442 let (tx, rx) = channel::<i32>();
1443 let _t = thread::spawn(move|| {
1444 drop(rx);
1445 });
1446 let _ = thread::spawn(move|| {
1447 tx.send(1).unwrap();
1448 }).join();
1449 }
1450 }
1451
1452 #[test]
1453 fn oneshot_multi_thread_recv_close_stress() {
1454 for _ in 0..stress_factor() {
1455 let (tx, rx) = channel::<i32>();
1456 thread::spawn(move|| {
1457 let res = thread::spawn(move|| {
1458 rx.recv().unwrap();
1459 }).join();
1460 assert!(res.is_err());
1461 });
1462 let _t = thread::spawn(move|| {
1463 thread::spawn(move|| {
1464 drop(tx);
1465 });
1466 });
1467 }
1468 }
1469
1470 #[test]
1471 fn oneshot_multi_thread_send_recv_stress() {
1472 for _ in 0..stress_factor() {
1473 let (tx, rx) = channel::<Box<isize>>();
1474 let _t = thread::spawn(move|| {
1475 tx.send(box 10).unwrap();
1476 });
1477 assert!(rx.recv().unwrap() == box 10);
1478 }
1479 }
1480
1481 #[test]
1482 fn stream_send_recv_stress() {
1483 for _ in 0..stress_factor() {
1484 let (tx, rx) = channel();
1485
1486 send(tx, 0);
1487 recv(rx, 0);
1488
1489 fn send(tx: Sender<Box<i32>>, i: i32) {
1490 if i == 10 { return }
1491
1492 thread::spawn(move|| {
1493 tx.send(box i).unwrap();
1494 send(tx, i + 1);
1495 });
1496 }
1497
1498 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1499 if i == 10 { return }
1500
1501 thread::spawn(move|| {
1502 assert!(rx.recv().unwrap() == box i);
1503 recv(rx, i + 1);
1504 });
1505 }
1506 }
1507 }
1508
1509 #[test]
1510 fn recv_a_lot() {
1511 // Regression test that we don't run out of stack in scheduler context
1512 let (tx, rx) = channel();
1513 for _ in 0..10000 { tx.send(()).unwrap(); }
1514 for _ in 0..10000 { rx.recv().unwrap(); }
1515 }
1516
1517 #[test]
1518 fn shared_chan_stress() {
1519 let (tx, rx) = channel();
1520 let total = stress_factor() + 100;
1521 for _ in 0..total {
1522 let tx = tx.clone();
1523 thread::spawn(move|| {
1524 tx.send(()).unwrap();
1525 });
1526 }
1527
1528 for _ in 0..total {
1529 rx.recv().unwrap();
1530 }
1531 }
1532
1533 #[test]
1534 fn test_nested_recv_iter() {
1535 let (tx, rx) = channel::<i32>();
1536 let (total_tx, total_rx) = channel::<i32>();
1537
1538 let _t = thread::spawn(move|| {
1539 let mut acc = 0;
1540 for x in rx.iter() {
1541 acc += x;
1542 }
1543 total_tx.send(acc).unwrap();
1544 });
1545
1546 tx.send(3).unwrap();
1547 tx.send(1).unwrap();
1548 tx.send(2).unwrap();
1549 drop(tx);
1550 assert_eq!(total_rx.recv().unwrap(), 6);
1551 }
1552
1553 #[test]
1554 fn test_recv_iter_break() {
1555 let (tx, rx) = channel::<i32>();
1556 let (count_tx, count_rx) = channel();
1557
1558 let _t = thread::spawn(move|| {
1559 let mut count = 0;
1560 for x in rx.iter() {
1561 if count >= 3 {
1562 break;
1563 } else {
1564 count += x;
1565 }
1566 }
1567 count_tx.send(count).unwrap();
1568 });
1569
1570 tx.send(2).unwrap();
1571 tx.send(2).unwrap();
1572 tx.send(2).unwrap();
1573 let _ = tx.send(2);
1574 drop(tx);
1575 assert_eq!(count_rx.recv().unwrap(), 4);
1576 }
1577
1578 #[test]
1579 fn test_recv_into_iter_owned() {
1580 let mut iter = {
1581 let (tx, rx) = channel::<i32>();
1582 tx.send(1).unwrap();
1583 tx.send(2).unwrap();
1584
1585 rx.into_iter()
1586 };
1587 assert_eq!(iter.next().unwrap(), 1);
1588 assert_eq!(iter.next().unwrap(), 2);
1589 assert_eq!(iter.next().is_none(), true);
1590 }
1591
1592 #[test]
1593 fn test_recv_into_iter_borrowed() {
1594 let (tx, rx) = channel::<i32>();
1595 tx.send(1).unwrap();
1596 tx.send(2).unwrap();
1597 drop(tx);
1598 let mut iter = (&rx).into_iter();
1599 assert_eq!(iter.next().unwrap(), 1);
1600 assert_eq!(iter.next().unwrap(), 2);
1601 assert_eq!(iter.next().is_none(), true);
1602 }
1603
1604 #[test]
1605 fn try_recv_states() {
1606 let (tx1, rx1) = channel::<i32>();
1607 let (tx2, rx2) = channel::<()>();
1608 let (tx3, rx3) = channel::<()>();
1609 let _t = thread::spawn(move|| {
1610 rx2.recv().unwrap();
1611 tx1.send(1).unwrap();
1612 tx3.send(()).unwrap();
1613 rx2.recv().unwrap();
1614 drop(tx1);
1615 tx3.send(()).unwrap();
1616 });
1617
1618 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1619 tx2.send(()).unwrap();
1620 rx3.recv().unwrap();
1621 assert_eq!(rx1.try_recv(), Ok(1));
1622 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1623 tx2.send(()).unwrap();
1624 rx3.recv().unwrap();
1625 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1626 }
1627
1628 // This bug used to end up in a livelock inside of the Receiver destructor
1629 // because the internal state of the Shared packet was corrupted
1630 #[test]
1631 fn destroy_upgraded_shared_port_when_sender_still_active() {
1632 let (tx, rx) = channel();
1633 let (tx2, rx2) = channel();
1634 let _t = thread::spawn(move|| {
1635 rx.recv().unwrap(); // wait on a oneshot
1636 drop(rx); // destroy a shared
1637 tx2.send(()).unwrap();
1638 });
1639 // make sure the other thread has gone to sleep
1640 for _ in 0..5000 { thread::yield_now(); }
1641
1642 // upgrade to a shared chan and send a message
1643 let t = tx.clone();
1644 drop(tx);
1645 t.send(()).unwrap();
1646
1647 // wait for the child thread to exit before we exit
1648 rx2.recv().unwrap();
1649 }
1650 }
1651
1652 #[cfg(test)]
1653 mod sync_tests {
1654 use prelude::v1::*;
1655
1656 use env;
1657 use thread;
1658 use super::*;
1659
1660 pub fn stress_factor() -> usize {
1661 match env::var("RUST_TEST_STRESS") {
1662 Ok(val) => val.parse().unwrap(),
1663 Err(..) => 1,
1664 }
1665 }
1666
1667 #[test]
1668 fn smoke() {
1669 let (tx, rx) = sync_channel::<i32>(1);
1670 tx.send(1).unwrap();
1671 assert_eq!(rx.recv().unwrap(), 1);
1672 }
1673
1674 #[test]
1675 fn drop_full() {
1676 let (tx, _rx) = sync_channel::<Box<isize>>(1);
1677 tx.send(box 1).unwrap();
1678 }
1679
1680 #[test]
1681 fn smoke_shared() {
1682 let (tx, rx) = sync_channel::<i32>(1);
1683 tx.send(1).unwrap();
1684 assert_eq!(rx.recv().unwrap(), 1);
1685 let tx = tx.clone();
1686 tx.send(1).unwrap();
1687 assert_eq!(rx.recv().unwrap(), 1);
1688 }
1689
1690 #[test]
1691 fn smoke_threads() {
1692 let (tx, rx) = sync_channel::<i32>(0);
1693 let _t = thread::spawn(move|| {
1694 tx.send(1).unwrap();
1695 });
1696 assert_eq!(rx.recv().unwrap(), 1);
1697 }
1698
1699 #[test]
1700 fn smoke_port_gone() {
1701 let (tx, rx) = sync_channel::<i32>(0);
1702 drop(rx);
1703 assert!(tx.send(1).is_err());
1704 }
1705
1706 #[test]
1707 fn smoke_shared_port_gone2() {
1708 let (tx, rx) = sync_channel::<i32>(0);
1709 drop(rx);
1710 let tx2 = tx.clone();
1711 drop(tx);
1712 assert!(tx2.send(1).is_err());
1713 }
1714
1715 #[test]
1716 fn port_gone_concurrent() {
1717 let (tx, rx) = sync_channel::<i32>(0);
1718 let _t = thread::spawn(move|| {
1719 rx.recv().unwrap();
1720 });
1721 while tx.send(1).is_ok() {}
1722 }
1723
1724 #[test]
1725 fn port_gone_concurrent_shared() {
1726 let (tx, rx) = sync_channel::<i32>(0);
1727 let tx2 = tx.clone();
1728 let _t = thread::spawn(move|| {
1729 rx.recv().unwrap();
1730 });
1731 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1732 }
1733
1734 #[test]
1735 fn smoke_chan_gone() {
1736 let (tx, rx) = sync_channel::<i32>(0);
1737 drop(tx);
1738 assert!(rx.recv().is_err());
1739 }
1740
1741 #[test]
1742 fn smoke_chan_gone_shared() {
1743 let (tx, rx) = sync_channel::<()>(0);
1744 let tx2 = tx.clone();
1745 drop(tx);
1746 drop(tx2);
1747 assert!(rx.recv().is_err());
1748 }
1749
1750 #[test]
1751 fn chan_gone_concurrent() {
1752 let (tx, rx) = sync_channel::<i32>(0);
1753 thread::spawn(move|| {
1754 tx.send(1).unwrap();
1755 tx.send(1).unwrap();
1756 });
1757 while rx.recv().is_ok() {}
1758 }
1759
1760 #[test]
1761 fn stress() {
1762 let (tx, rx) = sync_channel::<i32>(0);
1763 thread::spawn(move|| {
1764 for _ in 0..10000 { tx.send(1).unwrap(); }
1765 });
1766 for _ in 0..10000 {
1767 assert_eq!(rx.recv().unwrap(), 1);
1768 }
1769 }
1770
1771 #[test]
1772 fn stress_shared() {
1773 const AMT: u32 = 1000;
1774 const NTHREADS: u32 = 8;
1775 let (tx, rx) = sync_channel::<i32>(0);
1776 let (dtx, drx) = sync_channel::<()>(0);
1777
1778 thread::spawn(move|| {
1779 for _ in 0..AMT * NTHREADS {
1780 assert_eq!(rx.recv().unwrap(), 1);
1781 }
1782 match rx.try_recv() {
1783 Ok(..) => panic!(),
1784 _ => {}
1785 }
1786 dtx.send(()).unwrap();
1787 });
1788
1789 for _ in 0..NTHREADS {
1790 let tx = tx.clone();
1791 thread::spawn(move|| {
1792 for _ in 0..AMT { tx.send(1).unwrap(); }
1793 });
1794 }
1795 drop(tx);
1796 drx.recv().unwrap();
1797 }
1798
1799 #[test]
1800 fn oneshot_single_thread_close_port_first() {
1801 // Simple test of closing without sending
1802 let (_tx, rx) = sync_channel::<i32>(0);
1803 drop(rx);
1804 }
1805
1806 #[test]
1807 fn oneshot_single_thread_close_chan_first() {
1808 // Simple test of closing without sending
1809 let (tx, _rx) = sync_channel::<i32>(0);
1810 drop(tx);
1811 }
1812
1813 #[test]
1814 fn oneshot_single_thread_send_port_close() {
1815 // Testing that the sender cleans up the payload if receiver is closed
1816 let (tx, rx) = sync_channel::<Box<i32>>(0);
1817 drop(rx);
1818 assert!(tx.send(box 0).is_err());
1819 }
1820
1821 #[test]
1822 fn oneshot_single_thread_recv_chan_close() {
1823 // Receiving on a closed chan will panic
1824 let res = thread::spawn(move|| {
1825 let (tx, rx) = sync_channel::<i32>(0);
1826 drop(tx);
1827 rx.recv().unwrap();
1828 }).join();
1829 // What is our res?
1830 assert!(res.is_err());
1831 }
1832
1833 #[test]
1834 fn oneshot_single_thread_send_then_recv() {
1835 let (tx, rx) = sync_channel::<Box<i32>>(1);
1836 tx.send(box 10).unwrap();
1837 assert!(rx.recv().unwrap() == box 10);
1838 }
1839
1840 #[test]
1841 fn oneshot_single_thread_try_send_open() {
1842 let (tx, rx) = sync_channel::<i32>(1);
1843 assert_eq!(tx.try_send(10), Ok(()));
1844 assert!(rx.recv().unwrap() == 10);
1845 }
1846
1847 #[test]
1848 fn oneshot_single_thread_try_send_closed() {
1849 let (tx, rx) = sync_channel::<i32>(0);
1850 drop(rx);
1851 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1852 }
1853
1854 #[test]
1855 fn oneshot_single_thread_try_send_closed2() {
1856 let (tx, _rx) = sync_channel::<i32>(0);
1857 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1858 }
1859
1860 #[test]
1861 fn oneshot_single_thread_try_recv_open() {
1862 let (tx, rx) = sync_channel::<i32>(1);
1863 tx.send(10).unwrap();
1864 assert!(rx.recv() == Ok(10));
1865 }
1866
1867 #[test]
1868 fn oneshot_single_thread_try_recv_closed() {
1869 let (tx, rx) = sync_channel::<i32>(0);
1870 drop(tx);
1871 assert!(rx.recv().is_err());
1872 }
1873
1874 #[test]
1875 fn oneshot_single_thread_peek_data() {
1876 let (tx, rx) = sync_channel::<i32>(1);
1877 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1878 tx.send(10).unwrap();
1879 assert_eq!(rx.try_recv(), Ok(10));
1880 }
1881
1882 #[test]
1883 fn oneshot_single_thread_peek_close() {
1884 let (tx, rx) = sync_channel::<i32>(0);
1885 drop(tx);
1886 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1887 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1888 }
1889
1890 #[test]
1891 fn oneshot_single_thread_peek_open() {
1892 let (_tx, rx) = sync_channel::<i32>(0);
1893 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1894 }
1895
1896 #[test]
1897 fn oneshot_multi_task_recv_then_send() {
1898 let (tx, rx) = sync_channel::<Box<i32>>(0);
1899 let _t = thread::spawn(move|| {
1900 assert!(rx.recv().unwrap() == box 10);
1901 });
1902
1903 tx.send(box 10).unwrap();
1904 }
1905
1906 #[test]
1907 fn oneshot_multi_task_recv_then_close() {
1908 let (tx, rx) = sync_channel::<Box<i32>>(0);
1909 let _t = thread::spawn(move|| {
1910 drop(tx);
1911 });
1912 let res = thread::spawn(move|| {
1913 assert!(rx.recv().unwrap() == box 10);
1914 }).join();
1915 assert!(res.is_err());
1916 }
1917
1918 #[test]
1919 fn oneshot_multi_thread_close_stress() {
1920 for _ in 0..stress_factor() {
1921 let (tx, rx) = sync_channel::<i32>(0);
1922 let _t = thread::spawn(move|| {
1923 drop(rx);
1924 });
1925 drop(tx);
1926 }
1927 }
1928
1929 #[test]
1930 fn oneshot_multi_thread_send_close_stress() {
1931 for _ in 0..stress_factor() {
1932 let (tx, rx) = sync_channel::<i32>(0);
1933 let _t = thread::spawn(move|| {
1934 drop(rx);
1935 });
1936 let _ = thread::spawn(move || {
1937 tx.send(1).unwrap();
1938 }).join();
1939 }
1940 }
1941
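// A recv() blocked on a rendezvous channel must return Err once the sender is
// dropped from another thread.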
1942 #[test]
1943 fn oneshot_multi_thread_recv_close_stress() {
1944 for _ in 0..stress_factor() {
1945 let (tx, rx) = sync_channel::<i32>(0);
1946 let _t = thread::spawn(move|| {
1947 let res = thread::spawn(move|| {
1948 rx.recv().unwrap();
1949 }).join();
1950 assert!(res.is_err());
1951 });
1952 let _t = thread::spawn(move|| {
1953 thread::spawn(move|| {
1954 drop(tx);
1955 });
1956 });
1957 }
1958 }
1959
1960 #[test]
1961 fn oneshot_multi_thread_send_recv_stress() {
1962 for _ in 0..stress_factor() {
1963 let (tx, rx) = sync_channel::<Box<i32>>(0);
1964 let _t = thread::spawn(move|| {
1965 tx.send(box 10).unwrap();
1966 });
1967 assert!(rx.recv().unwrap() == box 10);
1968 }
1969 }
1970
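// Hands the sender and receiver endpoints through a chain of ten spawned threads,
// pairing each send with the matching recv.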
1971 #[test]
1972 fn stream_send_recv_stress() {
1973 for _ in 0..stress_factor() {
1974 let (tx, rx) = sync_channel::<Box<i32>>(0);
1975
1976 send(tx, 0);
1977 recv(rx, 0);
1978
1979 fn send(tx: SyncSender<Box<i32>>, i: i32) {
1980 if i == 10 { return }
1981
1982 thread::spawn(move|| {
1983 tx.send(box i).unwrap();
1984 send(tx, i + 1);
1985 });
1986 }
1987
1988 fn recv(rx: Receiver<Box<i32>>, i: i32) {
1989 if i == 10 { return }
1990
1991 thread::spawn(move|| {
1992 assert!(rx.recv().unwrap() == box i);
1993 recv(rx, i + 1);
1994 });
1995 }
1996 }
1997 }
1998
1999 #[test]
2000 fn recv_a_lot() {
2001 // Regression test that we don't run out of stack in scheduler context
2002 let (tx, rx) = sync_channel(10000);
2003 for _ in 0..10000 { tx.send(()).unwrap(); }
2004 for _ in 0..10000 { rx.recv().unwrap(); }
2005 }
2006
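// Spawns many short-lived producer threads, each cloning the sender for a single
// rendezvous send.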
2007 #[test]
2008 fn shared_chan_stress() {
2009 let (tx, rx) = sync_channel(0);
2010 let total = stress_factor() + 100;
2011 for _ in 0..total {
2012 let tx = tx.clone();
2013 thread::spawn(move|| {
2014 tx.send(()).unwrap();
2015 });
2016 }
2017
2018 for _ in 0..total {
2019 rx.recv().unwrap();
2020 }
2021 }
2022
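// rx.iter() terminates once every sender has been dropped; the accumulated sum is
// reported back over a second channel.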
2023 #[test]
2024 fn test_nested_recv_iter() {
2025 let (tx, rx) = sync_channel::<i32>(0);
2026 let (total_tx, total_rx) = sync_channel::<i32>(0);
2027
2028 let _t = thread::spawn(move|| {
2029 let mut acc = 0;
2030 for x in rx.iter() {
2031 acc += x;
2032 }
2033 total_tx.send(acc).unwrap();
2034 });
2035
2036 tx.send(3).unwrap();
2037 tx.send(1).unwrap();
2038 tx.send(2).unwrap();
2039 drop(tx);
2040 assert_eq!(total_rx.recv().unwrap(), 6);
2041 }
2042
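// Breaking out of rx.iter() stops consumption early; the later try_send may fail
// (no receiver waiting on a rendezvous channel) and is deliberately ignored.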
2043 #[test]
2044 fn test_recv_iter_break() {
2045 let (tx, rx) = sync_channel::<i32>(0);
2046 let (count_tx, count_rx) = sync_channel(0);
2047
2048 let _t = thread::spawn(move|| {
2049 let mut count = 0;
2050 for x in rx.iter() {
2051 if count >= 3 {
2052 break;
2053 } else {
2054 count += x;
2055 }
2056 }
2057 count_tx.send(count).unwrap();
2058 });
2059
2060 tx.send(2).unwrap();
2061 tx.send(2).unwrap();
2062 tx.send(2).unwrap();
2063 let _ = tx.try_send(2);
2064 drop(tx);
2065 assert_eq!(count_rx.recv().unwrap(), 4);
2066 }
2067
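// Drives rx1.try_recv() through its three observable states: Empty, Ok(value),
// and Disconnected, using tx2/rx3 to synchronize with the helper thread.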
2068 #[test]
2069 fn try_recv_states() {
2070 let (tx1, rx1) = sync_channel::<i32>(1);
2071 let (tx2, rx2) = sync_channel::<()>(1);
2072 let (tx3, rx3) = sync_channel::<()>(1);
2073 let _t = thread::spawn(move|| {
2074 rx2.recv().unwrap();
2075 tx1.send(1).unwrap();
2076 tx3.send(()).unwrap();
2077 rx2.recv().unwrap();
2078 drop(tx1);
2079 tx3.send(()).unwrap();
2080 });
2081
2082 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2083 tx2.send(()).unwrap();
2084 rx3.recv().unwrap();
2085 assert_eq!(rx1.try_recv(), Ok(1));
2086 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
2087 tx2.send(()).unwrap();
2088 rx3.recv().unwrap();
2089 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
2090 }
2091
2092 // Regression test: this used to livelock inside the Receiver destructor because
2093 // the internal state of the Shared packet had been corrupted
2094 #[test]
2095 fn destroy_upgraded_shared_port_when_sender_still_active() {
2096 let (tx, rx) = sync_channel::<()>(0);
2097 let (tx2, rx2) = sync_channel::<()>(0);
2098 let _t = thread::spawn(move|| {
2099 rx.recv().unwrap(); // wait on a oneshot
2100 drop(rx); // destroy a shared
2101 tx2.send(()).unwrap();
2102 });
2103 // make sure the other thread has gone to sleep
2104 for _ in 0..5000 { thread::yield_now(); }
2105
2106 // upgrade to a shared chan and send a message
2107 let t = tx.clone();
2108 drop(tx);
2109 t.send(()).unwrap();
2110
2111 // wait for the child thread to exit before we exit
2112 rx2.recv().unwrap();
2113 }
2114
2115 #[test]
2116 fn send1() {
2117 let (tx, rx) = sync_channel::<i32>(0);
2118 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2119 assert_eq!(tx.send(1), Ok(()));
2120 }
2121
2122 #[test]
2123 fn send2() {
2124 let (tx, rx) = sync_channel::<i32>(0);
2125 let _t = thread::spawn(move|| { drop(rx); });
2126 assert!(tx.send(1).is_err());
2127 }
2128
2129 #[test]
2130 fn send3() {
2131 let (tx, rx) = sync_channel::<i32>(1);
2132 assert_eq!(tx.send(1), Ok(()));
2133 let _t = thread::spawn(move|| { drop(rx); });
2134 assert!(tx.send(1).is_err());
2135 }
2136
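// Both clones of the sender must observe the disconnect once the receiver is dropped.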
2137 #[test]
2138 fn send4() {
2139 let (tx, rx) = sync_channel::<i32>(0);
2140 let tx2 = tx.clone();
2141 let (done, donerx) = channel();
2142 let done2 = done.clone();
2143 let _t = thread::spawn(move|| {
2144 assert!(tx.send(1).is_err());
2145 done.send(()).unwrap();
2146 });
2147 let _t = thread::spawn(move|| {
2148 assert!(tx2.send(2).is_err());
2149 done2.send(()).unwrap();
2150 });
2151 drop(rx);
2152 donerx.recv().unwrap();
2153 donerx.recv().unwrap();
2154 }
2155
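// With no receiver waiting, try_send on a rendezvous (zero-capacity) channel
// reports Full.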
2156 #[test]
2157 fn try_send1() {
2158 let (tx, _rx) = sync_channel::<i32>(0);
2159 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2160 }
2161
2162 #[test]
2163 fn try_send2() {
2164 let (tx, _rx) = sync_channel::<i32>(1);
2165 assert_eq!(tx.try_send(1), Ok(()));
2166 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2167 }
2168
2169 #[test]
2170 fn try_send3() {
2171 let (tx, rx) = sync_channel::<i32>(1);
2172 assert_eq!(tx.try_send(1), Ok(()));
2173 drop(rx);
2174 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2175 }
2176
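// Regression test for issue #15761; repeatedly exercises try_send() while the peer
// thread may be blocked in recv() on small buffered channels.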
2177 #[test]
2178 fn issue_15761() {
2179 fn repro() {
2180 let (tx1, rx1) = sync_channel::<()>(3);
2181 let (tx2, rx2) = sync_channel::<()>(3);
2182
2183 let _t = thread::spawn(move|| {
2184 rx1.recv().unwrap();
2185 tx2.try_send(()).unwrap();
2186 });
2187
2188 tx1.try_send(()).unwrap();
2189 rx2.recv().unwrap();
2190 }
2191
2192 for _ in 0..100 {
2193 repro()
2194 }
2195 }
2196 }