]> git.proxmox.com Git - rustc.git/blob - src/libstd/sync/mpsc/mod.rs
New upstream version 1.19.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / mod.rs
1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * [`Sender`]
17 //! * [`SyncSender`]
18 //! * [`Receiver`]
19 //!
20 //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
21 //! senders are clone-able (multi-producer) such that many threads can send
22 //! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
//!    will return a `(Sender, Receiver)` tuple where all sends will be
//!    **asynchronous** (they never block). The channel conceptually has an
//!    infinite buffer.
//!
//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
//!    return a `(SyncSender, Receiver)` tuple where the storage for pending
//!    messages is a pre-allocated buffer of a fixed size. All sends will be
//!    **synchronous** by blocking until there is buffer space available. Note
//!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
//!    channel where each sender atomically hands off a message to a receiver.
37 //!
38 //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
39 //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
40 //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
41 //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
42 //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
43 //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
44 //!
45 //! ## Disconnection
46 //!
47 //! The send and receive operations on channels will all return a [`Result`]
48 //! indicating whether the operation succeeded or not. An unsuccessful operation
49 //! is normally indicative of the other half of a channel having "hung up" by
50 //! being dropped in its corresponding thread.
51 //!
52 //! Once half of a channel has been deallocated, most operations can no longer
53 //! continue to make progress, so [`Err`] will be returned. Many applications
54 //! will continue to [`unwrap`] the results returned from this module,
55 //! instigating a propagation of failure among threads if one unexpectedly dies.
56 //!
57 //! [`Result`]: ../../../std/result/enum.Result.html
58 //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
59 //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
60 //!
61 //! # Examples
62 //!
63 //! Simple usage:
64 //!
65 //! ```
66 //! use std::thread;
67 //! use std::sync::mpsc::channel;
68 //!
69 //! // Create a simple streaming channel
70 //! let (tx, rx) = channel();
71 //! thread::spawn(move|| {
72 //! tx.send(10).unwrap();
73 //! });
74 //! assert_eq!(rx.recv().unwrap(), 10);
75 //! ```
76 //!
77 //! Shared usage:
78 //!
79 //! ```
80 //! use std::thread;
81 //! use std::sync::mpsc::channel;
82 //!
83 //! // Create a shared channel that can be sent along from many threads
84 //! // where tx is the sending half (tx for transmission), and rx is the receiving
85 //! // half (rx for receiving).
86 //! let (tx, rx) = channel();
87 //! for i in 0..10 {
88 //! let tx = tx.clone();
89 //! thread::spawn(move|| {
90 //! tx.send(i).unwrap();
91 //! });
92 //! }
93 //!
94 //! for _ in 0..10 {
95 //! let j = rx.recv().unwrap();
96 //! assert!(0 <= j && j < 10);
97 //! }
98 //! ```
99 //!
100 //! Propagating panics:
101 //!
102 //! ```
103 //! use std::sync::mpsc::channel;
104 //!
105 //! // The call to recv() will return an error because the channel has already
106 //! // hung up (or been deallocated)
107 //! let (tx, rx) = channel::<i32>();
108 //! drop(tx);
109 //! assert!(rx.recv().is_err());
110 //! ```
111 //!
112 //! Synchronous channels:
113 //!
114 //! ```
115 //! use std::thread;
116 //! use std::sync::mpsc::sync_channel;
117 //!
118 //! let (tx, rx) = sync_channel::<i32>(0);
119 //! thread::spawn(move|| {
120 //! // This will wait for the parent thread to start receiving
121 //! tx.send(53).unwrap();
122 //! });
123 //! rx.recv().unwrap();
124 //! ```
125
126 #![stable(feature = "rust1", since = "1.0.0")]
127
128 // A description of how Rust's channel implementation works
129 //
130 // Channels are supposed to be the basic building block for all other
131 // concurrent primitives that are used in Rust. As a result, the channel type
132 // needs to be highly optimized, flexible, and broad enough for use everywhere.
133 //
134 // The choice of implementation of all channels is to be built on lock-free data
135 // structures. The channels themselves are then consequently also lock-free data
136 // structures. As always with lock-free code, this is a very "here be dragons"
137 // territory, especially because I'm unaware of any academic papers that have
138 // gone into great length about channels of these flavors.
139 //
140 // ## Flavors of channels
141 //
142 // From the perspective of a consumer of this library, there is only one flavor
143 // of channel. This channel can be used as a stream and cloned to allow multiple
144 // senders. Under the hood, however, there are actually three flavors of
145 // channels in play.
146 //
// * Oneshots - these channels are highly optimized for the one-send use case.
//              They contain as few atomics as possible and involve one and
//              exactly one allocation.
// * Streams - these channels are optimized for the non-shared use case. They
//             use a different concurrent queue that is more tailored for this
//             use case. The initial allocation of this flavor of channel is not
//             optimized.
// * Shared - this is the most general form of channel that this module offers,
//            a channel with multiple senders. This type is as optimized as it
//            can be, but the previous two types mentioned are much faster for
//            their use-cases.
158 //
159 // ## Concurrent queues
160 //
161 // The basic idea of Rust's Sender/Receiver types is that send() never blocks,
162 // but recv() obviously blocks. This means that under the hood there must be
163 // some shared and concurrent queue holding all of the actual data.
164 //
165 // With two flavors of channels, two flavors of queues are also used. We have
166 // chosen to use queues from a well-known author that are abbreviated as SPSC
167 // and MPSC (single producer, single consumer and multiple producer, single
168 // consumer). SPSC queues are used for streams while MPSC queues are used for
169 // shared channels.
170 //
171 // ### SPSC optimizations
172 //
173 // The SPSC queue found online is essentially a linked list of nodes where one
174 // half of the nodes are the "queue of data" and the other half of nodes are a
175 // cache of unused nodes. The unused nodes are used such that an allocation is
176 // not required on every push() and a free doesn't need to happen on every
177 // pop().
178 //
179 // As found online, however, the cache of nodes is of an infinite size. This
180 // means that if a channel at one point in its life had 50k items in the queue,
181 // then the queue will always have the capacity for 50k items. I believed that
182 // this was an unnecessary limitation of the implementation, so I have altered
183 // the queue to optionally have a bound on the cache size.
184 //
185 // By default, streams will have an unbounded SPSC queue with a small-ish cache
186 // size. The hope is that the cache is still large enough to have very fast
187 // send() operations while not too large such that millions of channels can
188 // coexist at once.
189 //
190 // ### MPSC optimizations
191 //
192 // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
193 // a linked list under the hood to earn its unboundedness, but I have not put
194 // forth much effort into having a cache of nodes similar to the SPSC queue.
195 //
196 // For now, I believe that this is "ok" because shared channels are not the most
197 // common type, but soon we may wish to revisit this queue choice and determine
198 // another candidate for backend storage of shared channels.
199 //
200 // ## Overview of the Implementation
201 //
202 // Now that there's a little background on the concurrent queues used, it's
203 // worth going into much more detail about the channels themselves. The basic
204 // pseudocode for a send/recv are:
205 //
206 //
//      send(t)                             recv()
//        queue.push(t)                       return if queue.pop()
//        if increment() == -1                deschedule {
//          wakeup()                            if decrement() > 0
//                                                cancel_deschedule()
//                                            }
//                                            queue.pop()
214 //
215 // As mentioned before, there are no locks in this implementation, only atomic
216 // instructions are used.
217 //
218 // ### The internal atomic counter
219 //
220 // Every channel has a shared counter with each half to keep track of the size
221 // of the queue. This counter is used to abort descheduling by the receiver and
222 // to know when to wake up on the sending side.
223 //
224 // As seen in the pseudocode, senders will increment this count and receivers
225 // will decrement the count. The theory behind this is that if a sender sees a
226 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
227 // then it doesn't need to block.
228 //
229 // The recv() method has a beginning call to pop(), and if successful, it needs
230 // to decrement the count. It is a crucial implementation detail that this
231 // decrement does *not* happen to the shared counter. If this were the case,
232 // then it would be possible for the counter to be very negative when there were
233 // no receivers waiting, in which case the senders would have to determine when
234 // it was actually appropriate to wake up a receiver.
235 //
236 // Instead, the "steal count" is kept track of separately (not atomically
237 // because it's only used by receivers), and then the decrement() call when
238 // descheduling will lump in all of the recent steals into one large decrement.
239 //
240 // The implication of this is that if a sender sees a -1 count, then there's
241 // guaranteed to be a waiter waiting!
242 //
243 // ## Native Implementation
244 //
245 // A major goal of these channels is to work seamlessly on and off the runtime.
246 // All of the previous race conditions have been worded in terms of
247 // scheduler-isms (which is obviously not available without the runtime).
248 //
249 // For now, native usage of channels (off the runtime) will fall back onto
250 // mutexes/cond vars for descheduling/atomic decisions. The no-contention path
251 // is still entirely lock-free, the "deschedule" blocks above are surrounded by
252 // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
253 // condition variable.
254 //
255 // ## Select
256 //
257 // Being able to support selection over channels has greatly influenced this
258 // design, and not only does selection need to work inside the runtime, but also
259 // outside the runtime.
260 //
261 // The implementation is fairly straightforward. The goal of select() is not to
262 // return some data, but only to return which channel can receive data without
263 // blocking. The implementation is essentially the entire blocking procedure
// followed by an increment as soon as it's woken up. The cancellation procedure
265 // involves an increment and swapping out of to_wake to acquire ownership of the
266 // thread to unblock.
267 //
268 // Sadly this current implementation requires multiple allocations, so I have
269 // seen the throughput of select() be much worse than it should be. I do not
270 // believe that there is anything fundamental that needs to change about these
271 // channels, however, in order to support a more efficient select().
272 //
273 // # Conclusion
274 //
275 // And now that you've seen all the races that I found and attempted to fix,
276 // here's the code for you to find some more!
277
278 use sync::Arc;
279 use error;
280 use fmt;
281 use mem;
282 use cell::UnsafeCell;
283 use time::{Duration, Instant};
284
285 #[unstable(feature = "mpsc_select", issue = "27800")]
286 pub use self::select::{Select, Handle};
287 use self::select::StartResult;
288 use self::select::StartResult::*;
289 use self::blocking::SignalToken;
290
291 mod blocking;
292 mod oneshot;
293 mod select;
294 mod shared;
295 mod stream;
296 mod sync;
297 mod mpsc_queue;
298 mod spsc_queue;
299
300 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
301 /// This half can only be owned by one thread.
302 ///
303 /// Messages sent to the channel can be retrieved using [`recv`].
304 ///
305 /// [`channel`]: fn.channel.html
306 /// [`sync_channel`]: fn.sync_channel.html
307 /// [`recv`]: struct.Receiver.html#method.recv
308 ///
309 /// # Examples
310 ///
311 /// ```rust
312 /// use std::sync::mpsc::channel;
313 /// use std::thread;
314 /// use std::time::Duration;
315 ///
316 /// let (send, recv) = channel();
317 ///
318 /// thread::spawn(move || {
319 /// send.send("Hello world!").unwrap();
320 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
321 /// send.send("Delayed for 2 seconds").unwrap();
322 /// });
323 ///
324 /// println!("{}", recv.recv().unwrap()); // Received immediately
325 /// println!("Waiting...");
326 /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
327 /// ```
328 #[stable(feature = "rust1", since = "1.0.0")]
329 pub struct Receiver<T> {
330 inner: UnsafeCell<Flavor<T>>,
331 }
332
333 // The receiver port can be sent from place to place, so long as it
334 // is not used to receive non-sendable things.
335 #[stable(feature = "rust1", since = "1.0.0")]
336 unsafe impl<T: Send> Send for Receiver<T> { }
337
338 #[stable(feature = "rust1", since = "1.0.0")]
339 impl<T> !Sync for Receiver<T> { }
340
341 /// An iterator over messages on a [`Receiver`], created by [`iter`].
342 ///
343 /// This iterator will block whenever [`next`] is called,
344 /// waiting for a new message, and [`None`] will be returned
345 /// when the corresponding channel has hung up.
346 ///
347 /// [`iter`]: struct.Receiver.html#method.iter
348 /// [`Receiver`]: struct.Receiver.html
349 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
350 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
351 ///
352 /// # Examples
353 ///
354 /// ```rust
355 /// use std::sync::mpsc::channel;
356 /// use std::thread;
357 ///
358 /// let (send, recv) = channel();
359 ///
360 /// thread::spawn(move || {
361 /// send.send(1u8).unwrap();
362 /// send.send(2u8).unwrap();
363 /// send.send(3u8).unwrap();
364 /// });
365 ///
366 /// for x in recv.iter() {
367 /// println!("Got: {}", x);
368 /// }
369 /// ```
370 #[stable(feature = "rust1", since = "1.0.0")]
371 #[derive(Debug)]
372 pub struct Iter<'a, T: 'a> {
373 rx: &'a Receiver<T>
374 }
375
376 /// An iterator that attempts to yield all pending values for a [`Receiver`],
377 /// created by [`try_iter`].
378 ///
379 /// [`None`] will be returned when there are no pending values remaining or
380 /// if the corresponding channel has hung up.
381 ///
382 /// This iterator will never block the caller in order to wait for data to
383 /// become available. Instead, it will return [`None`].
384 ///
385 /// [`Receiver`]: struct.Receiver.html
386 /// [`try_iter`]: struct.Receiver.html#method.try_iter
387 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
388 ///
389 /// # Examples
390 ///
391 /// ```rust
392 /// use std::sync::mpsc::channel;
393 /// use std::thread;
394 /// use std::time::Duration;
395 ///
396 /// let (sender, receiver) = channel();
397 ///
398 /// // Nothing is in the buffer yet
399 /// assert!(receiver.try_iter().next().is_none());
400 /// println!("Nothing in the buffer...");
401 ///
402 /// thread::spawn(move || {
403 /// sender.send(1).unwrap();
404 /// sender.send(2).unwrap();
405 /// sender.send(3).unwrap();
406 /// });
407 ///
408 /// println!("Going to sleep...");
409 /// thread::sleep(Duration::from_secs(2)); // block for two seconds
410 ///
411 /// for x in receiver.try_iter() {
412 /// println!("Got: {}", x);
413 /// }
414 /// ```
415 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
416 #[derive(Debug)]
417 pub struct TryIter<'a, T: 'a> {
418 rx: &'a Receiver<T>
419 }
420
421 /// An owning iterator over messages on a [`Receiver`],
422 /// created by **Receiver::into_iter**.
423 ///
424 /// This iterator will block whenever [`next`]
425 /// is called, waiting for a new message, and [`None`] will be
426 /// returned if the corresponding channel has hung up.
427 ///
428 /// [`Receiver`]: struct.Receiver.html
429 /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
430 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
431 ///
432 /// # Examples
433 ///
434 /// ```rust
435 /// use std::sync::mpsc::channel;
436 /// use std::thread;
437 ///
438 /// let (send, recv) = channel();
439 ///
440 /// thread::spawn(move || {
441 /// send.send(1u8).unwrap();
442 /// send.send(2u8).unwrap();
443 /// send.send(3u8).unwrap();
444 /// });
445 ///
446 /// for x in recv.into_iter() {
447 /// println!("Got: {}", x);
448 /// }
449 /// ```
450 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
451 #[derive(Debug)]
452 pub struct IntoIter<T> {
453 rx: Receiver<T>
454 }
455
456 /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
457 /// owned by one thread, but it can be cloned to send to other threads.
458 ///
459 /// Messages can be sent through this channel with [`send`].
460 ///
461 /// [`channel`]: fn.channel.html
462 /// [`send`]: struct.Sender.html#method.send
463 ///
464 /// # Examples
465 ///
466 /// ```rust
467 /// use std::sync::mpsc::channel;
468 /// use std::thread;
469 ///
470 /// let (sender, receiver) = channel();
471 /// let sender2 = sender.clone();
472 ///
473 /// // First thread owns sender
474 /// thread::spawn(move || {
475 /// sender.send(1).unwrap();
476 /// });
477 ///
478 /// // Second thread owns sender2
479 /// thread::spawn(move || {
480 /// sender2.send(2).unwrap();
481 /// });
482 ///
483 /// let msg = receiver.recv().unwrap();
484 /// let msg2 = receiver.recv().unwrap();
485 ///
486 /// assert_eq!(3, msg + msg2);
487 /// ```
488 #[stable(feature = "rust1", since = "1.0.0")]
489 pub struct Sender<T> {
490 inner: UnsafeCell<Flavor<T>>,
491 }
492
493 // The send port can be sent from place to place, so long as it
494 // is not used to send non-sendable things.
495 #[stable(feature = "rust1", since = "1.0.0")]
496 unsafe impl<T: Send> Send for Sender<T> { }
497
498 #[stable(feature = "rust1", since = "1.0.0")]
499 impl<T> !Sync for Sender<T> { }
500
501 /// The sending-half of Rust's synchronous [`sync_channel`] type.
502 /// This half can only be owned by one thread, but it can be cloned
503 /// to send to other threads.
504 ///
505 /// Messages can be sent through this channel with [`send`] or [`try_send`].
506 ///
507 /// [`send`] will block if there is no space in the internal buffer.
508 ///
509 /// [`sync_channel`]: fn.sync_channel.html
510 /// [`send`]: struct.SyncSender.html#method.send
511 /// [`try_send`]: struct.SyncSender.html#method.try_send
512 ///
513 /// # Examples
514 ///
515 /// ```rust
516 /// use std::sync::mpsc::sync_channel;
517 /// use std::thread;
518 ///
519 /// // Create a sync_channel with buffer size 2
520 /// let (sync_sender, receiver) = sync_channel(2);
521 /// let sync_sender2 = sync_sender.clone();
522 ///
523 /// // First thread owns sync_sender
524 /// thread::spawn(move || {
525 /// sync_sender.send(1).unwrap();
526 /// sync_sender.send(2).unwrap();
527 /// });
528 ///
529 /// // Second thread owns sync_sender2
530 /// thread::spawn(move || {
531 /// sync_sender2.send(3).unwrap();
532 /// // thread will now block since the buffer is full
533 /// println!("Thread unblocked!");
534 /// });
535 ///
536 /// let mut msg;
537 ///
538 /// msg = receiver.recv().unwrap();
539 /// println!("message {} received", msg);
540 ///
541 /// // "Thread unblocked!" will be printed now
542 ///
543 /// msg = receiver.recv().unwrap();
544 /// println!("message {} received", msg);
545 ///
546 /// msg = receiver.recv().unwrap();
547 ///
548 /// println!("message {} received", msg);
549 /// ```
550 #[stable(feature = "rust1", since = "1.0.0")]
551 pub struct SyncSender<T> {
552 inner: Arc<sync::Packet<T>>,
553 }
554
555 #[stable(feature = "rust1", since = "1.0.0")]
556 unsafe impl<T: Send> Send for SyncSender<T> {}
557
558 #[stable(feature = "rust1", since = "1.0.0")]
559 impl<T> !Sync for SyncSender<T> {}
560
561 /// An error returned from the [`Sender::send`] or [`SyncSender::send`]
562 /// function on **channel**s.
563 ///
564 /// A **send** operation can only fail if the receiving end of a channel is
565 /// disconnected, implying that the data could never be received. The error
566 /// contains the data being sent as a payload so it can be recovered.
567 ///
568 /// [`Sender::send`]: struct.Sender.html#method.send
569 /// [`SyncSender::send`]: struct.SyncSender.html#method.send
570 #[stable(feature = "rust1", since = "1.0.0")]
571 #[derive(PartialEq, Eq, Clone, Copy)]
572 pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
573
574 /// An error returned from the [`recv`] function on a [`Receiver`].
575 ///
576 /// The [`recv`] operation can only fail if the sending half of a
577 /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
578 /// messages will ever be received.
579 ///
580 /// [`recv`]: struct.Receiver.html#method.recv
581 /// [`Receiver`]: struct.Receiver.html
582 /// [`channel`]: fn.channel.html
583 /// [`sync_channel`]: fn.sync_channel.html
584 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
585 #[stable(feature = "rust1", since = "1.0.0")]
586 pub struct RecvError;
587
588 /// This enumeration is the list of the possible reasons that [`try_recv`] could
589 /// not return data when called. This can occur with both a [`channel`] and
590 /// a [`sync_channel`].
591 ///
592 /// [`try_recv`]: struct.Receiver.html#method.try_recv
593 /// [`channel`]: fn.channel.html
594 /// [`sync_channel`]: fn.sync_channel.html
595 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
596 #[stable(feature = "rust1", since = "1.0.0")]
597 pub enum TryRecvError {
598 /// This **channel** is currently empty, but the **Sender**(s) have not yet
599 /// disconnected, so data may yet become available.
600 #[stable(feature = "rust1", since = "1.0.0")]
601 Empty,
602
603 /// The **channel**'s sending half has become disconnected, and there will
604 /// never be any more data received on it.
605 #[stable(feature = "rust1", since = "1.0.0")]
606 Disconnected,
607 }
608
609 /// This enumeration is the list of possible errors that made [`recv_timeout`]
610 /// unable to return data when called. This can occur with both a [`channel`] and
611 /// a [`sync_channel`].
612 ///
613 /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
614 /// [`channel`]: fn.channel.html
615 /// [`sync_channel`]: fn.sync_channel.html
616 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
617 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
618 pub enum RecvTimeoutError {
619 /// This **channel** is currently empty, but the **Sender**(s) have not yet
620 /// disconnected, so data may yet become available.
621 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
622 Timeout,
623 /// The **channel**'s sending half has become disconnected, and there will
624 /// never be any more data received on it.
625 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
626 Disconnected,
627 }
628
629 /// This enumeration is the list of the possible error outcomes for the
630 /// [`try_send`] method.
631 ///
632 /// [`try_send`]: struct.SyncSender.html#method.try_send
633 #[stable(feature = "rust1", since = "1.0.0")]
634 #[derive(PartialEq, Eq, Clone, Copy)]
635 pub enum TrySendError<T> {
636 /// The data could not be sent on the [`sync_channel`] because it would require that
637 /// the callee block to send the data.
638 ///
639 /// If this is a buffered channel, then the buffer is full at this time. If
640 /// this is not a buffered channel, then there is no [`Receiver`] available to
641 /// acquire the data.
642 ///
643 /// [`sync_channel`]: fn.sync_channel.html
644 /// [`Receiver`]: struct.Receiver.html
645 #[stable(feature = "rust1", since = "1.0.0")]
646 Full(#[stable(feature = "rust1", since = "1.0.0")] T),
647
648 /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
649 /// sent. The data is returned back to the callee in this case.
650 ///
651 /// [`sync_channel`]: fn.sync_channel.html
652 #[stable(feature = "rust1", since = "1.0.0")]
653 Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
654 }
655
656 enum Flavor<T> {
657 Oneshot(Arc<oneshot::Packet<T>>),
658 Stream(Arc<stream::Packet<T>>),
659 Shared(Arc<shared::Packet<T>>),
660 Sync(Arc<sync::Packet<T>>),
661 }
662
663 #[doc(hidden)]
664 trait UnsafeFlavor<T> {
665 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
666 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
667 &mut *self.inner_unsafe().get()
668 }
669 unsafe fn inner(&self) -> &Flavor<T> {
670 &*self.inner_unsafe().get()
671 }
672 }
673 impl<T> UnsafeFlavor<T> for Sender<T> {
674 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
675 &self.inner
676 }
677 }
678 impl<T> UnsafeFlavor<T> for Receiver<T> {
679 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
680 &self.inner
681 }
682 }
683
684 /// Creates a new asynchronous channel, returning the sender/receiver halves.
685 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
686 /// the same order as it was sent, and no [`send`] will block the calling thread
687 /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
688 /// block after its buffer limit is reached). [`recv`] will block until a message
689 /// is available.
690 ///
691 /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
692 /// only one [`Receiver`] is supported.
693 ///
694 /// If the [`Receiver`] is disconnected while trying to [`send`] with the
695 /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If the
696 /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
697 /// return a [`RecvError`].
698 ///
699 /// [`send`]: struct.Sender.html#method.send
700 /// [`recv`]: struct.Receiver.html#method.recv
701 /// [`Sender`]: struct.Sender.html
702 /// [`Receiver`]: struct.Receiver.html
703 /// [`sync_channel`]: fn.sync_channel.html
704 /// [`SendError`]: struct.SendError.html
705 /// [`RecvError`]: struct.RecvError.html
706 ///
707 /// # Examples
708 ///
709 /// ```
710 /// use std::sync::mpsc::channel;
711 /// use std::thread;
712 ///
713 /// let (sender, receiver) = channel();
714 ///
715 /// // Spawn off an expensive computation
716 /// thread::spawn(move|| {
717 /// # fn expensive_computation() {}
718 /// sender.send(expensive_computation()).unwrap();
719 /// });
720 ///
721 /// // Do some useful work for awhile
722 ///
723 /// // Let's see what that answer was
724 /// println!("{:?}", receiver.recv().unwrap());
725 /// ```
726 #[stable(feature = "rust1", since = "1.0.0")]
727 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
728 let a = Arc::new(oneshot::Packet::new());
729 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
730 }
731
732 /// Creates a new synchronous, bounded channel.
733 /// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
734 /// in the same order as it was sent. Like asynchronous [`channel`]s, the
735 /// [`Receiver`] will block until a message becomes available. `sync_channel`
736 /// differs greatly in the semantics of the sender, however.
737 ///
738 /// This channel has an internal buffer on which messages will be queued.
739 /// `bound` specifies the buffer size. When the internal buffer becomes full,
740 /// future sends will *block* waiting for the buffer to open up. Note that a
741 /// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
742 /// where each [`send`] will not return until a [`recv`] is paired with it.
743 ///
744 /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
745 /// times, but only one [`Receiver`] is supported.
746 ///
747 /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
748 /// to [`send`] with the [`SyncSender`], the [`send`] method will return a
749 /// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
750 /// to [`recv`], the [`recv`] method will return a [`RecvError`].
751 ///
752 /// [`channel`]: fn.channel.html
753 /// [`send`]: struct.SyncSender.html#method.send
754 /// [`recv`]: struct.Receiver.html#method.recv
755 /// [`SyncSender`]: struct.SyncSender.html
756 /// [`Receiver`]: struct.Receiver.html
757 /// [`SendError`]: struct.SendError.html
758 /// [`RecvError`]: struct.RecvError.html
759 ///
760 /// # Examples
761 ///
762 /// ```
763 /// use std::sync::mpsc::sync_channel;
764 /// use std::thread;
765 ///
766 /// let (sender, receiver) = sync_channel(1);
767 ///
768 /// // this returns immediately
769 /// sender.send(1).unwrap();
770 ///
771 /// thread::spawn(move|| {
772 /// // this will block until the previous message has been received
773 /// sender.send(2).unwrap();
774 /// });
775 ///
776 /// assert_eq!(receiver.recv().unwrap(), 1);
777 /// assert_eq!(receiver.recv().unwrap(), 2);
778 /// ```
779 #[stable(feature = "rust1", since = "1.0.0")]
780 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
781 let a = Arc::new(sync::Packet::new(bound));
782 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
783 }
784
785 ////////////////////////////////////////////////////////////////////////////////
786 // Sender
787 ////////////////////////////////////////////////////////////////////////////////
788
789 impl<T> Sender<T> {
790 fn new(inner: Flavor<T>) -> Sender<T> {
791 Sender {
792 inner: UnsafeCell::new(inner),
793 }
794 }
795
796 /// Attempts to send a value on this channel, returning it back if it could
797 /// not be sent.
798 ///
799 /// A successful send occurs when it is determined that the other end of
800 /// the channel has not hung up already. An unsuccessful send would be one
801 /// where the corresponding receiver has already been deallocated. Note
802 /// that a return value of [`Err`] means that the data will never be
803 /// received, but a return value of [`Ok`] does *not* mean that the data
804 /// will be received. It is possible for the corresponding receiver to
805 /// hang up immediately after this function returns [`Ok`].
806 ///
807 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
808 /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
809 ///
810 /// This method will never block the current thread.
811 ///
812 /// # Examples
813 ///
814 /// ```
815 /// use std::sync::mpsc::channel;
816 ///
817 /// let (tx, rx) = channel();
818 ///
819 /// // This send is always successful
820 /// tx.send(1).unwrap();
821 ///
822 /// // This send will fail because the receiver is gone
823 /// drop(rx);
824 /// assert_eq!(tx.send(1).unwrap_err().0, 1);
825 /// ```
826 #[stable(feature = "rust1", since = "1.0.0")]
827 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
828 let (new_inner, ret) = match *unsafe { self.inner() } {
829 Flavor::Oneshot(ref p) => {
830 if !p.sent() {
831 return p.send(t).map_err(SendError);
832 } else {
833 let a = Arc::new(stream::Packet::new());
834 let rx = Receiver::new(Flavor::Stream(a.clone()));
835 match p.upgrade(rx) {
836 oneshot::UpSuccess => {
837 let ret = a.send(t);
838 (a, ret)
839 }
840 oneshot::UpDisconnected => (a, Err(t)),
841 oneshot::UpWoke(token) => {
842 // This send cannot panic because the thread is
843 // asleep (we're looking at it), so the receiver
844 // can't go away.
845 a.send(t).ok().unwrap();
846 token.signal();
847 (a, Ok(()))
848 }
849 }
850 }
851 }
852 Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
853 Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
854 Flavor::Sync(..) => unreachable!(),
855 };
856
857 unsafe {
858 let tmp = Sender::new(Flavor::Stream(new_inner));
859 mem::swap(self.inner_mut(), tmp.inner_mut());
860 }
861 ret.map_err(SendError)
862 }
863 }
864
865 #[stable(feature = "rust1", since = "1.0.0")]
866 impl<T> Clone for Sender<T> {
867 fn clone(&self) -> Sender<T> {
868 let packet = match *unsafe { self.inner() } {
869 Flavor::Oneshot(ref p) => {
870 let a = Arc::new(shared::Packet::new());
871 {
872 let guard = a.postinit_lock();
873 let rx = Receiver::new(Flavor::Shared(a.clone()));
874 let sleeper = match p.upgrade(rx) {
875 oneshot::UpSuccess |
876 oneshot::UpDisconnected => None,
877 oneshot::UpWoke(task) => Some(task),
878 };
879 a.inherit_blocker(sleeper, guard);
880 }
881 a
882 }
883 Flavor::Stream(ref p) => {
884 let a = Arc::new(shared::Packet::new());
885 {
886 let guard = a.postinit_lock();
887 let rx = Receiver::new(Flavor::Shared(a.clone()));
888 let sleeper = match p.upgrade(rx) {
889 stream::UpSuccess |
890 stream::UpDisconnected => None,
891 stream::UpWoke(task) => Some(task),
892 };
893 a.inherit_blocker(sleeper, guard);
894 }
895 a
896 }
897 Flavor::Shared(ref p) => {
898 p.clone_chan();
899 return Sender::new(Flavor::Shared(p.clone()));
900 }
901 Flavor::Sync(..) => unreachable!(),
902 };
903
904 unsafe {
905 let tmp = Sender::new(Flavor::Shared(packet.clone()));
906 mem::swap(self.inner_mut(), tmp.inner_mut());
907 }
908 Sender::new(Flavor::Shared(packet))
909 }
910 }
911
912 #[stable(feature = "rust1", since = "1.0.0")]
913 impl<T> Drop for Sender<T> {
914 fn drop(&mut self) {
915 match *unsafe { self.inner() } {
916 Flavor::Oneshot(ref p) => p.drop_chan(),
917 Flavor::Stream(ref p) => p.drop_chan(),
918 Flavor::Shared(ref p) => p.drop_chan(),
919 Flavor::Sync(..) => unreachable!(),
920 }
921 }
922 }
923
924 #[stable(feature = "mpsc_debug", since = "1.8.0")]
925 impl<T> fmt::Debug for Sender<T> {
926 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
927 write!(f, "Sender {{ .. }}")
928 }
929 }
930
931 ////////////////////////////////////////////////////////////////////////////////
932 // SyncSender
933 ////////////////////////////////////////////////////////////////////////////////
934
935 impl<T> SyncSender<T> {
936 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
937 SyncSender { inner: inner }
938 }
939
940 /// Sends a value on this synchronous channel.
941 ///
942 /// This function will *block* until space in the internal buffer becomes
943 /// available or a receiver is available to hand off the message to.
944 ///
945 /// Note that a successful send does *not* guarantee that the receiver will
946 /// ever see the data if there is a buffer on this channel. Items may be
947 /// enqueued in the internal buffer for the receiver to receive at a later
948 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
949 /// channel and it guarantees that the receiver has indeed received
950 /// the data if this function returns success.
951 ///
952 /// This function will never panic, but it may return [`Err`] if the
953 /// [`Receiver`] has disconnected and is no longer able to receive
954 /// information.
955 ///
956 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
957 /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
958 ///
959 /// # Examples
960 ///
961 /// ```rust
962 /// use std::sync::mpsc::sync_channel;
963 /// use std::thread;
964 ///
965 /// // Create a rendezvous sync_channel with buffer size 0
966 /// let (sync_sender, receiver) = sync_channel(0);
967 ///
968 /// thread::spawn(move || {
969 /// println!("sending message...");
970 /// sync_sender.send(1).unwrap();
///     // `send` above blocked this thread until the message was received
972 ///
973 /// println!("...message received!");
974 /// });
975 ///
976 /// let msg = receiver.recv().unwrap();
977 /// assert_eq!(1, msg);
978 /// ```
979 #[stable(feature = "rust1", since = "1.0.0")]
980 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
981 self.inner.send(t).map_err(SendError)
982 }
983
984 /// Attempts to send a value on this channel without blocking.
985 ///
986 /// This method differs from [`send`] by returning immediately if the
987 /// channel's buffer is full or no receiver is waiting to acquire some
988 /// data. Compared with [`send`], this function has two failure cases
989 /// instead of one (one for disconnection, one for a full buffer).
990 ///
991 /// See [`send`] for notes about guarantees of whether the
992 /// receiver has received the data or not if this function is successful.
993 ///
994 /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
995 ///
996 /// # Examples
997 ///
998 /// ```rust
999 /// use std::sync::mpsc::sync_channel;
1000 /// use std::thread;
1001 ///
1002 /// // Create a sync_channel with buffer size 1
1003 /// let (sync_sender, receiver) = sync_channel(1);
1004 /// let sync_sender2 = sync_sender.clone();
1005 ///
1006 /// // First thread owns sync_sender
1007 /// thread::spawn(move || {
1008 /// sync_sender.send(1).unwrap();
1009 /// sync_sender.send(2).unwrap();
1010 /// // Thread blocked
1011 /// });
1012 ///
1013 /// // Second thread owns sync_sender2
1014 /// thread::spawn(move || {
1015 /// // This will return an error and send
1016 /// // no message if the buffer is full
1017 /// sync_sender2.try_send(3).is_err();
1018 /// });
1019 ///
1020 /// let mut msg;
1021 /// msg = receiver.recv().unwrap();
1022 /// println!("message {} received", msg);
1023 ///
1024 /// msg = receiver.recv().unwrap();
1025 /// println!("message {} received", msg);
1026 ///
1027 /// // Third message may have never been sent
1028 /// match receiver.try_recv() {
1029 /// Ok(msg) => println!("message {} received", msg),
1030 /// Err(_) => println!("the third message was never sent"),
1031 /// }
1032 /// ```
1033 #[stable(feature = "rust1", since = "1.0.0")]
1034 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
1035 self.inner.try_send(t)
1036 }
1037 }
1038
1039 #[stable(feature = "rust1", since = "1.0.0")]
1040 impl<T> Clone for SyncSender<T> {
1041 fn clone(&self) -> SyncSender<T> {
1042 self.inner.clone_chan();
1043 SyncSender::new(self.inner.clone())
1044 }
1045 }
1046
1047 #[stable(feature = "rust1", since = "1.0.0")]
1048 impl<T> Drop for SyncSender<T> {
1049 fn drop(&mut self) {
1050 self.inner.drop_chan();
1051 }
1052 }
1053
1054 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1055 impl<T> fmt::Debug for SyncSender<T> {
1056 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1057 write!(f, "SyncSender {{ .. }}")
1058 }
1059 }
1060
1061 ////////////////////////////////////////////////////////////////////////////////
1062 // Receiver
1063 ////////////////////////////////////////////////////////////////////////////////
1064
1065 impl<T> Receiver<T> {
1066 fn new(inner: Flavor<T>) -> Receiver<T> {
1067 Receiver { inner: UnsafeCell::new(inner) }
1068 }
1069
1070 /// Attempts to return a pending value on this receiver without blocking.
1071 ///
1072 /// This method will never block the caller in order to wait for data to
1073 /// become available. Instead, this will always return immediately with a
1074 /// possible option of pending data on the channel.
1075 ///
1076 /// This is useful for a flavor of "optimistic check" before deciding to
1077 /// block on a receiver.
1078 ///
1079 /// Compared with [`recv`], this function has two failure cases instead of one
1080 /// (one for disconnection, one for an empty buffer).
1081 ///
1082 /// [`recv`]: struct.Receiver.html#method.recv
1083 ///
1084 /// # Examples
1085 ///
1086 /// ```rust
1087 /// use std::sync::mpsc::{Receiver, channel};
1088 ///
1089 /// let (_, receiver): (_, Receiver<i32>) = channel();
1090 ///
1091 /// assert!(receiver.try_recv().is_err());
1092 /// ```
1093 #[stable(feature = "rust1", since = "1.0.0")]
1094 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1095 loop {
1096 let new_port = match *unsafe { self.inner() } {
1097 Flavor::Oneshot(ref p) => {
1098 match p.try_recv() {
1099 Ok(t) => return Ok(t),
1100 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1101 Err(oneshot::Disconnected) => {
1102 return Err(TryRecvError::Disconnected)
1103 }
1104 Err(oneshot::Upgraded(rx)) => rx,
1105 }
1106 }
1107 Flavor::Stream(ref p) => {
1108 match p.try_recv() {
1109 Ok(t) => return Ok(t),
1110 Err(stream::Empty) => return Err(TryRecvError::Empty),
1111 Err(stream::Disconnected) => {
1112 return Err(TryRecvError::Disconnected)
1113 }
1114 Err(stream::Upgraded(rx)) => rx,
1115 }
1116 }
1117 Flavor::Shared(ref p) => {
1118 match p.try_recv() {
1119 Ok(t) => return Ok(t),
1120 Err(shared::Empty) => return Err(TryRecvError::Empty),
1121 Err(shared::Disconnected) => {
1122 return Err(TryRecvError::Disconnected)
1123 }
1124 }
1125 }
1126 Flavor::Sync(ref p) => {
1127 match p.try_recv() {
1128 Ok(t) => return Ok(t),
1129 Err(sync::Empty) => return Err(TryRecvError::Empty),
1130 Err(sync::Disconnected) => {
1131 return Err(TryRecvError::Disconnected)
1132 }
1133 }
1134 }
1135 };
1136 unsafe {
1137 mem::swap(self.inner_mut(),
1138 new_port.inner_mut());
1139 }
1140 }
1141 }
1142
1143 /// Attempts to wait for a value on this receiver, returning an error if the
1144 /// corresponding channel has hung up.
1145 ///
1146 /// This function will always block the current thread if there is no data
1147 /// available and it's possible for more data to be sent. Once a message is
1148 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1149 /// receiver will wake up and return that message.
1150 ///
1151 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1152 /// this call is blocking, this call will wake up and return [`Err`] to
1153 /// indicate that no more messages can ever be received on this channel.
1154 /// However, since channels are buffered, messages sent before the disconnect
1155 /// will still be properly received.
1156 ///
1157 /// [`Sender`]: struct.Sender.html
1158 /// [`SyncSender`]: struct.SyncSender.html
1159 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1160 ///
1161 /// # Examples
1162 ///
1163 /// ```
1164 /// use std::sync::mpsc;
1165 /// use std::thread;
1166 ///
1167 /// let (send, recv) = mpsc::channel();
1168 /// let handle = thread::spawn(move || {
1169 /// send.send(1u8).unwrap();
1170 /// });
1171 ///
1172 /// handle.join().unwrap();
1173 ///
1174 /// assert_eq!(Ok(1), recv.recv());
1175 /// ```
1176 ///
1177 /// Buffering behavior:
1178 ///
1179 /// ```
1180 /// use std::sync::mpsc;
1181 /// use std::thread;
1182 /// use std::sync::mpsc::RecvError;
1183 ///
1184 /// let (send, recv) = mpsc::channel();
1185 /// let handle = thread::spawn(move || {
1186 /// send.send(1u8).unwrap();
1187 /// send.send(2).unwrap();
1188 /// send.send(3).unwrap();
1189 /// drop(send);
1190 /// });
1191 ///
1192 /// // wait for the thread to join so we ensure the sender is dropped
1193 /// handle.join().unwrap();
1194 ///
1195 /// assert_eq!(Ok(1), recv.recv());
1196 /// assert_eq!(Ok(2), recv.recv());
1197 /// assert_eq!(Ok(3), recv.recv());
1198 /// assert_eq!(Err(RecvError), recv.recv());
1199 /// ```
1200 #[stable(feature = "rust1", since = "1.0.0")]
1201 pub fn recv(&self) -> Result<T, RecvError> {
1202 loop {
1203 let new_port = match *unsafe { self.inner() } {
1204 Flavor::Oneshot(ref p) => {
1205 match p.recv(None) {
1206 Ok(t) => return Ok(t),
1207 Err(oneshot::Disconnected) => return Err(RecvError),
1208 Err(oneshot::Upgraded(rx)) => rx,
1209 Err(oneshot::Empty) => unreachable!(),
1210 }
1211 }
1212 Flavor::Stream(ref p) => {
1213 match p.recv(None) {
1214 Ok(t) => return Ok(t),
1215 Err(stream::Disconnected) => return Err(RecvError),
1216 Err(stream::Upgraded(rx)) => rx,
1217 Err(stream::Empty) => unreachable!(),
1218 }
1219 }
1220 Flavor::Shared(ref p) => {
1221 match p.recv(None) {
1222 Ok(t) => return Ok(t),
1223 Err(shared::Disconnected) => return Err(RecvError),
1224 Err(shared::Empty) => unreachable!(),
1225 }
1226 }
1227 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1228 };
1229 unsafe {
1230 mem::swap(self.inner_mut(), new_port.inner_mut());
1231 }
1232 }
1233 }
1234
1235 /// Attempts to wait for a value on this receiver, returning an error if the
1236 /// corresponding channel has hung up, or if it waits more than `timeout`.
1237 ///
1238 /// This function will always block the current thread if there is no data
1239 /// available and it's possible for more data to be sent. Once a message is
1240 /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
1241 /// receiver will wake up and return that message.
1242 ///
1243 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1244 /// this call is blocking, this call will wake up and return [`Err`] to
1245 /// indicate that no more messages can ever be received on this channel.
1246 /// However, since channels are buffered, messages sent before the disconnect
1247 /// will still be properly received.
1248 ///
1249 /// [`Sender`]: struct.Sender.html
1250 /// [`SyncSender`]: struct.SyncSender.html
1251 /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
1252 ///
1253 /// # Examples
1254 ///
1255 /// Successfully receiving value before encountering timeout:
1256 ///
1257 /// ```no_run
1258 /// use std::thread;
1259 /// use std::time::Duration;
1260 /// use std::sync::mpsc;
1261 ///
1262 /// let (send, recv) = mpsc::channel();
1263 ///
1264 /// thread::spawn(move || {
1265 /// send.send('a').unwrap();
1266 /// });
1267 ///
1268 /// assert_eq!(
1269 /// recv.recv_timeout(Duration::from_millis(400)),
1270 /// Ok('a')
1271 /// );
1272 /// ```
1273 ///
1274 /// Receiving an error upon reaching timeout:
1275 ///
1276 /// ```no_run
1277 /// use std::thread;
1278 /// use std::time::Duration;
1279 /// use std::sync::mpsc;
1280 ///
1281 /// let (send, recv) = mpsc::channel();
1282 ///
1283 /// thread::spawn(move || {
1284 /// thread::sleep(Duration::from_millis(800));
1285 /// send.send('a').unwrap();
1286 /// });
1287 ///
1288 /// assert_eq!(
1289 /// recv.recv_timeout(Duration::from_millis(400)),
1290 /// Err(mpsc::RecvTimeoutError::Timeout)
1291 /// );
1292 /// ```
1293 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
1294 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1295 // Do an optimistic try_recv to avoid the performance impact of
1296 // Instant::now() in the full-channel case.
1297 match self.try_recv() {
1298 Ok(result)
1299 => Ok(result),
1300 Err(TryRecvError::Disconnected)
1301 => Err(RecvTimeoutError::Disconnected),
1302 Err(TryRecvError::Empty)
1303 => self.recv_max_until(Instant::now() + timeout)
1304 }
1305 }
1306
1307 fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1308 use self::RecvTimeoutError::*;
1309
1310 loop {
1311 let port_or_empty = match *unsafe { self.inner() } {
1312 Flavor::Oneshot(ref p) => {
1313 match p.recv(Some(deadline)) {
1314 Ok(t) => return Ok(t),
1315 Err(oneshot::Disconnected) => return Err(Disconnected),
1316 Err(oneshot::Upgraded(rx)) => Some(rx),
1317 Err(oneshot::Empty) => None,
1318 }
1319 }
1320 Flavor::Stream(ref p) => {
1321 match p.recv(Some(deadline)) {
1322 Ok(t) => return Ok(t),
1323 Err(stream::Disconnected) => return Err(Disconnected),
1324 Err(stream::Upgraded(rx)) => Some(rx),
1325 Err(stream::Empty) => None,
1326 }
1327 }
1328 Flavor::Shared(ref p) => {
1329 match p.recv(Some(deadline)) {
1330 Ok(t) => return Ok(t),
1331 Err(shared::Disconnected) => return Err(Disconnected),
1332 Err(shared::Empty) => None,
1333 }
1334 }
1335 Flavor::Sync(ref p) => {
1336 match p.recv(Some(deadline)) {
1337 Ok(t) => return Ok(t),
1338 Err(sync::Disconnected) => return Err(Disconnected),
1339 Err(sync::Empty) => None,
1340 }
1341 }
1342 };
1343
1344 if let Some(new_port) = port_or_empty {
1345 unsafe {
1346 mem::swap(self.inner_mut(), new_port.inner_mut());
1347 }
1348 }
1349
1350 // If we're already passed the deadline, and we're here without
1351 // data, return a timeout, else try again.
1352 if Instant::now() >= deadline {
1353 return Err(Timeout);
1354 }
1355 }
1356 }
1357
1358 /// Returns an iterator that will block waiting for messages, but never
1359 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1360 ///
1361 /// [`panic!`]: ../../../std/macro.panic.html
1362 /// [`None`]: ../../../std/option/enum.Option.html#variant.None
1363 ///
1364 /// # Examples
1365 ///
1366 /// ```rust
1367 /// use std::sync::mpsc::channel;
1368 /// use std::thread;
1369 ///
1370 /// let (send, recv) = channel();
1371 ///
1372 /// thread::spawn(move || {
1373 /// send.send(1).unwrap();
1374 /// send.send(2).unwrap();
1375 /// send.send(3).unwrap();
1376 /// });
1377 ///
1378 /// let mut iter = recv.iter();
1379 /// assert_eq!(iter.next(), Some(1));
1380 /// assert_eq!(iter.next(), Some(2));
1381 /// assert_eq!(iter.next(), Some(3));
1382 /// assert_eq!(iter.next(), None);
1383 /// ```
1384 #[stable(feature = "rust1", since = "1.0.0")]
1385 pub fn iter(&self) -> Iter<T> {
1386 Iter { rx: self }
1387 }
1388
1389 /// Returns an iterator that will attempt to yield all pending values.
1390 /// It will return `None` if there are no more pending values or if the
1391 /// channel has hung up. The iterator will never [`panic!`] or block the
1392 /// user by waiting for values.
1393 ///
1394 /// [`panic!`]: ../../../std/macro.panic.html
1395 ///
1396 /// # Examples
1397 ///
1398 /// ```no_run
1399 /// use std::sync::mpsc::channel;
1400 /// use std::thread;
1401 /// use std::time::Duration;
1402 ///
1403 /// let (sender, receiver) = channel();
1404 ///
1405 /// // nothing is in the buffer yet
1406 /// assert!(receiver.try_iter().next().is_none());
1407 ///
1408 /// thread::spawn(move || {
1409 /// thread::sleep(Duration::from_secs(1));
1410 /// sender.send(1).unwrap();
1411 /// sender.send(2).unwrap();
1412 /// sender.send(3).unwrap();
1413 /// });
1414 ///
1415 /// // nothing is in the buffer yet
1416 /// assert!(receiver.try_iter().next().is_none());
1417 ///
1418 /// // block for two seconds
1419 /// thread::sleep(Duration::from_secs(2));
1420 ///
1421 /// let mut iter = receiver.try_iter();
1422 /// assert_eq!(iter.next(), Some(1));
1423 /// assert_eq!(iter.next(), Some(2));
1424 /// assert_eq!(iter.next(), Some(3));
1425 /// assert_eq!(iter.next(), None);
1426 /// ```
1427 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1428 pub fn try_iter(&self) -> TryIter<T> {
1429 TryIter { rx: self }
1430 }
1431
1432 }
1433
1434 impl<T> select::Packet for Receiver<T> {
1435 fn can_recv(&self) -> bool {
1436 loop {
1437 let new_port = match *unsafe { self.inner() } {
1438 Flavor::Oneshot(ref p) => {
1439 match p.can_recv() {
1440 Ok(ret) => return ret,
1441 Err(upgrade) => upgrade,
1442 }
1443 }
1444 Flavor::Stream(ref p) => {
1445 match p.can_recv() {
1446 Ok(ret) => return ret,
1447 Err(upgrade) => upgrade,
1448 }
1449 }
1450 Flavor::Shared(ref p) => return p.can_recv(),
1451 Flavor::Sync(ref p) => return p.can_recv(),
1452 };
1453 unsafe {
1454 mem::swap(self.inner_mut(),
1455 new_port.inner_mut());
1456 }
1457 }
1458 }
1459
1460 fn start_selection(&self, mut token: SignalToken) -> StartResult {
1461 loop {
1462 let (t, new_port) = match *unsafe { self.inner() } {
1463 Flavor::Oneshot(ref p) => {
1464 match p.start_selection(token) {
1465 oneshot::SelSuccess => return Installed,
1466 oneshot::SelCanceled => return Abort,
1467 oneshot::SelUpgraded(t, rx) => (t, rx),
1468 }
1469 }
1470 Flavor::Stream(ref p) => {
1471 match p.start_selection(token) {
1472 stream::SelSuccess => return Installed,
1473 stream::SelCanceled => return Abort,
1474 stream::SelUpgraded(t, rx) => (t, rx),
1475 }
1476 }
1477 Flavor::Shared(ref p) => return p.start_selection(token),
1478 Flavor::Sync(ref p) => return p.start_selection(token),
1479 };
1480 token = t;
1481 unsafe {
1482 mem::swap(self.inner_mut(), new_port.inner_mut());
1483 }
1484 }
1485 }
1486
1487 fn abort_selection(&self) -> bool {
1488 let mut was_upgrade = false;
1489 loop {
1490 let result = match *unsafe { self.inner() } {
1491 Flavor::Oneshot(ref p) => p.abort_selection(),
1492 Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
1493 Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
1494 Flavor::Sync(ref p) => return p.abort_selection(),
1495 };
1496 let new_port = match result { Ok(b) => return b, Err(p) => p };
1497 was_upgrade = true;
1498 unsafe {
1499 mem::swap(self.inner_mut(),
1500 new_port.inner_mut());
1501 }
1502 }
1503 }
1504 }
1505
1506 #[stable(feature = "rust1", since = "1.0.0")]
1507 impl<'a, T> Iterator for Iter<'a, T> {
1508 type Item = T;
1509
1510 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1511 }
1512
1513 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1514 impl<'a, T> Iterator for TryIter<'a, T> {
1515 type Item = T;
1516
1517 fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1518 }
1519
1520 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1521 impl<'a, T> IntoIterator for &'a Receiver<T> {
1522 type Item = T;
1523 type IntoIter = Iter<'a, T>;
1524
1525 fn into_iter(self) -> Iter<'a, T> { self.iter() }
1526 }
1527
1528 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1529 impl<T> Iterator for IntoIter<T> {
1530 type Item = T;
1531 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
1532 }
1533
1534 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
1535 impl <T> IntoIterator for Receiver<T> {
1536 type Item = T;
1537 type IntoIter = IntoIter<T>;
1538
1539 fn into_iter(self) -> IntoIter<T> {
1540 IntoIter { rx: self }
1541 }
1542 }
1543
1544 #[stable(feature = "rust1", since = "1.0.0")]
1545 impl<T> Drop for Receiver<T> {
1546 fn drop(&mut self) {
1547 match *unsafe { self.inner() } {
1548 Flavor::Oneshot(ref p) => p.drop_port(),
1549 Flavor::Stream(ref p) => p.drop_port(),
1550 Flavor::Shared(ref p) => p.drop_port(),
1551 Flavor::Sync(ref p) => p.drop_port(),
1552 }
1553 }
1554 }
1555
1556 #[stable(feature = "mpsc_debug", since = "1.8.0")]
1557 impl<T> fmt::Debug for Receiver<T> {
1558 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1559 write!(f, "Receiver {{ .. }}")
1560 }
1561 }
1562
1563 #[stable(feature = "rust1", since = "1.0.0")]
1564 impl<T> fmt::Debug for SendError<T> {
1565 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1566 "SendError(..)".fmt(f)
1567 }
1568 }
1569
1570 #[stable(feature = "rust1", since = "1.0.0")]
1571 impl<T> fmt::Display for SendError<T> {
1572 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1573 "sending on a closed channel".fmt(f)
1574 }
1575 }
1576
1577 #[stable(feature = "rust1", since = "1.0.0")]
1578 impl<T: Send> error::Error for SendError<T> {
1579 fn description(&self) -> &str {
1580 "sending on a closed channel"
1581 }
1582
1583 fn cause(&self) -> Option<&error::Error> {
1584 None
1585 }
1586 }
1587
1588 #[stable(feature = "rust1", since = "1.0.0")]
1589 impl<T> fmt::Debug for TrySendError<T> {
1590 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1591 match *self {
1592 TrySendError::Full(..) => "Full(..)".fmt(f),
1593 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1594 }
1595 }
1596 }
1597
1598 #[stable(feature = "rust1", since = "1.0.0")]
1599 impl<T> fmt::Display for TrySendError<T> {
1600 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1601 match *self {
1602 TrySendError::Full(..) => {
1603 "sending on a full channel".fmt(f)
1604 }
1605 TrySendError::Disconnected(..) => {
1606 "sending on a closed channel".fmt(f)
1607 }
1608 }
1609 }
1610 }
1611
1612 #[stable(feature = "rust1", since = "1.0.0")]
1613 impl<T: Send> error::Error for TrySendError<T> {
1614
1615 fn description(&self) -> &str {
1616 match *self {
1617 TrySendError::Full(..) => {
1618 "sending on a full channel"
1619 }
1620 TrySendError::Disconnected(..) => {
1621 "sending on a closed channel"
1622 }
1623 }
1624 }
1625
1626 fn cause(&self) -> Option<&error::Error> {
1627 None
1628 }
1629 }
1630
1631 #[stable(feature = "rust1", since = "1.0.0")]
1632 impl fmt::Display for RecvError {
1633 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1634 "receiving on a closed channel".fmt(f)
1635 }
1636 }
1637
1638 #[stable(feature = "rust1", since = "1.0.0")]
1639 impl error::Error for RecvError {
1640
1641 fn description(&self) -> &str {
1642 "receiving on a closed channel"
1643 }
1644
1645 fn cause(&self) -> Option<&error::Error> {
1646 None
1647 }
1648 }
1649
1650 #[stable(feature = "rust1", since = "1.0.0")]
1651 impl fmt::Display for TryRecvError {
1652 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1653 match *self {
1654 TryRecvError::Empty => {
1655 "receiving on an empty channel".fmt(f)
1656 }
1657 TryRecvError::Disconnected => {
1658 "receiving on a closed channel".fmt(f)
1659 }
1660 }
1661 }
1662 }
1663
1664 #[stable(feature = "rust1", since = "1.0.0")]
1665 impl error::Error for TryRecvError {
1666
1667 fn description(&self) -> &str {
1668 match *self {
1669 TryRecvError::Empty => {
1670 "receiving on an empty channel"
1671 }
1672 TryRecvError::Disconnected => {
1673 "receiving on a closed channel"
1674 }
1675 }
1676 }
1677
1678 fn cause(&self) -> Option<&error::Error> {
1679 None
1680 }
1681 }
1682
1683 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1684 impl fmt::Display for RecvTimeoutError {
1685 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1686 match *self {
1687 RecvTimeoutError::Timeout => {
1688 "timed out waiting on channel".fmt(f)
1689 }
1690 RecvTimeoutError::Disconnected => {
1691 "channel is empty and sending half is closed".fmt(f)
1692 }
1693 }
1694 }
1695 }
1696
1697 #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1698 impl error::Error for RecvTimeoutError {
1699 fn description(&self) -> &str {
1700 match *self {
1701 RecvTimeoutError::Timeout => {
1702 "timed out waiting on channel"
1703 }
1704 RecvTimeoutError::Disconnected => {
1705 "channel is empty and sending half is closed"
1706 }
1707 }
1708 }
1709
1710 fn cause(&self) -> Option<&error::Error> {
1711 None
1712 }
1713 }
1714
1715 #[cfg(all(test, not(target_os = "emscripten")))]
1716 mod tests {
1717 use env;
1718 use super::*;
1719 use thread;
1720 use time::{Duration, Instant};
1721
/// Reads the `RUST_TEST_STRESS` environment variable as a `usize`.
///
/// Returns 1 when the variable cannot be read; panics if it is set but
/// does not parse.
pub fn stress_factor() -> usize {
    if let Ok(raw) = env::var("RUST_TEST_STRESS") {
        return raw.parse().unwrap();
    }
    1
}
1728
#[test]
fn smoke() {
    // One message sent and received on the same thread.
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
1735
#[test]
fn drop_full() {
    // The channel is dropped while it still holds a boxed message.
    // `Box::new` is used instead of the unstable `box` expression syntax.
    let (tx, _rx) = channel::<Box<isize>>();
    tx.send(Box::new(1)).unwrap();
}
1741
#[test]
fn drop_full_shared() {
    // As `drop_full`, but the sender is cloned first (clones dropped
    // immediately). `Box::new` replaces the unstable `box` syntax.
    let (tx, _rx) = channel::<Box<isize>>();
    drop(tx.clone());
    drop(tx.clone());
    tx.send(Box::new(1)).unwrap();
}
1749
#[test]
fn smoke_shared() {
    // A round trip before cloning the sender, and another through the clone.
    let (first, receiver) = channel::<i32>();
    first.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);

    let second = first.clone();
    second.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);
}
1759
#[test]
fn smoke_threads() {
    // The sender lives on a spawned thread, the receiver on this one.
    let (sender, receiver) = channel::<i32>();
    let _worker = thread::spawn(move || {
        sender.send(1).unwrap();
    });
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
1768
#[test]
fn smoke_port_gone() {
    // Sending after the receiver has been dropped must fail.
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
1775
#[test]
fn smoke_shared_port_gone() {
    // Clone the sender so the channel is shared between two senders before
    // the receiver goes away; without the clone this test would just repeat
    // `smoke_port_gone`.
    let (tx, rx) = channel::<i32>();
    let tx2 = tx.clone();
    drop(rx);
    assert!(tx.send(1).is_err());
    assert!(tx2.send(1).is_err());
}
1782
#[test]
fn smoke_shared_port_gone2() {
    // The receiver is dropped first, then the sender is cloned and the
    // original dropped; the surviving clone must still fail to send.
    let (original, receiver) = channel::<i32>();
    drop(receiver);
    let survivor = original.clone();
    drop(original);
    let outcome = survivor.send(1);
    assert!(outcome.is_err());
}
1791
#[test]
fn port_gone_concurrent() {
    // The receiving thread takes one message and exits; keep sending until
    // a send reports failure.
    let (sender, receiver) = channel::<i32>();
    let _worker = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
}
1800
#[test]
fn port_gone_concurrent_shared() {
    // As `port_gone_concurrent`, alternating between two senders.
    let (first, receiver) = channel::<i32>();
    let second = first.clone();
    let _worker = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if first.send(1).is_err() {
            break;
        }
        if second.send(1).is_err() {
            break;
        }
    }
}
1810
#[test]
fn smoke_chan_gone() {
    // Receiving after the only sender has been dropped must fail.
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1817
#[test]
fn smoke_chan_gone_shared() {
    // Both the sender and its clone are dropped before receiving.
    let (first, receiver) = channel::<()>();
    let second = first.clone();
    drop(first);
    drop(second);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1826
#[test]
fn chan_gone_concurrent() {
    // The sending thread sends twice and exits; keep receiving until a
    // receive reports failure.
    let (sender, receiver) = channel::<i32>();
    let _worker = thread::spawn(move || {
        sender.send(1).unwrap();
        sender.send(1).unwrap();
    });
    loop {
        if receiver.recv().is_err() {
            break;
        }
    }
}
1836
#[test]
fn stress() {
    // 10000 messages from one producer thread to this thread.
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || {
        for _ in 0..10000 {
            sender.send(1).unwrap();
        }
    });
    for _ in 0..10000 {
        let got = receiver.recv().unwrap();
        assert_eq!(got, 1);
    }
    producer.join().ok().unwrap();
}
1848
#[test]
fn stress_shared() {
    // NTHREADS producers, each sending AMT messages through a clone of the
    // sender, feed a single consumer thread.
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;
    let (sender, receiver) = channel::<i32>();

    let consumer = thread::spawn(move || {
        for _ in 0..AMT * NTHREADS {
            assert_eq!(receiver.recv().unwrap(), 1);
        }
        // Nothing may be left over once every expected message has arrived.
        if let Ok(..) = receiver.try_recv() {
            panic!();
        }
    });

    for _ in 0..NTHREADS {
        let producer_tx = sender.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                producer_tx.send(1).unwrap();
            }
        });
    }
    drop(sender);
    consumer.join().ok().unwrap();
}
1874
#[test]
fn send_from_outside_runtime() {
    // The receiving thread signals over a first channel, then expects 40
    // messages that a second thread sends over another channel.
    let (ready_tx, ready_rx) = channel::<()>();
    let (data_tx, data_rx) = channel::<i32>();
    let consumer = thread::spawn(move || {
        ready_tx.send(()).unwrap();
        for _ in 0..40 {
            assert_eq!(data_rx.recv().unwrap(), 1);
        }
    });
    ready_rx.recv().unwrap();
    let producer = thread::spawn(move || {
        for _ in 0..40 {
            data_tx.send(1).unwrap();
        }
    });
    consumer.join().ok().unwrap();
    producer.join().ok().unwrap();
}
1894
#[test]
fn recv_from_outside_runtime() {
    // This thread sends 40 messages; a spawned thread receives them.
    let (sender, receiver) = channel::<i32>();
    let consumer = thread::spawn(move || {
        for _ in 0..40 {
            assert_eq!(receiver.recv().unwrap(), 1);
        }
    });
    for _ in 0..40 {
        sender.send(1).unwrap();
    }
    consumer.join().ok().unwrap();
}
1908
#[test]
fn no_runtime() {
    // Two threads exchange one message in each direction.
    let (ping_tx, ping_rx) = channel::<i32>();
    let (pong_tx, pong_rx) = channel::<i32>();
    let responder = thread::spawn(move || {
        assert_eq!(ping_rx.recv().unwrap(), 1);
        pong_tx.send(2).unwrap();
    });
    let initiator = thread::spawn(move || {
        ping_tx.send(1).unwrap();
        assert_eq!(pong_rx.recv().unwrap(), 2);
    });
    responder.join().ok().unwrap();
    initiator.join().ok().unwrap();
}
1924
#[test]
fn oneshot_single_thread_close_port_first() {
    // Nothing is sent; the receiver is dropped before the sender.
    let (_sender, receiver) = channel::<i32>();
    drop(receiver);
}
1931
// Closing without sending: the sender is dropped explicitly while the
// unused receiver stays alive until the end of the function.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = channel::<i32>();
    drop(sender);
}
1938
// Testing that the sender cleans up the payload if receiver is closed:
// the receiver is dropped first, so sending a boxed value must report an
// error.
#[test]
fn oneshot_single_thread_send_port_close() {
    let (tx, rx) = channel::<Box<i32>>();
    drop(rx);
    // `Box::new` is used instead of the unstable `box` expression syntax.
    assert!(tx.send(Box::new(0)).is_err());
}
1946
// `recv().unwrap()` is called after the only sender was dropped; this is
// done on a helper thread and the resulting panic is observed via `join`.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let worker = thread::spawn(move || {
        let (sender, receiver) = channel::<i32>();
        drop(sender);
        receiver.recv().unwrap();
    });
    // A panic inside the helper thread shows up as `Err` here.
    let outcome = worker.join();
    assert!(outcome.is_err());
}
1958
// Sends one boxed value and receives it again on the same thread.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (tx, rx) = channel::<Box<i32>>();
    // `Box::new` is used instead of the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
    // `assert_eq!` reports both values if the comparison fails.
    assert_eq!(*rx.recv().unwrap(), 10);
}
1965
// Sending while the receiver is alive succeeds and the value arrives.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = channel::<i32>();
    let sent = sender.send(10);
    assert!(sent.is_ok());
    let got = receiver.recv().unwrap();
    assert!(got == 10);
}
1972
// Sending after the receiver was dropped reports an error.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let sent = sender.send(10);
    assert!(sent.is_err());
}
1979
// A value sent earlier is returned by `recv` as `Ok`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = channel::<i32>();
    sender.send(10).unwrap();
    let got = receiver.recv();
    assert!(got == Ok(10));
}
1986
// `recv` reports an error once the only sender has been dropped.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let got = receiver.recv();
    assert!(got.is_err());
}
1993
// `try_recv` reports `Empty` before anything was sent and yields the value
// afterwards.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = channel::<i32>();
    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
2001
// After the sender is dropped, `try_recv` reports `Disconnected`, and does
// so again when asked a second time.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
2009
// With a live sender and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = channel::<i32>();
    let polled = receiver.try_recv();
    assert_eq!(polled, Err(TryRecvError::Empty));
}
2015
// A spawned thread receives a boxed value that the test thread sends.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (tx, rx) = channel::<Box<i32>>();
    let receiver = thread::spawn(move || {
        assert_eq!(*rx.recv().unwrap(), 10);
    });

    // `Box::new` is used instead of the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
    // Join the receiving thread: if it stayed detached, a failure of its
    // assertion could never fail this test.
    receiver.join().unwrap();
}
2025
// One thread drops the sender without sending; a second thread calls
// `recv().unwrap()` and is expected to panic, which `join` reports as `Err`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = channel::<Box<i32>>();
    let _closer = thread::spawn(move || {
        drop(sender);
    });
    let waiter = thread::spawn(move || {
        assert!(*receiver.recv().unwrap() == 10);
    });
    let outcome = waiter.join();
    assert!(outcome.is_err());
}
2037
// Repeatedly drops the receiver on a spawned thread while the test thread
// drops the sender.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        drop(sender);
    }
}
2048
// Repeatedly races a thread that drops the receiver against a thread that
// sends and unwraps. The sending thread's join result is deliberately
// ignored, so it is allowed to panic.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        let producer = thread::spawn(move || {
            sender.send(1).unwrap();
        });
        let _ = producer.join();
    }
}
2061
// For each stress iteration, one thread calls `recv().unwrap()` while
// another thread drops the only sender; the receiving thread is expected to
// panic.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = channel::<i32>();
        let checker = thread::spawn(move || {
            let res = thread::spawn(move || {
                rx.recv().unwrap();
            }).join();
            assert!(res.is_err());
        });
        let _t = thread::spawn(move || {
            thread::spawn(move || {
                drop(tx);
            });
        });
        // Join the checking thread: if it stayed detached, a failure of
        // its assertion could never fail this test.
        checker.join().unwrap();
    }
}
2079
// Repeatedly sends one boxed value from a spawned thread and receives it on
// the test thread.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = channel::<Box<isize>>();
        let _t = thread::spawn(move || {
            // `Box::new` replaces the unstable `box` expression syntax.
            tx.send(Box::new(10)).unwrap();
        });
        // `assert_eq!` reports both values if the comparison fails.
        assert_eq!(*rx.recv().unwrap(), 10);
    }
}
2090
// Passes the sender and the receiver along two chains of freshly spawned
// threads: step `i` of the sending chain sends `i`, step `i` of the
// receiving chain expects `i`, for `i` in 0..10.
#[test]
fn stream_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = channel();

        send(tx, 0);
        recv(rx, 0);

        // Sends `i` on a new thread, then hands the sender to the next step.
        fn send(tx: Sender<Box<i32>>, i: i32) {
            if i == 10 { return }

            thread::spawn(move || {
                // `Box::new` replaces the unstable `box` expression syntax.
                tx.send(Box::new(i)).unwrap();
                send(tx, i + 1);
            });
        }

        // Receives on a new thread, checks for `i`, then hands the receiver
        // to the next step.
        fn recv(rx: Receiver<Box<i32>>, i: i32) {
            if i == 10 { return }

            thread::spawn(move || {
                assert_eq!(*rx.recv().unwrap(), i);
                recv(rx, i + 1);
            });
        }
    }
}
2118
// `recv_timeout` returns a queued value, times out when nothing is queued,
// and returns a value again after another send.
#[test]
fn oneshot_single_thread_recv_timeout() {
    let (sender, receiver) = channel();
    let wait = Duration::from_millis(1);

    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
    assert_eq!(receiver.recv_timeout(wait), Err(RecvTimeoutError::Timeout));

    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
}
2128
// A producer thread sends `stress_factor() + 100` messages, sleeping for
// twice the receive timeout before every other send. The test thread keeps
// calling `recv_timeout` until the channel disconnects and then checks that
// every message arrived.
#[test]
fn stress_recv_timeout_two_threads() {
    let (sender, receiver) = channel();
    let rounds = stress_factor() + 100;
    let timeout = Duration::from_millis(100);

    thread::spawn(move || {
        for round in 0..rounds {
            if round % 2 == 0 {
                thread::sleep(timeout * 2);
            }
            sender.send(1usize).unwrap();
        }
    });

    let mut received = 0;
    loop {
        let value = match receiver.recv_timeout(timeout) {
            Ok(value) => value,
            // A timeout only means the producer is sleeping; try again.
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        };
        assert_eq!(value, 1usize);
        received += 1;
    }

    assert_eq!(received, rounds);
}
2158
// With a cloned sender alive and nothing sent, `recv_timeout` must report
// `Timeout`, and not before the requested duration has passed.
#[test]
fn recv_timeout_upgrade() {
    let (sender, receiver) = channel::<()>();
    let timeout = Duration::from_millis(1);
    let _second_sender = sender.clone();

    let started = Instant::now();
    let outcome = receiver.recv_timeout(timeout);
    assert_eq!(outcome, Err(RecvTimeoutError::Timeout));
    let earliest_return = started + timeout;
    assert!(Instant::now() >= earliest_return);
}
2169
// Spawns `stress_factor() + 100` producer threads; producer `i` sleeps for
// `i * 10` ms and then sends one message through its own sender clone. The
// test thread polls with a 10 ms `recv_timeout` until the channel
// disconnects and then checks that every message arrived.
#[test]
fn stress_recv_timeout_shared() {
    let (sender, receiver) = channel();
    let producers = stress_factor() + 100;

    for index in 0..producers {
        let producer_tx = sender.clone();
        thread::spawn(move || {
            let delay = Duration::from_millis(index as u64 * 10);
            thread::sleep(delay);
            producer_tx.send(1usize).unwrap();
        });
    }

    // Only the clones owned by the producers remain after this.
    drop(sender);

    let mut received = 0;
    loop {
        let value = match receiver.recv_timeout(Duration::from_millis(10)) {
            Ok(value) => value,
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        };
        assert_eq!(value, 1usize);
        received += 1;
    }

    assert_eq!(received, producers);
}
2199
// Regression test that we don't run out of stack in scheduler context:
// queue 10000 messages, then receive all of them on the same thread.
#[test]
fn recv_a_lot() {
    let (sender, receiver) = channel();
    for _ in 0..10000 {
        sender.send(()).unwrap();
    }
    for _ in 0..10000 {
        receiver.recv().unwrap();
    }
}
2207
// After five messages from cloned senders have been received,
// `recv_timeout` times out while nothing is queued and succeeds again once
// the original sender sends another message.
#[test]
fn shared_recv_timeout() {
    let (sender, receiver) = channel();
    let total = 5;
    for _ in 0..total {
        let producer_tx = sender.clone();
        thread::spawn(move || {
            producer_tx.send(()).unwrap();
        });
    }

    for _ in 0..total {
        receiver.recv().unwrap();
    }

    let wait = Duration::from_millis(1);
    assert_eq!(receiver.recv_timeout(wait), Err(RecvTimeoutError::Timeout));
    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
}
2225
// Spawns `stress_factor() + 100` threads that each send one message through
// a sender clone, then receives that many messages on the test thread.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = channel();
    let producers = stress_factor() + 100;
    for _ in 0..producers {
        let producer_tx = sender.clone();
        thread::spawn(move || {
            producer_tx.send(()).unwrap();
        });
    }

    for _ in 0..producers {
        receiver.recv().unwrap();
    }
}
2241
// A spawned thread adds up everything `iter()` yields until the sender is
// dropped, then reports the sum over a second channel.
#[test]
fn test_nested_recv_iter() {
    let (value_tx, value_rx) = channel::<i32>();
    let (sum_tx, sum_rx) = channel::<i32>();

    let _summer = thread::spawn(move || {
        let sum = value_rx.iter().fold(0, |acc, x| acc + x);
        sum_tx.send(sum).unwrap();
    });

    for value in [3, 1, 2].iter() {
        value_tx.send(*value).unwrap();
    }
    // Dropping the sender is what lets the iterator in the thread finish.
    drop(value_tx);
    assert_eq!(sum_rx.recv().unwrap(), 6);
}
2261
// A spawned thread accumulates values from `iter()` and breaks out of the
// loop on the first item seen after the total has reached 3, then reports
// the total. With 2s being sent, the reported total is 4.
#[test]
fn test_recv_iter_break() {
    let (value_tx, value_rx) = channel::<i32>();
    let (total_tx, total_rx) = channel();

    let _counter = thread::spawn(move || {
        let mut total = 0;
        for x in value_rx.iter() {
            if total >= 3 {
                break;
            }
            total += x;
        }
        total_tx.send(total).unwrap();
    });

    for _ in 0..3 {
        value_tx.send(2).unwrap();
    }
    // The result of the fourth send is deliberately ignored.
    let _ = value_tx.send(2);
    drop(value_tx);
    assert_eq!(total_rx.recv().unwrap(), 4);
}
2286
// A worker thread drains `try_iter()` and, whenever it runs dry before the
// running total reaches 6, asks the test thread for another value. The test
// thread answers every request with a 2.
#[test]
fn test_recv_try_iter() {
    let (request_tx, request_rx) = channel();
    let (response_tx, response_rx) = channel();

    // Request `x`s until we have `6`.
    let worker = thread::spawn(move || {
        let mut total = 0;
        loop {
            for x in response_rx.try_iter() {
                total += x;
                if total == 6 {
                    return total;
                }
            }
            request_tx.send(()).unwrap();
        }
    });

    // Serve requests until either side of the exchange goes away.
    while request_rx.recv().is_ok() {
        if response_tx.send(2).is_err() {
            break;
        }
    }

    let result = worker.join().unwrap();
    assert_eq!(result, 6);
}
2314
// The owning iterator from `into_iter()` yields the queued values and then
// ends; the sender has already gone out of scope by the time it is used.
#[test]
fn test_recv_into_iter_owned() {
    let mut values = {
        let (sender, receiver) = channel::<i32>();
        sender.send(1).unwrap();
        sender.send(2).unwrap();

        receiver.into_iter()
    };
    assert_eq!(values.next(), Some(1));
    assert_eq!(values.next(), Some(2));
    assert!(values.next().is_none());
}
2328
// `into_iter()` on a borrowed receiver yields the queued values and ends
// after the sender has been dropped.
#[test]
fn test_recv_into_iter_borrowed() {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    drop(sender);

    let mut values = (&receiver).into_iter();
    assert_eq!(values.next(), Some(1));
    assert_eq!(values.next(), Some(2));
    assert!(values.next().is_none());
}
2340
// Steps a helper thread through "send one value" and "drop the sender"
// using two control channels, and checks what `try_recv` reports before and
// after each step: Empty, Ok(1), Empty, Disconnected.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = channel::<i32>();
    let (go_tx, go_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();
    let _helper = thread::spawn(move || {
        // Step 1: send a value.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        done_tx.send(()).unwrap();
        // Step 2: disconnect.
        go_rx.recv().unwrap();
        drop(data_tx);
        done_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
}
2364
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (sender, receiver) = channel();
    let (exit_tx, exit_rx) = channel();
    let _child = thread::spawn(move || {
        receiver.recv().unwrap(); // wait on a oneshot
        drop(receiver); // destroy a shared
        exit_tx.send(()).unwrap();
    });
    // Give the child thread a chance to block in `recv` first.
    for _ in 0..5000 {
        thread::yield_now();
    }

    // Upgrade to a shared channel by cloning, then send through the clone.
    let cloned = sender.clone();
    drop(sender);
    cloned.send(()).unwrap();

    // Do not return before the child thread has finished.
    exit_rx.recv().unwrap();
}
2387
// The receiver is discarded right away (`_` pattern). After a first send
// whose result is ignored, a second send must return the value inside
// `SendError`.
#[test]
fn issue_32114() {
    let (sender, _) = channel();
    let _ = sender.send(123);
    let second = sender.send(123);
    assert_eq!(second, Err(SendError(123)));
}
2394 }
2395
2396 #[cfg(all(test, not(target_os = "emscripten")))]
2397 mod sync_tests {
2398 use env;
2399 use thread;
2400 use super::*;
2401 use time::Duration;
2402
// Iteration multiplier for the stress tests. Reads the `RUST_TEST_STRESS`
// environment variable and parses it as `usize`; returns 1 when the
// variable cannot be read. Panics if the value is present but not a valid
// `usize`.
pub fn stress_factor() -> usize {
    env::var("RUST_TEST_STRESS")
        .map(|val| val.parse::<usize>().unwrap())
        .unwrap_or(1)
}
2409
// Sends one value through a channel of capacity 1 and receives it.
#[test]
fn smoke() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
2416
// Sends one boxed value into a channel of capacity 1 and never receives
// it, so both ends are dropped at the end of the function with the message
// still unreceived.
#[test]
fn drop_full() {
    let (tx, _rx) = sync_channel::<Box<isize>>(1);
    // `Box::new` is used instead of the unstable `box` expression syntax.
    tx.send(Box::new(1)).unwrap();
}
2422
// Sends and receives once through the original sender and once through a
// clone of it.
#[test]
fn smoke_shared() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);

    let cloned = sender.clone();
    cloned.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);
}
2432
// `recv_timeout` times out on an empty channel and returns the value once
// one has been sent.
#[test]
fn recv_timeout() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let wait = Duration::from_millis(1);
    assert_eq!(receiver.recv_timeout(wait), Err(RecvTimeoutError::Timeout));
    sender.send(1).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(1));
}
2440
// A spawned thread sends one value over a zero-capacity channel and the
// test thread receives it.
#[test]
fn smoke_threads() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _producer = thread::spawn(move || {
        sender.send(1).unwrap();
    });
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
2449
// Sending after the receiver was dropped reports an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let sent = sender.send(1);
    assert!(sent.is_err());
}
2456
// Sending through a clone reports an error when the receiver was dropped
// before the clone was made and the original sender dropped after it.
#[test]
fn smoke_shared_port_gone2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let cloned = sender.clone();
    drop(sender);
    let sent = cloned.send(1);
    assert!(sent.is_err());
}
2465
// A spawned thread receives a single value and then finishes, dropping the
// receiver. The test thread keeps sending until a send reports an error.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
}
2474
// Like `port_gone_concurrent`, but the test thread alternates between the
// original sender and a clone, stopping at the first send that fails.
#[test]
fn port_gone_concurrent_shared() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let cloned = sender.clone();
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
        if cloned.send(1).is_err() {
            break;
        }
    }
}
2484
// `recv` reports an error once the only sender has been dropped.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let got = receiver.recv();
    assert!(got.is_err());
}
2491
// `recv` reports an error once both the original sender and its clone have
// been dropped.
#[test]
fn smoke_chan_gone_shared() {
    let (sender, receiver) = sync_channel::<()>(0);
    let cloned = sender.clone();
    drop(sender);
    drop(cloned);
    let got = receiver.recv();
    assert!(got.is_err());
}
2500
// A spawned thread sends two values and then finishes, dropping the sender.
// The test thread keeps receiving until `recv` reports an error.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    thread::spawn(move || {
        for _ in 0..2 {
            sender.send(1).unwrap();
        }
    });
    loop {
        if receiver.recv().is_err() {
            break;
        }
    }
}
2510
// A spawned thread sends 10000 values over a zero-capacity channel and the
// test thread receives and checks each of them.
#[test]
fn stress() {
    let (sender, receiver) = sync_channel::<i32>(0);
    thread::spawn(move || {
        for _ in 0..10000 {
            sender.send(1).unwrap();
        }
    });
    for _ in 0..10000 {
        let value = receiver.recv().unwrap();
        assert_eq!(value, 1);
    }
}
2521
// A spawned thread sends 10000 values over a zero-capacity channel. The
// test thread polls with a 1 ms `recv_timeout`, retrying on timeouts, until
// the channel disconnects, then checks that all values arrived.
#[test]
fn stress_recv_timeout_two_threads() {
    let (sender, receiver) = sync_channel::<i32>(0);

    thread::spawn(move || {
        for _ in 0..10000 {
            sender.send(1).unwrap();
        }
    });

    let mut received = 0;
    loop {
        let value = match receiver.recv_timeout(Duration::from_millis(1)) {
            Ok(value) => value,
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        };
        assert_eq!(value, 1);
        received += 1;
    }

    assert_eq!(received, 10000);
}
2544
// Eight producer threads each send `AMT` values through sender clones. A
// consumer thread polls with a 10 ms `recv_timeout` until the channel
// disconnects, checks the total, and then signals completion to the test
// thread over a second channel.
#[test]
fn stress_recv_timeout_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    thread::spawn(move || {
        let mut received = 0;
        loop {
            let value = match receiver.recv_timeout(Duration::from_millis(10)) {
                Ok(value) => value,
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            };
            assert_eq!(value, 1);
            received += 1;
        }

        assert_eq!(received, AMT * NTHREADS);
        assert!(receiver.try_recv().is_err());

        done_tx.send(()).unwrap();
    });

    for _ in 0..NTHREADS {
        let producer_tx = sender.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                producer_tx.send(1).unwrap();
            }
        });
    }

    // Only the clones owned by the producers remain after this.
    drop(sender);

    done_rx.recv().unwrap();
}
2582
// Eight producer threads each send `AMT` values through sender clones over
// a zero-capacity channel. A consumer thread receives and checks all of
// them, verifies nothing further can be received, and signals completion.
#[test]
fn stress_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    thread::spawn(move || {
        let expected = AMT * NTHREADS;
        for _ in 0..expected {
            let value = receiver.recv().unwrap();
            assert_eq!(value, 1);
        }
        if receiver.try_recv().is_ok() {
            panic!();
        }
        done_tx.send(()).unwrap();
    });

    for _ in 0..NTHREADS {
        let producer_tx = sender.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                producer_tx.send(1).unwrap();
            }
        });
    }
    drop(sender);
    done_rx.recv().unwrap();
}
2610
// Closing without sending: the receiver is dropped explicitly while the
// unused sender stays alive until the end of the function.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
}
2617
// Closing without sending: the sender is dropped explicitly while the
// unused receiver stays alive until the end of the function.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    drop(sender);
}
2624
// Testing that the sender cleans up the payload if receiver is closed:
// the receiver is dropped first, so sending a boxed value must report an
// error.
#[test]
fn oneshot_single_thread_send_port_close() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    drop(rx);
    // `Box::new` is used instead of the unstable `box` expression syntax.
    assert!(tx.send(Box::new(0)).is_err());
}
2632
// `recv().unwrap()` is called after the only sender was dropped; this is
// done on a helper thread and the resulting panic is observed via `join`.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let worker = thread::spawn(move || {
        let (sender, receiver) = sync_channel::<i32>(0);
        drop(sender);
        receiver.recv().unwrap();
    });
    // A panic inside the helper thread shows up as `Err` here.
    let outcome = worker.join();
    assert!(outcome.is_err());
}
2644
// Sends one boxed value into a channel of capacity 1 and receives it again
// on the same thread.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (tx, rx) = sync_channel::<Box<i32>>(1);
    // `Box::new` is used instead of the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
    // `assert_eq!` reports both values if the comparison fails.
    assert_eq!(*rx.recv().unwrap(), 10);
}
2651
// `try_send` into a channel of capacity 1 succeeds and the value arrives.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let sent = sender.try_send(10);
    assert_eq!(sent, Ok(()));
    let got = receiver.recv().unwrap();
    assert!(got == 10);
}
2658
// `try_send` after the receiver was dropped returns the value inside
// `TrySendError::Disconnected`.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let sent = sender.try_send(10);
    assert_eq!(sent, Err(TrySendError::Disconnected(10)));
}
2665
// Despite the name, the receiver is still alive here: `try_send` on a
// zero-capacity channel with nobody receiving returns the value inside
// `TrySendError::Full`.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let sent = sender.try_send(10);
    assert_eq!(sent, Err(TrySendError::Full(10)));
}
2671
// A value sent earlier is returned by `recv` as `Ok`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    let got = receiver.recv();
    assert!(got == Ok(10));
}
2678
// `recv` reports an error once the only sender has been dropped.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let got = receiver.recv();
    assert!(got.is_err());
}
2685
// A value buffered before the sender was dropped is still delivered by
// `try_recv`; only the following call reports `Disconnected`.
#[test]
fn oneshot_single_thread_try_recv_closed_with_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    drop(sender);
    let first = receiver.try_recv();
    assert_eq!(first, Ok(10));
    let second = receiver.try_recv();
    assert_eq!(second, Err(TryRecvError::Disconnected));
}
2694
// `try_recv` reports `Empty` before anything was sent and yields the value
// afterwards.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
2702
// After the sender is dropped, `try_recv` reports `Disconnected`, and does
// so again when asked a second time.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
2710
// With a live sender and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    let polled = receiver.try_recv();
    assert_eq!(polled, Err(TryRecvError::Empty));
}
2716
// A spawned thread receives a boxed value that the test thread sends over
// a zero-capacity channel.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    let receiver = thread::spawn(move || {
        assert_eq!(*rx.recv().unwrap(), 10);
    });

    // `Box::new` is used instead of the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
    // Join the receiving thread: if it stayed detached, a failure of its
    // assertion could never fail this test.
    receiver.join().unwrap();
}
2726
// One thread drops the sender without sending; a second thread calls
// `recv().unwrap()` and is expected to panic, which `join` reports as `Err`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    let _closer = thread::spawn(move || {
        drop(sender);
    });
    let waiter = thread::spawn(move || {
        assert!(*receiver.recv().unwrap() == 10);
    });
    let outcome = waiter.join();
    assert!(outcome.is_err());
}
2738
// Repeatedly drops the receiver on a spawned thread while the test thread
// drops the sender.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        drop(sender);
    }
}
2749
// Repeatedly races a thread that drops the receiver against a thread that
// sends and unwraps. The sending thread's join result is deliberately
// ignored, so it is allowed to panic.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        let producer = thread::spawn(move || {
            sender.send(1).unwrap();
        });
        let _ = producer.join();
    }
}
2762
// For each stress iteration, one thread calls `recv().unwrap()` while
// another thread drops the only sender; the receiving thread is expected to
// panic.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<i32>(0);
        let checker = thread::spawn(move || {
            let res = thread::spawn(move || {
                rx.recv().unwrap();
            }).join();
            assert!(res.is_err());
        });
        let _t = thread::spawn(move || {
            thread::spawn(move || {
                drop(tx);
            });
        });
        // Join the checking thread: if it stayed detached, a failure of
        // its assertion could never fail this test.
        checker.join().unwrap();
    }
}
2780
// Repeatedly sends one boxed value from a spawned thread over a
// zero-capacity channel and receives it on the test thread.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        let _t = thread::spawn(move || {
            // `Box::new` replaces the unstable `box` expression syntax.
            tx.send(Box::new(10)).unwrap();
        });
        // `assert_eq!` reports both values if the comparison fails.
        assert_eq!(*rx.recv().unwrap(), 10);
    }
}
2791
// Passes the sender and the receiver along two chains of freshly spawned
// threads: step `i` of the sending chain sends `i`, step `i` of the
// receiving chain expects `i`, for `i` in 0..10.
#[test]
fn stream_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        send(tx, 0);
        recv(rx, 0);

        // Sends `i` on a new thread, then hands the sender to the next step.
        fn send(tx: SyncSender<Box<i32>>, i: i32) {
            if i == 10 { return }

            thread::spawn(move || {
                // `Box::new` replaces the unstable `box` expression syntax.
                tx.send(Box::new(i)).unwrap();
                send(tx, i + 1);
            });
        }

        // Receives on a new thread, checks for `i`, then hands the receiver
        // to the next step.
        fn recv(rx: Receiver<Box<i32>>, i: i32) {
            if i == 10 { return }

            thread::spawn(move || {
                assert_eq!(*rx.recv().unwrap(), i);
                recv(rx, i + 1);
            });
        }
    }
}
2819
// Regression test that we don't run out of stack in scheduler context:
// fill a channel of capacity 10000, then receive everything on the same
// thread.
#[test]
fn recv_a_lot() {
    let (sender, receiver) = sync_channel(10000);
    for _ in 0..10000 {
        sender.send(()).unwrap();
    }
    for _ in 0..10000 {
        receiver.recv().unwrap();
    }
}
2827
// Spawns `stress_factor() + 100` threads that each send one message through
// a sender clone, then receives that many messages on the test thread.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = sync_channel(0);
    let producers = stress_factor() + 100;
    for _ in 0..producers {
        let producer_tx = sender.clone();
        thread::spawn(move || {
            producer_tx.send(()).unwrap();
        });
    }

    for _ in 0..producers {
        receiver.recv().unwrap();
    }
}
2843
// A spawned thread adds up everything `iter()` yields until the sender is
// dropped, then reports the sum over a second channel.
#[test]
fn test_nested_recv_iter() {
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (sum_tx, sum_rx) = sync_channel::<i32>(0);

    let _summer = thread::spawn(move || {
        let sum = value_rx.iter().fold(0, |acc, x| acc + x);
        sum_tx.send(sum).unwrap();
    });

    for value in [3, 1, 2].iter() {
        value_tx.send(*value).unwrap();
    }
    // Dropping the sender is what lets the iterator in the thread finish.
    drop(value_tx);
    assert_eq!(sum_rx.recv().unwrap(), 6);
}
2863
// A spawned thread accumulates values from `iter()` and breaks out of the
// loop on the first item seen after the total has reached 3, then reports
// the total. With 2s being sent, the reported total is 4.
#[test]
fn test_recv_iter_break() {
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (total_tx, total_rx) = sync_channel(0);

    let _counter = thread::spawn(move || {
        let mut total = 0;
        for x in value_rx.iter() {
            if total >= 3 {
                break;
            }
            total += x;
        }
        total_tx.send(total).unwrap();
    });

    for _ in 0..3 {
        value_tx.send(2).unwrap();
    }
    // Non-blocking attempt at a fourth send; its result is ignored.
    let _ = value_tx.try_send(2);
    drop(value_tx);
    assert_eq!(total_rx.recv().unwrap(), 4);
}
2888
// Steps a helper thread through "send one value" and "drop the sender"
// using two control channels, and checks what `try_recv` reports before and
// after each step: Empty, Ok(1), Empty, Disconnected.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (done_tx, done_rx) = sync_channel::<()>(1);
    let _helper = thread::spawn(move || {
        // Step 1: send a value.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        done_tx.send(()).unwrap();
        // Step 2: disconnect.
        go_rx.recv().unwrap();
        drop(data_tx);
        done_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
}
2912
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (sender, receiver) = sync_channel::<()>(0);
    let (exit_tx, exit_rx) = sync_channel::<()>(0);
    let _child = thread::spawn(move || {
        receiver.recv().unwrap(); // wait on a oneshot
        drop(receiver); // destroy a shared
        exit_tx.send(()).unwrap();
    });
    // Give the child thread a chance to block in `recv` first.
    for _ in 0..5000 {
        thread::yield_now();
    }

    // Upgrade to a shared channel by cloning, then send through the clone.
    let cloned = sender.clone();
    drop(sender);
    cloned.send(()).unwrap();

    // Do not return before the child thread has finished.
    exit_rx.recv().unwrap();
}
2935
// `send` on a zero-capacity channel returns `Ok(())` when a spawned thread
// receives the value.
#[test]
fn send1() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    let sent = sender.send(1);
    assert_eq!(sent, Ok(()));
}
2942
// `send` on a zero-capacity channel reports an error when a spawned thread
// drops the receiver instead of receiving.
#[test]
fn send2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _closer = thread::spawn(move || {
        drop(receiver);
    });
    let sent = sender.send(1);
    assert!(sent.is_err());
}
2949
// With capacity 1, the first `send` succeeds without a receiver call; the
// second `send` reports an error after a spawned thread drops the receiver.
#[test]
fn send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.send(1);
    assert_eq!(first, Ok(()));
    let _closer = thread::spawn(move || {
        drop(receiver);
    });
    let second = sender.send(1);
    assert!(second.is_err());
}
2957
// Two threads send on a zero-capacity channel whose receiver is dropped
// without ever receiving; both sends must report an error. Each thread
// confirms completion over a separate asynchronous channel.
#[test]
fn send4() {
    let (first_tx, receiver) = sync_channel::<i32>(0);
    let second_tx = first_tx.clone();
    let (first_done, done_rx) = channel();
    let second_done = first_done.clone();

    let _first = thread::spawn(move || {
        assert!(first_tx.send(1).is_err());
        first_done.send(()).unwrap();
    });
    let _second = thread::spawn(move || {
        assert!(second_tx.send(2).is_err());
        second_done.send(()).unwrap();
    });

    drop(receiver);
    // Wait for one confirmation per sending thread.
    for _ in 0..2 {
        done_rx.recv().unwrap();
    }
}
2976
// `try_send` on a zero-capacity channel with nobody receiving returns the
// value inside `TrySendError::Full`.
#[test]
fn try_send1() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let sent = sender.try_send(1);
    assert_eq!(sent, Err(TrySendError::Full(1)));
}
2982
// With capacity 1, the first `try_send` succeeds and the second one, made
// without any receive in between, reports `Full`.
#[test]
fn try_send2() {
    let (sender, _receiver) = sync_channel::<i32>(1);
    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));
    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Full(1)));
}
2989
// With capacity 1, the first `try_send` succeeds; after the receiver is
// dropped the second one reports `Disconnected`.
#[test]
fn try_send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));
    drop(receiver);
    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Disconnected(1)));
}
2997
// Runs 100 rounds of a round trip over two capacity-3 channels: the test
// thread `try_send`s a request, a spawned thread receives it and
// `try_send`s a reply, and the test thread receives the reply.
#[test]
fn issue_15761() {
    fn repro() {
        let (request_tx, request_rx) = sync_channel::<()>(3);
        let (reply_tx, reply_rx) = sync_channel::<()>(3);

        let _responder = thread::spawn(move || {
            request_rx.recv().unwrap();
            reply_tx.try_send(()).unwrap();
        });

        request_tx.try_send(()).unwrap();
        reply_rx.recv().unwrap();
    }

    let mut round = 0;
    while round < 100 {
        repro();
        round += 1;
    }
}
3017
// The `Debug` output of a `Sender` is the fixed string `Sender { .. }`.
#[test]
fn fmt_debug_sender() {
    let (sender, _) = channel::<i32>();
    let rendered = format!("{:?}", sender);
    assert_eq!(rendered, "Sender { .. }");
}
3023
// The `Debug` output of a `Receiver` is the fixed string `Receiver { .. }`.
#[test]
fn fmt_debug_recv() {
    let (_, receiver) = channel::<i32>();
    let rendered = format!("{:?}", receiver);
    assert_eq!(rendered, "Receiver { .. }");
}
3029
// The `Debug` output of a `SyncSender` is the fixed string
// `SyncSender { .. }`.
#[test]
fn fmt_debug_sync_sender() {
    let (sender, _) = sync_channel::<i32>(1);
    let rendered = format!("{:?}", sender);
    assert_eq!(rendered, "SyncSender { .. }");
}
3035 }