]> git.proxmox.com Git - rustc.git/blame - library/std/src/sync/mpsc/mod.rs
Merge tag 'debian/1.52.1+dfsg1-1_exp2' into proxmox/buster
[rustc.git] / library / std / src / sync / mpsc / mod.rs
CommitLineData
85aaf69f 1//! Multi-producer, single-consumer FIFO queue communication primitives.
1a4d82fc
JJ
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined among three types:
5//!
cc61c64b
XL
6//! * [`Sender`]
7//! * [`SyncSender`]
8//! * [`Receiver`]
1a4d82fc 9//!
cc61c64b 10//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
1a4d82fc
JJ
11//! senders are clone-able (multi-producer) such that many threads can send
12//! simultaneously to one receiver (single-consumer).
13//!
14//! These channels come in two flavors:
15//!
cc61c64b 16//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
1a4d82fc
JJ
17//! will return a `(Sender, Receiver)` tuple where all sends will be
18//! **asynchronous** (they never block). The channel conceptually has an
19//! infinite buffer.
20//!
cc61c64b
XL
21//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22//! return a `(SyncSender, Receiver)` tuple where the storage for pending
23//! messages is a pre-allocated buffer of a fixed size. All sends will be
1a4d82fc 24//! **synchronous** by blocking until there is buffer space available. Note
cc61c64b
XL
25//! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26//! channel where each sender atomically hands off a message to a receiver.
27//!
3dfed10e 28//! [`send`]: Sender::send
1a4d82fc
JJ
29//!
30//! ## Disconnection
31//!
cc61c64b 32//! The send and receive operations on channels will all return a [`Result`]
1a4d82fc
JJ
33//! indicating whether the operation succeeded or not. An unsuccessful operation
34//! is normally indicative of the other half of a channel having "hung up" by
35//! being dropped in its corresponding thread.
36//!
37//! Once half of a channel has been deallocated, most operations can no longer
cc61c64b
XL
38//! continue to make progress, so [`Err`] will be returned. Many applications
39//! will continue to [`unwrap`] the results returned from this module,
40//! instigating a propagation of failure among threads if one unexpectedly dies.
41//!
3dfed10e 42//! [`unwrap`]: Result::unwrap
1a4d82fc
JJ
43//!
44//! # Examples
45//!
46//! Simple usage:
47//!
48//! ```
85aaf69f 49//! use std::thread;
1a4d82fc
JJ
50//! use std::sync::mpsc::channel;
51//!
52//! // Create a simple streaming channel
53//! let (tx, rx) = channel();
85aaf69f
SL
54//! thread::spawn(move|| {
55//! tx.send(10).unwrap();
1a4d82fc 56//! });
85aaf69f 57//! assert_eq!(rx.recv().unwrap(), 10);
1a4d82fc
JJ
58//! ```
59//!
60//! Shared usage:
61//!
62//! ```
85aaf69f 63//! use std::thread;
1a4d82fc
JJ
64//! use std::sync::mpsc::channel;
65//!
66//! // Create a shared channel that can be sent along from many threads
67//! // where tx is the sending half (tx for transmission), and rx is the receiving
68//! // half (rx for receiving).
69//! let (tx, rx) = channel();
85aaf69f 70//! for i in 0..10 {
1a4d82fc 71//! let tx = tx.clone();
85aaf69f 72//! thread::spawn(move|| {
1a4d82fc
JJ
73//! tx.send(i).unwrap();
74//! });
75//! }
76//!
85aaf69f 77//! for _ in 0..10 {
1a4d82fc
JJ
78//! let j = rx.recv().unwrap();
79//! assert!(0 <= j && j < 10);
80//! }
81//! ```
82//!
83//! Propagating panics:
84//!
85//! ```
86//! use std::sync::mpsc::channel;
87//!
88//! // The call to recv() will return an error because the channel has already
89//! // hung up (or been deallocated)
c34b1796 90//! let (tx, rx) = channel::<i32>();
1a4d82fc
JJ
91//! drop(tx);
92//! assert!(rx.recv().is_err());
93//! ```
94//!
95//! Synchronous channels:
96//!
97//! ```
85aaf69f 98//! use std::thread;
1a4d82fc
JJ
99//! use std::sync::mpsc::sync_channel;
100//!
c34b1796 101//! let (tx, rx) = sync_channel::<i32>(0);
85aaf69f 102//! thread::spawn(move|| {
bd371182 103//! // This will wait for the parent thread to start receiving
1a4d82fc
JJ
104//! tx.send(53).unwrap();
105//! });
106//! rx.recv().unwrap();
107//! ```
1a4d82fc 108
85aaf69f 109#![stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 110
1b1a35ee
XL
111#[cfg(all(test, not(target_os = "emscripten")))]
112mod tests;
113
114#[cfg(all(test, not(target_os = "emscripten")))]
115mod sync_tests;
116
1a4d82fc
JJ
117// A description of how Rust's channel implementation works
118//
119// Channels are supposed to be the basic building block for all other
120// concurrent primitives that are used in Rust. As a result, the channel type
121// needs to be highly optimized, flexible, and broad enough for use everywhere.
122//
123// The choice of implementation of all channels is to be built on lock-free data
124// structures. The channels themselves are then consequently also lock-free data
125// structures. As always with lock-free code, this is a very "here be dragons"
126// territory, especially because I'm unaware of any academic papers that have
127// gone into great length about channels of these flavors.
128//
129// ## Flavors of channels
130//
131// From the perspective of a consumer of this library, there is only one flavor
132// of channel. This channel can be used as a stream and cloned to allow multiple
133// senders. Under the hood, however, there are actually three flavors of
134// channels in play.
135//
3157f602
XL
136// * Oneshots - these channels are highly optimized for the one-send use
137// case. They contain as few atomics as possible and
138// involve one and exactly one allocation.
1a4d82fc
JJ
139// * Streams - these channels are optimized for the non-shared use case. They
140// use a different concurrent queue that is more tailored for this
141// use case. The initial allocation of this flavor of channel is not
142// optimized.
143// * Shared - this is the most general form of channel that this module offers,
144// a channel with multiple senders. This type is as optimized as it
145// can be, but the previous two types mentioned are much faster for
146// their use-cases.
147//
148// ## Concurrent queues
149//
3157f602
XL
150// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
151// but recv() obviously blocks. This means that under the hood there must be
152// some shared and concurrent queue holding all of the actual data.
1a4d82fc
JJ
153//
154// Two flavors of queues back the stream and shared flavors of channels. We have
155// chosen to use queues from a well-known author that are abbreviated as SPSC
156// and MPSC (single producer, single consumer and multiple producer, single
157// consumer). SPSC queues are used for streams while MPSC queues are used for
158// shared channels.
159//
160// ### SPSC optimizations
161//
162// The SPSC queue found online is essentially a linked list of nodes where one
163// half of the nodes are the "queue of data" and the other half of nodes are a
164// cache of unused nodes. The unused nodes are used such that an allocation is
165// not required on every push() and a free doesn't need to happen on every
166// pop().
167//
168// As found online, however, the cache of nodes is of an infinite size. This
169// means that if a channel at one point in its life had 50k items in the queue,
170// then the queue will always have the capacity for 50k items. I believed that
171// this was an unnecessary limitation of the implementation, so I have altered
172// the queue to optionally have a bound on the cache size.
173//
174// By default, streams will have an unbounded SPSC queue with a small-ish cache
175// size. The hope is that the cache is still large enough to have very fast
176// send() operations while not too large such that millions of channels can
177// coexist at once.
178//
179// ### MPSC optimizations
180//
181// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
182// a linked list under the hood to earn its unboundedness, but I have not put
183// forth much effort into having a cache of nodes similar to the SPSC queue.
184//
185// For now, I believe that this is "ok" because shared channels are not the most
186// common type, but soon we may wish to revisit this queue choice and determine
187// another candidate for backend storage of shared channels.
188//
189// ## Overview of the Implementation
190//
191// Now that there's a little background on the concurrent queues used, it's
192// worth going into much more detail about the channels themselves. The basic
193// pseudocode for a send/recv are:
194//
195//
196// send(t) recv()
197// queue.push(t) return if queue.pop()
198// if increment() == -1 deschedule {
199// wakeup() if decrement() > 0
200// cancel_deschedule()
201// }
202// queue.pop()
203//
204// As mentioned before, there are no locks in this implementation, only atomic
205// instructions are used.
206//
207// ### The internal atomic counter
208//
209// Every channel has a shared counter with each half to keep track of the size
210// of the queue. This counter is used to abort descheduling by the receiver and
211// to know when to wake up on the sending side.
212//
213// As seen in the pseudocode, senders will increment this count and receivers
214// will decrement the count. The theory behind this is that if a sender sees a
215// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
216// then it doesn't need to block.
217//
218// The recv() method has a beginning call to pop(), and if successful, it needs
219// to decrement the count. It is a crucial implementation detail that this
220// decrement does *not* happen to the shared counter. If this were the case,
221// then it would be possible for the counter to be very negative when there were
222// no receivers waiting, in which case the senders would have to determine when
223// it was actually appropriate to wake up a receiver.
224//
225// Instead, the "steal count" is kept track of separately (not atomically
226// because it's only used by receivers), and then the decrement() call when
227// descheduling will lump in all of the recent steals into one large decrement.
228//
229// The implication of this is that if a sender sees a -1 count, then there's
230// guaranteed to be a waiter waiting!
231//
232// ## Native Implementation
233//
234// A major goal of these channels is to work seamlessly on and off the runtime.
235// All of the previous race conditions have been worded in terms of
236// scheduler-isms (which are obviously not available without the runtime).
237//
238// For now, native usage of channels (off the runtime) will fall back onto
239// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
240// is still entirely lock-free, the "deschedule" blocks above are surrounded by
241// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
242// condition variable.
243//
244// ## Select
245//
246// Being able to support selection over channels has greatly influenced this
247// design, and not only does selection need to work inside the runtime, but also
248// outside the runtime.
249//
250// The implementation is fairly straightforward. The goal of select() is not to
251// return some data, but only to return which channel can receive data without
252// blocking. The implementation is essentially the entire blocking procedure
253// followed by an increment as soon as it's woken up. The cancellation procedure
254// involves an increment and swapping out of to_wake to acquire ownership of the
bd371182 255// thread to unblock.
1a4d82fc
JJ
256//
257// Sadly this current implementation requires multiple allocations, so I have
258// seen the throughput of select() be much worse than it should be. I do not
259// believe that there is anything fundamental that needs to change about these
260// channels, however, in order to support a more efficient select().
261//
48663c56
XL
262// FIXME: Select is now removed, so these factors are ready to be cleaned up!
263//
1a4d82fc
JJ
264// # Conclusion
265//
266// And now that you've seen all the races that I found and attempted to fix,
267// here's the code for you to find some more!
268
60c5eb7d 269use crate::cell::UnsafeCell;
532ac7d7
XL
270use crate::error;
271use crate::fmt;
272use crate::mem;
60c5eb7d 273use crate::sync::Arc;
532ac7d7 274use crate::time::{Duration, Instant};
1a4d82fc 275
1a4d82fc 276mod blocking;
60c5eb7d 277mod mpsc_queue;
1a4d82fc 278mod oneshot;
1a4d82fc 279mod shared;
60c5eb7d 280mod spsc_queue;
1a4d82fc
JJ
281mod stream;
282mod sync;
1a4d82fc 283
abe05a73
XL
284mod cache_aligned;
285
/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
/// This half can only be owned by one thread.
///
/// Messages sent to the channel can be retrieved using [`recv`].
///
/// [`recv`]: Receiver::recv
///
/// # Examples
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send("Hello world!").unwrap();
///     thread::sleep(Duration::from_secs(2)); // block for two seconds
///     send.send("Delayed for 2 seconds").unwrap();
/// });
///
/// println!("{}", recv.recv().unwrap()); // Received immediately
/// println!("Waiting...");
/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
#[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
pub struct Receiver<T> {
    // The channel flavor backing this receiver. It lives in an `UnsafeCell`
    // so that it can be reached through `&self` via the `UnsafeFlavor` trait.
    inner: UnsafeCell<Flavor<T>>,
}

// The receiver port can be sent from place to place, so long as it
// is not used to receive non-sendable things.
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send> Send for Receiver<T> {}

// Explicitly opt out of `Sync`: the flavor inside the `UnsafeCell` is accessed
// through `&self` without any locking, so a `&Receiver` must stay on one thread.
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> !Sync for Receiver<T> {}
54a0048b 325
7cac9316
XL
/// An iterator over messages on a [`Receiver`], created by [`iter`].
///
/// This iterator will block whenever [`next`] is called,
/// waiting for a new message, and [`None`] will be returned
/// when the corresponding channel has hung up.
///
/// [`iter`]: Receiver::iter
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send(1u8).unwrap();
///     send.send(2u8).unwrap();
///     send.send(3u8).unwrap();
/// });
///
/// for x in recv.iter() {
///     println!("Got: {}", x);
/// }
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(Debug)]
pub struct Iter<'a, T: 'a> {
    // Receiver borrowed for `'a`; this iterator does not own the channel half.
    rx: &'a Receiver<T>,
}
358
7cac9316
XL
/// An iterator that attempts to yield all pending values for a [`Receiver`],
/// created by [`try_iter`].
///
/// [`None`] will be returned when there are no pending values remaining or
/// if the corresponding channel has hung up.
///
/// This iterator will never block the caller in order to wait for data to
/// become available. Instead, it will return [`None`].
///
/// [`try_iter`]: Receiver::try_iter
///
/// # Examples
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (sender, receiver) = channel();
///
/// // Nothing is in the buffer yet
/// assert!(receiver.try_iter().next().is_none());
/// println!("Nothing in the buffer...");
///
/// thread::spawn(move || {
///     sender.send(1).unwrap();
///     sender.send(2).unwrap();
///     sender.send(3).unwrap();
/// });
///
/// println!("Going to sleep...");
/// thread::sleep(Duration::from_secs(2)); // block for two seconds
///
/// for x in receiver.try_iter() {
///     println!("Got: {}", x);
/// }
/// ```
#[stable(feature = "receiver_try_iter", since = "1.15.0")]
#[derive(Debug)]
pub struct TryIter<'a, T: 'a> {
    // Receiver borrowed for `'a`; this iterator does not own the channel half.
    rx: &'a Receiver<T>,
}
401
7cac9316
XL
/// An owning iterator over messages on a [`Receiver`],
/// created by **Receiver::into_iter**.
///
/// This iterator will block whenever [`next`]
/// is called, waiting for a new message, and [`None`] will be
/// returned if the corresponding channel has hung up.
///
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send(1u8).unwrap();
///     send.send(2u8).unwrap();
///     send.send(3u8).unwrap();
/// });
///
/// for x in recv.into_iter() {
///     println!("Got: {}", x);
/// }
/// ```
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
#[derive(Debug)]
pub struct IntoIter<T> {
    // Receiver owned by the iterator, so it is dropped together with it.
    rx: Receiver<T>,
}
434
/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
/// owned by one thread, but it can be cloned to send to other threads.
///
/// Messages can be sent through this channel with [`send`].
///
/// [`send`]: Sender::send
///
/// # Examples
///
/// ```rust
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (sender, receiver) = channel();
/// let sender2 = sender.clone();
///
/// // First thread owns sender
/// thread::spawn(move || {
///     sender.send(1).unwrap();
/// });
///
/// // Second thread owns sender2
/// thread::spawn(move || {
///     sender2.send(2).unwrap();
/// });
///
/// let msg = receiver.recv().unwrap();
/// let msg2 = receiver.recv().unwrap();
///
/// assert_eq!(3, msg + msg2);
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Sender<T> {
    // The channel flavor currently backing this sender. `send` and `clone`
    // swap it in place through `&self` (oneshot -> stream, oneshot/stream ->
    // shared), which is why it has to live in an `UnsafeCell`.
    inner: UnsafeCell<Flavor<T>>,
}

// The send port can be sent from place to place, so long as it
// is not used to send non-sendable things.
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send> Send for Sender<T> {}

// Explicitly opt out of `Sync`: the flavor is mutated through `&self` without
// any locking, so a `&Sender` must not be shared between threads. Clone the
// sender instead to send from several threads.
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> !Sync for Sender<T> {}
54a0048b 478
/// The sending-half of Rust's synchronous [`sync_channel`] type.
///
/// Messages can be sent through this channel with [`send`] or [`try_send`].
///
/// [`send`] will block if there is no space in the internal buffer.
///
/// [`send`]: SyncSender::send
/// [`try_send`]: SyncSender::try_send
///
/// # Examples
///
/// ```rust
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
/// // Create a sync_channel with buffer size 2
/// let (sync_sender, receiver) = sync_channel(2);
/// let sync_sender2 = sync_sender.clone();
///
/// // First thread owns sync_sender
/// thread::spawn(move || {
///     sync_sender.send(1).unwrap();
///     sync_sender.send(2).unwrap();
/// });
///
/// // Second thread owns sync_sender2
/// thread::spawn(move || {
///     sync_sender2.send(3).unwrap();
///     // thread will now block since the buffer is full
///     println!("Thread unblocked!");
/// });
///
/// let mut msg;
///
/// msg = receiver.recv().unwrap();
/// println!("message {} received", msg);
///
/// // "Thread unblocked!" will be printed now
///
/// msg = receiver.recv().unwrap();
/// println!("message {} received", msg);
///
/// msg = receiver.recv().unwrap();
///
/// println!("message {} received", msg);
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
    // Handle to the synchronous packet. `sync_channel` gives the `Receiver` a
    // clone of the same `Arc`, so both halves refer to one packet.
    inner: Arc<sync::Packet<T>>,
}

// A `SyncSender` may move to another thread as long as the values it sends
// are themselves sendable.
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send> Send for SyncSender<T> {}
85aaf69f 532
cc61c64b
XL
/// An error returned from the [`Sender::send`] or [`SyncSender::send`]
/// function on **channel**s.
///
/// A **send** operation can only fail if the receiving end of a channel is
/// disconnected, implying that the data could never be received. The error
/// contains the data being sent as a payload so it can be recovered.
///
/// The single public field (`.0`) is the value that could not be sent.
// NOTE(review): `Debug` is not derived here, unlike `RecvError`; presumably it
// is implemented by hand elsewhere in this file — confirm.
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
1a4d82fc 542
cc61c64b
XL
/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// The [`recv`] operation can only fail if the sending half of a
/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
/// messages will ever be received.
///
/// This is a unit struct: it carries no data beyond the fact that the
/// receive failed.
///
/// [`recv`]: Receiver::recv
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "rust1", since = "1.0.0")]
pub struct RecvError;
553
cc61c64b
XL
/// This enumeration is the list of the possible reasons that [`try_recv`] could
/// not return data when called. This can occur with both a [`channel`] and
/// a [`sync_channel`].
///
/// [`try_recv`]: Receiver::try_recv
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "rust1", since = "1.0.0")]
pub enum TryRecvError {
    /// This **channel** is currently empty, but the **Sender**(s) have not yet
    /// disconnected, so data may yet become available.
    #[stable(feature = "rust1", since = "1.0.0")]
    Empty,

    /// The **channel**'s sending half has become disconnected, and there will
    /// never be any more data received on it.
    #[stable(feature = "rust1", since = "1.0.0")]
    Disconnected,
}
572
cc61c64b
XL
/// This enumeration is the list of possible errors that made [`recv_timeout`]
/// unable to return data when called. This can occur with both a [`channel`] and
/// a [`sync_channel`].
///
/// [`recv_timeout`]: Receiver::recv_timeout
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
pub enum RecvTimeoutError {
    /// The timeout elapsed while this **channel** was still empty. The
    /// **Sender**(s) have not yet disconnected, so data may yet become
    /// available.
    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
    Timeout,
    /// The **channel**'s sending half has become disconnected, and there will
    /// never be any more data received on it.
    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
    Disconnected,
}
590
/// This enumeration is the list of the possible error outcomes for the
/// [`try_send`] method.
///
/// Both variants carry the value that could not be sent, so it can be
/// recovered.
///
/// [`try_send`]: SyncSender::try_send
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The data could not be sent on the [`sync_channel`] because it would require that
    /// the caller block to send the data.
    ///
    /// If this is a buffered channel, then the buffer is full at this time. If
    /// this is not a buffered channel, then there is no [`Receiver`] available to
    /// acquire the data.
    #[stable(feature = "rust1", since = "1.0.0")]
    Full(#[stable(feature = "rust1", since = "1.0.0")] T),

    /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
    /// sent. The data is returned back to the caller in this case.
    #[stable(feature = "rust1", since = "1.0.0")]
    Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
}
612
// The concrete packet implementation currently backing a channel half. Each
// variant holds an `Arc` to a packet from the corresponding submodule.
enum Flavor<T> {
    // Initial flavor of every channel created by `channel()`.
    Oneshot(Arc<oneshot::Packet<T>>),
    // Installed by `Sender::send` once a oneshot channel has been upgraded.
    Stream(Arc<stream::Packet<T>>),
    // Installed by `Sender::clone`, i.e. once there can be several senders.
    Shared(Arc<shared::Packet<T>>),
    // Flavor of receivers created by `sync_channel()`; `Sender` treats it as
    // unreachable.
    Sync(Arc<sync::Packet<T>>),
}
619
620#[doc(hidden)]
621trait UnsafeFlavor<T> {
e9174d1e 622 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
7453a54e 623 unsafe fn inner_mut(&self) -> &mut Flavor<T> {
1a4d82fc
JJ
624 &mut *self.inner_unsafe().get()
625 }
7453a54e 626 unsafe fn inner(&self) -> &Flavor<T> {
1a4d82fc
JJ
627 &*self.inner_unsafe().get()
628 }
629}
630impl<T> UnsafeFlavor<T> for Sender<T> {
e9174d1e 631 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
1a4d82fc
JJ
632 &self.inner
633 }
634}
635impl<T> UnsafeFlavor<T> for Receiver<T> {
e9174d1e 636 fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
1a4d82fc
JJ
637 &self.inner
638 }
639}
640
641/// Creates a new asynchronous channel, returning the sender/receiver halves.
cc61c64b
XL
642/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
643/// the same order as it was sent, and no [`send`] will block the calling thread
644/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
645/// block after its buffer limit is reached). [`recv`] will block until a message
646/// is available.
647///
648/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
649/// only one [`Receiver`] is supported.
1a4d82fc 650///
cc61c64b 651/// If the [`Receiver`] is disconnected while trying to [`send`] with the
8faf50e0 652/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
cc61c64b
XL
653/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
654/// return a [`RecvError`].
476ff2be 655///
3dfed10e
XL
656/// [`send`]: Sender::send
657/// [`recv`]: Receiver::recv
476ff2be 658///
c34b1796 659/// # Examples
1a4d82fc
JJ
660///
661/// ```
662/// use std::sync::mpsc::channel;
85aaf69f 663/// use std::thread;
1a4d82fc 664///
cc61c64b 665/// let (sender, receiver) = channel();
1a4d82fc
JJ
666///
667/// // Spawn off an expensive computation
85aaf69f 668/// thread::spawn(move|| {
1a4d82fc 669/// # fn expensive_computation() {}
cc61c64b 670/// sender.send(expensive_computation()).unwrap();
1a4d82fc
JJ
671/// });
672///
673/// // Do some useful work for awhile
674///
675/// // Let's see what that answer was
cc61c64b 676/// println!("{:?}", receiver.recv().unwrap());
1a4d82fc 677/// ```
85aaf69f 678#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 679pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
476ff2be 680 let a = Arc::new(oneshot::Packet::new());
1a4d82fc
JJ
681 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
682}
683
684/// Creates a new synchronous, bounded channel.
cc61c64b
XL
685/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
686/// in the same order as it was sent. Like asynchronous [`channel`]s, the
687/// [`Receiver`] will block until a message becomes available. `sync_channel`
688/// differs greatly in the semantics of the sender, however.
1a4d82fc 689///
476ff2be
SL
690/// This channel has an internal buffer on which messages will be queued.
691/// `bound` specifies the buffer size. When the internal buffer becomes full,
692/// future sends will *block* waiting for the buffer to open up. Note that a
693/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
cc61c64b
XL
694/// where each [`send`] will not return until a [`recv`] is paired with it.
695///
696/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
697/// times, but only one [`Receiver`] is supported.
1a4d82fc 698///
cc61c64b
XL
699/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
700/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
701/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
702/// to [`recv`], the [`recv`] method will return a [`RecvError`].
476ff2be 703///
3dfed10e
XL
704/// [`send`]: SyncSender::send
705/// [`recv`]: Receiver::recv
1a4d82fc 706///
c34b1796 707/// # Examples
1a4d82fc
JJ
708///
709/// ```
710/// use std::sync::mpsc::sync_channel;
85aaf69f 711/// use std::thread;
1a4d82fc 712///
cc61c64b 713/// let (sender, receiver) = sync_channel(1);
1a4d82fc
JJ
714///
715/// // this returns immediately
cc61c64b 716/// sender.send(1).unwrap();
1a4d82fc 717///
85aaf69f 718/// thread::spawn(move|| {
1a4d82fc 719/// // this will block until the previous message has been received
cc61c64b 720/// sender.send(2).unwrap();
1a4d82fc
JJ
721/// });
722///
cc61c64b
XL
723/// assert_eq!(receiver.recv().unwrap(), 1);
724/// assert_eq!(receiver.recv().unwrap(), 2);
1a4d82fc 725/// ```
85aaf69f 726#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 727pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
476ff2be 728 let a = Arc::new(sync::Packet::new(bound));
1a4d82fc
JJ
729 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
730}
731
732////////////////////////////////////////////////////////////////////////////////
733// Sender
734////////////////////////////////////////////////////////////////////////////////
735
impl<T> Sender<T> {
    // Wraps a flavor in the `UnsafeCell` through which `send` and `clone`
    // later swap the flavor in place.
    fn new(inner: Flavor<T>) -> Sender<T> {
        Sender { inner: UnsafeCell::new(inner) }
    }

    /// Attempts to send a value on this channel, returning it back if it could
    /// not be sent.
    ///
    /// A successful send occurs when it is determined that the other end of
    /// the channel has not hung up already. An unsuccessful send would be one
    /// where the corresponding receiver has already been deallocated. Note
    /// that a return value of [`Err`] means that the data will never be
    /// received, but a return value of [`Ok`] does *not* mean that the data
    /// will be received. It is possible for the corresponding receiver to
    /// hang up immediately after this function returns [`Ok`].
    ///
    /// This method will never block the current thread.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] carrying `t` when the value could not be sent,
    /// so the caller can recover it.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    ///
    /// // This send is always successful
    /// tx.send(1).unwrap();
    ///
    /// // This send will fail because the receiver is gone
    /// drop(rx);
    /// assert_eq!(tx.send(1).unwrap_err().0, 1);
    /// ```
    #[stable(feature = "rust1", since = "1.0.0")]
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        // Only the oneshot arm can fall through: it yields the new stream
        // packet to install plus the result of this send. Every other arm
        // returns directly.
        let (new_inner, ret) = match *unsafe { self.inner() } {
            Flavor::Oneshot(ref p) => {
                // NOTE(review): presumably `sent()` reports whether the single
                // oneshot slot has already been used — confirm in oneshot.rs.
                if !p.sent() {
                    return p.send(t).map_err(SendError);
                } else {
                    // Build a stream packet and ask the oneshot packet to
                    // upgrade to it, handing over a receiver for the stream.
                    let a = Arc::new(stream::Packet::new());
                    let rx = Receiver::new(Flavor::Stream(a.clone()));
                    match p.upgrade(rx) {
                        oneshot::UpSuccess => {
                            let ret = a.send(t);
                            (a, ret)
                        }
                        // The value is handed back as the error payload.
                        oneshot::UpDisconnected => (a, Err(t)),
                        oneshot::UpWoke(token) => {
                            // This send cannot panic because the thread is
                            // asleep (we're looking at it), so the receiver
                            // can't go away.
                            a.send(t).ok().unwrap();
                            // Signal the token only after the value has been
                            // pushed onto the stream packet.
                            token.signal();
                            (a, Ok(()))
                        }
                    }
                }
            }
            Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
            Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
            // No code visible here ever builds a `Sender` around the `Sync`
            // flavor; `sync_channel` returns a `SyncSender` instead.
            Flavor::Sync(..) => unreachable!(),
        };

        unsafe {
            // Install the stream flavor in place. After the swap `tmp` owns
            // the previous (oneshot) flavor and is dropped at the end of this
            // block, which runs `Sender`'s `Drop` impl on the old flavor.
            let tmp = Sender::new(Flavor::Stream(new_inner));
            mem::swap(self.inner_mut(), tmp.inner_mut());
        }
        ret.map_err(SendError)
    }
}
806
85aaf69f 807#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 808impl<T> Clone for Sender<T> {
1a4d82fc 809 fn clone(&self) -> Sender<T> {
476ff2be 810 let packet = match *unsafe { self.inner() } {
1a4d82fc 811 Flavor::Oneshot(ref p) => {
476ff2be
SL
812 let a = Arc::new(shared::Packet::new());
813 {
814 let guard = a.postinit_lock();
1a4d82fc 815 let rx = Receiver::new(Flavor::Shared(a.clone()));
476ff2be 816 let sleeper = match p.upgrade(rx) {
60c5eb7d 817 oneshot::UpSuccess | oneshot::UpDisconnected => None,
476ff2be
SL
818 oneshot::UpWoke(task) => Some(task),
819 };
820 a.inherit_blocker(sleeper, guard);
1a4d82fc 821 }
476ff2be 822 a
1a4d82fc
JJ
823 }
824 Flavor::Stream(ref p) => {
476ff2be
SL
825 let a = Arc::new(shared::Packet::new());
826 {
827 let guard = a.postinit_lock();
1a4d82fc 828 let rx = Receiver::new(Flavor::Shared(a.clone()));
476ff2be 829 let sleeper = match p.upgrade(rx) {
60c5eb7d 830 stream::UpSuccess | stream::UpDisconnected => None,
476ff2be
SL
831 stream::UpWoke(task) => Some(task),
832 };
833 a.inherit_blocker(sleeper, guard);
1a4d82fc 834 }
476ff2be 835 a
1a4d82fc
JJ
836 }
837 Flavor::Shared(ref p) => {
476ff2be 838 p.clone_chan();
1a4d82fc
JJ
839 return Sender::new(Flavor::Shared(p.clone()));
840 }
841 Flavor::Sync(..) => unreachable!(),
842 };
843
844 unsafe {
1a4d82fc
JJ
845 let tmp = Sender::new(Flavor::Shared(packet.clone()));
846 mem::swap(self.inner_mut(), tmp.inner_mut());
847 }
848 Sender::new(Flavor::Shared(packet))
849 }
850}
851
85aaf69f 852#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 853impl<T> Drop for Sender<T> {
1a4d82fc 854 fn drop(&mut self) {
476ff2be
SL
855 match *unsafe { self.inner() } {
856 Flavor::Oneshot(ref p) => p.drop_chan(),
857 Flavor::Stream(ref p) => p.drop_chan(),
858 Flavor::Shared(ref p) => p.drop_chan(),
1a4d82fc
JJ
859 Flavor::Sync(..) => unreachable!(),
860 }
861 }
862}
863
7cac9316 864#[stable(feature = "mpsc_debug", since = "1.8.0")]
7453a54e 865impl<T> fmt::Debug for Sender<T> {
532ac7d7 866 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
abe05a73 867 f.debug_struct("Sender").finish()
7453a54e
SL
868 }
869}
870
1a4d82fc
JJ
871////////////////////////////////////////////////////////////////////////////////
872// SyncSender
873////////////////////////////////////////////////////////////////////////////////
874
c34b1796 875impl<T> SyncSender<T> {
476ff2be 876 fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
a1dfa0c6 877 SyncSender { inner }
1a4d82fc
JJ
878 }
879
880 /// Sends a value on this synchronous channel.
881 ///
882 /// This function will *block* until space in the internal buffer becomes
883 /// available or a receiver is available to hand off the message to.
884 ///
885 /// Note that a successful send does *not* guarantee that the receiver will
886 /// ever see the data if there is a buffer on this channel. Items may be
887 /// enqueued in the internal buffer for the receiver to receive at a later
7cac9316
XL
888 /// time. If the buffer size is 0, however, the channel becomes a rendezvous
889 /// channel and it guarantees that the receiver has indeed received
890 /// the data if this function returns success.
1a4d82fc 891 ///
cc61c64b
XL
892 /// This function will never panic, but it may return [`Err`] if the
893 /// [`Receiver`] has disconnected and is no longer able to receive
1a4d82fc 894 /// information.
cc61c64b 895 ///
7cac9316
XL
896 /// # Examples
897 ///
898 /// ```rust
899 /// use std::sync::mpsc::sync_channel;
900 /// use std::thread;
901 ///
902 /// // Create a rendezvous sync_channel with buffer size 0
903 /// let (sync_sender, receiver) = sync_channel(0);
904 ///
905 /// thread::spawn(move || {
906 /// println!("sending message...");
907 /// sync_sender.send(1).unwrap();
908 /// // Thread is now blocked until the message is received
909 ///
910 /// println!("...message received!");
911 /// });
912 ///
913 /// let msg = receiver.recv().unwrap();
914 /// assert_eq!(1, msg);
915 /// ```
85aaf69f 916 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 917 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
476ff2be 918 self.inner.send(t).map_err(SendError)
1a4d82fc
JJ
919 }
920
921 /// Attempts to send a value on this channel without blocking.
922 ///
cc61c64b 923 /// This method differs from [`send`] by returning immediately if the
1a4d82fc 924 /// channel's buffer is full or no receiver is waiting to acquire some
cc61c64b 925 /// data. Compared with [`send`], this function has two failure cases
1a4d82fc
JJ
926 /// instead of one (one for disconnection, one for a full buffer).
927 ///
7cac9316 928 /// See [`send`] for notes about guarantees of whether the
1a4d82fc 929 /// receiver has received the data or not if this function is successful.
cc61c64b 930 ///
3dfed10e 931 /// [`send`]: Self::send
7cac9316
XL
932 ///
933 /// # Examples
934 ///
935 /// ```rust
936 /// use std::sync::mpsc::sync_channel;
937 /// use std::thread;
938 ///
939 /// // Create a sync_channel with buffer size 1
940 /// let (sync_sender, receiver) = sync_channel(1);
941 /// let sync_sender2 = sync_sender.clone();
942 ///
943 /// // First thread owns sync_sender
944 /// thread::spawn(move || {
945 /// sync_sender.send(1).unwrap();
946 /// sync_sender.send(2).unwrap();
947 /// // Thread blocked
948 /// });
949 ///
950 /// // Second thread owns sync_sender2
951 /// thread::spawn(move || {
952 /// // This will return an error and send
953 /// // no message if the buffer is full
48663c56 954 /// let _ = sync_sender2.try_send(3);
7cac9316
XL
955 /// });
956 ///
957 /// let mut msg;
958 /// msg = receiver.recv().unwrap();
959 /// println!("message {} received", msg);
960 ///
961 /// msg = receiver.recv().unwrap();
962 /// println!("message {} received", msg);
963 ///
964 /// // Third message may have never been sent
965 /// match receiver.try_recv() {
966 /// Ok(msg) => println!("message {} received", msg),
967 /// Err(_) => println!("the third message was never sent"),
968 /// }
969 /// ```
85aaf69f 970 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc 971 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
476ff2be 972 self.inner.try_send(t)
1a4d82fc
JJ
973 }
974}
975
85aaf69f 976#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 977impl<T> Clone for SyncSender<T> {
1a4d82fc 978 fn clone(&self) -> SyncSender<T> {
476ff2be 979 self.inner.clone_chan();
e9174d1e 980 SyncSender::new(self.inner.clone())
1a4d82fc
JJ
981 }
982}
983
85aaf69f 984#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 985impl<T> Drop for SyncSender<T> {
1a4d82fc 986 fn drop(&mut self) {
476ff2be 987 self.inner.drop_chan();
1a4d82fc
JJ
988 }
989}
990
7cac9316 991#[stable(feature = "mpsc_debug", since = "1.8.0")]
7453a54e 992impl<T> fmt::Debug for SyncSender<T> {
532ac7d7 993 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
abe05a73 994 f.debug_struct("SyncSender").finish()
7453a54e
SL
995 }
996}
997
1a4d82fc
JJ
998////////////////////////////////////////////////////////////////////////////////
999// Receiver
1000////////////////////////////////////////////////////////////////////////////////
1001
c34b1796 1002impl<T> Receiver<T> {
1a4d82fc
JJ
1003 fn new(inner: Flavor<T>) -> Receiver<T> {
1004 Receiver { inner: UnsafeCell::new(inner) }
1005 }
1006
7cac9316 1007 /// Attempts to return a pending value on this receiver without blocking.
1a4d82fc
JJ
1008 ///
1009 /// This method will never block the caller in order to wait for data to
1010 /// become available. Instead, this will always return immediately with a
1011 /// possible option of pending data on the channel.
1012 ///
1013 /// This is useful for a flavor of "optimistic check" before deciding to
1014 /// block on a receiver.
7cac9316
XL
1015 ///
1016 /// Compared with [`recv`], this function has two failure cases instead of one
1017 /// (one for disconnection, one for an empty buffer).
1018 ///
3dfed10e 1019 /// [`recv`]: Self::recv
7cac9316
XL
1020 ///
1021 /// # Examples
1022 ///
1023 /// ```rust
1024 /// use std::sync::mpsc::{Receiver, channel};
1025 ///
1026 /// let (_, receiver): (_, Receiver<i32>) = channel();
1027 ///
1028 /// assert!(receiver.try_recv().is_err());
1029 /// ```
85aaf69f 1030 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
1031 pub fn try_recv(&self) -> Result<T, TryRecvError> {
1032 loop {
1033 let new_port = match *unsafe { self.inner() } {
60c5eb7d
XL
1034 Flavor::Oneshot(ref p) => match p.try_recv() {
1035 Ok(t) => return Ok(t),
1036 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
1037 Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
1038 Err(oneshot::Upgraded(rx)) => rx,
1039 },
1040 Flavor::Stream(ref p) => match p.try_recv() {
1041 Ok(t) => return Ok(t),
1042 Err(stream::Empty) => return Err(TryRecvError::Empty),
1043 Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
1044 Err(stream::Upgraded(rx)) => rx,
1045 },
1046 Flavor::Shared(ref p) => match p.try_recv() {
1047 Ok(t) => return Ok(t),
1048 Err(shared::Empty) => return Err(TryRecvError::Empty),
1049 Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
1050 },
1051 Flavor::Sync(ref p) => match p.try_recv() {
1052 Ok(t) => return Ok(t),
1053 Err(sync::Empty) => return Err(TryRecvError::Empty),
1054 Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
1055 },
1a4d82fc
JJ
1056 };
1057 unsafe {
60c5eb7d 1058 mem::swap(self.inner_mut(), new_port.inner_mut());
1a4d82fc
JJ
1059 }
1060 }
1061 }
1062
9346a6ac 1063 /// Attempts to wait for a value on this receiver, returning an error if the
1a4d82fc
JJ
1064 /// corresponding channel has hung up.
1065 ///
1066 /// This function will always block the current thread if there is no data
1067 /// available and it's possible for more data to be sent. Once a message is
dfeec247 1068 /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
7cac9316 1069 /// receiver will wake up and return that message.
1a4d82fc 1070 ///
cc61c64b
XL
1071 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1072 /// this call is blocking, this call will wake up and return [`Err`] to
1a4d82fc 1073 /// indicate that no more messages can ever be received on this channel.
c1a9b12d
SL
1074 /// However, since channels are buffered, messages sent before the disconnect
1075 /// will still be properly received.
1076 ///
1077 /// # Examples
1078 ///
1079 /// ```
1080 /// use std::sync::mpsc;
1081 /// use std::thread;
1082 ///
1083 /// let (send, recv) = mpsc::channel();
1084 /// let handle = thread::spawn(move || {
1085 /// send.send(1u8).unwrap();
1086 /// });
1087 ///
1088 /// handle.join().unwrap();
1089 ///
1090 /// assert_eq!(Ok(1), recv.recv());
1091 /// ```
1092 ///
1093 /// Buffering behavior:
1094 ///
1095 /// ```
1096 /// use std::sync::mpsc;
1097 /// use std::thread;
1098 /// use std::sync::mpsc::RecvError;
1099 ///
1100 /// let (send, recv) = mpsc::channel();
1101 /// let handle = thread::spawn(move || {
1102 /// send.send(1u8).unwrap();
1103 /// send.send(2).unwrap();
1104 /// send.send(3).unwrap();
1105 /// drop(send);
1106 /// });
1107 ///
1108 /// // wait for the thread to join so we ensure the sender is dropped
1109 /// handle.join().unwrap();
1110 ///
1111 /// assert_eq!(Ok(1), recv.recv());
1112 /// assert_eq!(Ok(2), recv.recv());
1113 /// assert_eq!(Ok(3), recv.recv());
1114 /// assert_eq!(Err(RecvError), recv.recv());
1115 /// ```
85aaf69f 1116 #[stable(feature = "rust1", since = "1.0.0")]
1a4d82fc
JJ
1117 pub fn recv(&self) -> Result<T, RecvError> {
1118 loop {
1119 let new_port = match *unsafe { self.inner() } {
60c5eb7d
XL
1120 Flavor::Oneshot(ref p) => match p.recv(None) {
1121 Ok(t) => return Ok(t),
1122 Err(oneshot::Disconnected) => return Err(RecvError),
1123 Err(oneshot::Upgraded(rx)) => rx,
1124 Err(oneshot::Empty) => unreachable!(),
1125 },
1126 Flavor::Stream(ref p) => match p.recv(None) {
1127 Ok(t) => return Ok(t),
1128 Err(stream::Disconnected) => return Err(RecvError),
1129 Err(stream::Upgraded(rx)) => rx,
1130 Err(stream::Empty) => unreachable!(),
1131 },
1132 Flavor::Shared(ref p) => match p.recv(None) {
1133 Ok(t) => return Ok(t),
1134 Err(shared::Disconnected) => return Err(RecvError),
1135 Err(shared::Empty) => unreachable!(),
1136 },
476ff2be 1137 Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
1a4d82fc
JJ
1138 };
1139 unsafe {
1140 mem::swap(self.inner_mut(), new_port.inner_mut());
1141 }
1142 }
1143 }
1144
3157f602
XL
1145 /// Attempts to wait for a value on this receiver, returning an error if the
1146 /// corresponding channel has hung up, or if it waits more than `timeout`.
1147 ///
1148 /// This function will always block the current thread if there is no data
1149 /// available and it's possible for more data to be sent. Once a message is
dfeec247 1150 /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
7cac9316 1151 /// receiver will wake up and return that message.
3157f602 1152 ///
cc61c64b
XL
1153 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1154 /// this call is blocking, this call will wake up and return [`Err`] to
3157f602
XL
1155 /// indicate that no more messages can ever be received on this channel.
1156 /// However, since channels are buffered, messages sent before the disconnect
1157 /// will still be properly received.
1158 ///
b7449926
XL
1159 /// # Known Issues
1160 ///
1161 /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
1162 /// to panic unexpectedly with the following example:
1163 ///
1164 /// ```no_run
1165 /// use std::sync::mpsc::channel;
1166 /// use std::thread;
1167 /// use std::time::Duration;
1168 ///
1169 /// let (tx, rx) = channel::<String>();
1170 ///
1171 /// thread::spawn(move || {
1172 /// let d = Duration::from_millis(10);
1173 /// loop {
1174 /// println!("recv");
1175 /// let _r = rx.recv_timeout(d);
1176 /// }
1177 /// });
1178 ///
1179 /// thread::sleep(Duration::from_millis(100));
1180 /// let _c1 = tx.clone();
1181 ///
1182 /// thread::sleep(Duration::from_secs(1));
1183 /// ```
1184 ///
1185 /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
1186 ///
3157f602
XL
1187 /// # Examples
1188 ///
7cac9316
XL
1189 /// Successfully receiving value before encountering timeout:
1190 ///
3157f602 1191 /// ```no_run
7cac9316
XL
1192 /// use std::thread;
1193 /// use std::time::Duration;
1194 /// use std::sync::mpsc;
1195 ///
1196 /// let (send, recv) = mpsc::channel();
1197 ///
1198 /// thread::spawn(move || {
1199 /// send.send('a').unwrap();
1200 /// });
1201 ///
1202 /// assert_eq!(
1203 /// recv.recv_timeout(Duration::from_millis(400)),
1204 /// Ok('a')
1205 /// );
1206 /// ```
1207 ///
1208 /// Receiving an error upon reaching timeout:
1209 ///
1210 /// ```no_run
1211 /// use std::thread;
3157f602 1212 /// use std::time::Duration;
7cac9316 1213 /// use std::sync::mpsc;
3157f602 1214 ///
7cac9316 1215 /// let (send, recv) = mpsc::channel();
3157f602 1216 ///
7cac9316
XL
1217 /// thread::spawn(move || {
1218 /// thread::sleep(Duration::from_millis(800));
1219 /// send.send('a').unwrap();
1220 /// });
1221 ///
1222 /// assert_eq!(
1223 /// recv.recv_timeout(Duration::from_millis(400)),
1224 /// Err(mpsc::RecvTimeoutError::Timeout)
1225 /// );
3157f602 1226 /// ```
5bcae85e 1227 #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
3157f602
XL
1228 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1229 // Do an optimistic try_recv to avoid the performance impact of
1230 // Instant::now() in the full-channel case.
1231 match self.try_recv() {
0731742a
XL
1232 Ok(result) => Ok(result),
1233 Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
1234 Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
1235 Some(deadline) => self.recv_deadline(deadline),
1236 // So far in the future that it's practically the same as waiting indefinitely.
1237 None => self.recv().map_err(RecvTimeoutError::from),
1238 },
3157f602
XL
1239 }
1240 }
1241
ff7c6d11
XL
1242 /// Attempts to wait for a value on this receiver, returning an error if the
1243 /// corresponding channel has hung up, or if `deadline` is reached.
1244 ///
1245 /// This function will always block the current thread if there is no data
1246 /// available and it's possible for more data to be sent. Once a message is
dfeec247 1247 /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
ff7c6d11
XL
1248 /// receiver will wake up and return that message.
1249 ///
1250 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1251 /// this call is blocking, this call will wake up and return [`Err`] to
1252 /// indicate that no more messages can ever be received on this channel.
1253 /// However, since channels are buffered, messages sent before the disconnect
1254 /// will still be properly received.
1255 ///
ff7c6d11
XL
1256 /// # Examples
1257 ///
1258 /// Successfully receiving value before reaching deadline:
1259 ///
1260 /// ```no_run
1261 /// #![feature(deadline_api)]
1262 /// use std::thread;
1263 /// use std::time::{Duration, Instant};
1264 /// use std::sync::mpsc;
1265 ///
1266 /// let (send, recv) = mpsc::channel();
1267 ///
1268 /// thread::spawn(move || {
1269 /// send.send('a').unwrap();
1270 /// });
1271 ///
1272 /// assert_eq!(
1273 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1274 /// Ok('a')
1275 /// );
1276 /// ```
1277 ///
1278 /// Receiving an error upon reaching deadline:
1279 ///
1280 /// ```no_run
1281 /// #![feature(deadline_api)]
1282 /// use std::thread;
1283 /// use std::time::{Duration, Instant};
1284 /// use std::sync::mpsc;
1285 ///
1286 /// let (send, recv) = mpsc::channel();
1287 ///
1288 /// thread::spawn(move || {
1289 /// thread::sleep(Duration::from_millis(800));
1290 /// send.send('a').unwrap();
1291 /// });
1292 ///
1293 /// assert_eq!(
1294 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1295 /// Err(mpsc::RecvTimeoutError::Timeout)
1296 /// );
1297 /// ```
1298 #[unstable(feature = "deadline_api", issue = "46316")]
1299 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
3157f602
XL
1300 use self::RecvTimeoutError::*;
1301
1302 loop {
1303 let port_or_empty = match *unsafe { self.inner() } {
60c5eb7d
XL
1304 Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
1305 Ok(t) => return Ok(t),
1306 Err(oneshot::Disconnected) => return Err(Disconnected),
1307 Err(oneshot::Upgraded(rx)) => Some(rx),
1308 Err(oneshot::Empty) => None,
1309 },
1310 Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
1311 Ok(t) => return Ok(t),
1312 Err(stream::Disconnected) => return Err(Disconnected),
1313 Err(stream::Upgraded(rx)) => Some(rx),
1314 Err(stream::Empty) => None,
1315 },
1316 Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
1317 Ok(t) => return Ok(t),
1318 Err(shared::Disconnected) => return Err(Disconnected),
1319 Err(shared::Empty) => None,
1320 },
1321 Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
1322 Ok(t) => return Ok(t),
1323 Err(sync::Disconnected) => return Err(Disconnected),
1324 Err(sync::Empty) => None,
1325 },
3157f602
XL
1326 };
1327
1328 if let Some(new_port) = port_or_empty {
1329 unsafe {
1330 mem::swap(self.inner_mut(), new_port.inner_mut());
1331 }
1332 }
1333
1334 // If we're already passed the deadline, and we're here without
1335 // data, return a timeout, else try again.
1336 if Instant::now() >= deadline {
1337 return Err(Timeout);
1338 }
1339 }
1340 }
1341
1a4d82fc 1342 /// Returns an iterator that will block waiting for messages, but never
cc61c64b
XL
1343 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1344 ///
cc61c64b
XL
1345 /// # Examples
1346 ///
1347 /// ```rust
1348 /// use std::sync::mpsc::channel;
1349 /// use std::thread;
1350 ///
1351 /// let (send, recv) = channel();
1352 ///
1353 /// thread::spawn(move || {
7cac9316
XL
1354 /// send.send(1).unwrap();
1355 /// send.send(2).unwrap();
1356 /// send.send(3).unwrap();
cc61c64b
XL
1357 /// });
1358 ///
7cac9316
XL
1359 /// let mut iter = recv.iter();
1360 /// assert_eq!(iter.next(), Some(1));
1361 /// assert_eq!(iter.next(), Some(2));
1362 /// assert_eq!(iter.next(), Some(3));
1363 /// assert_eq!(iter.next(), None);
cc61c64b 1364 /// ```
85aaf69f 1365 #[stable(feature = "rust1", since = "1.0.0")]
532ac7d7 1366 pub fn iter(&self) -> Iter<'_, T> {
1a4d82fc
JJ
1367 Iter { rx: self }
1368 }
5bcae85e
SL
1369
1370 /// Returns an iterator that will attempt to yield all pending values.
1371 /// It will return `None` if there are no more pending values or if the
cc61c64b 1372 /// channel has hung up. The iterator will never [`panic!`] or block the
5bcae85e 1373 /// user by waiting for values.
cc61c64b 1374 ///
7cac9316
XL
1375 /// # Examples
1376 ///
1377 /// ```no_run
1378 /// use std::sync::mpsc::channel;
1379 /// use std::thread;
1380 /// use std::time::Duration;
1381 ///
1382 /// let (sender, receiver) = channel();
1383 ///
1384 /// // nothing is in the buffer yet
1385 /// assert!(receiver.try_iter().next().is_none());
1386 ///
1387 /// thread::spawn(move || {
1388 /// thread::sleep(Duration::from_secs(1));
1389 /// sender.send(1).unwrap();
1390 /// sender.send(2).unwrap();
1391 /// sender.send(3).unwrap();
1392 /// });
1393 ///
1394 /// // nothing is in the buffer yet
1395 /// assert!(receiver.try_iter().next().is_none());
1396 ///
1397 /// // block for two seconds
1398 /// thread::sleep(Duration::from_secs(2));
1399 ///
1400 /// let mut iter = receiver.try_iter();
1401 /// assert_eq!(iter.next(), Some(1));
1402 /// assert_eq!(iter.next(), Some(2));
1403 /// assert_eq!(iter.next(), Some(3));
1404 /// assert_eq!(iter.next(), None);
1405 /// ```
476ff2be 1406 #[stable(feature = "receiver_try_iter", since = "1.15.0")]
532ac7d7 1407 pub fn try_iter(&self) -> TryIter<'_, T> {
5bcae85e
SL
1408 TryIter { rx: self }
1409 }
1a4d82fc
JJ
1410}
1411
85aaf69f 1412#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 1413impl<'a, T> Iterator for Iter<'a, T> {
1a4d82fc
JJ
1414 type Item = T;
1415
60c5eb7d
XL
1416 fn next(&mut self) -> Option<T> {
1417 self.rx.recv().ok()
1418 }
1a4d82fc
JJ
1419}
1420
476ff2be 1421#[stable(feature = "receiver_try_iter", since = "1.15.0")]
5bcae85e
SL
1422impl<'a, T> Iterator for TryIter<'a, T> {
1423 type Item = T;
1424
60c5eb7d
XL
1425 fn next(&mut self) -> Option<T> {
1426 self.rx.try_recv().ok()
1427 }
5bcae85e
SL
1428}
1429
d9579d0f
AL
1430#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1431impl<'a, T> IntoIterator for &'a Receiver<T> {
1432 type Item = T;
1433 type IntoIter = Iter<'a, T>;
1434
60c5eb7d
XL
1435 fn into_iter(self) -> Iter<'a, T> {
1436 self.iter()
1437 }
d9579d0f
AL
1438}
1439
92a42be0 1440#[stable(feature = "receiver_into_iter", since = "1.1.0")]
d9579d0f
AL
1441impl<T> Iterator for IntoIter<T> {
1442 type Item = T;
60c5eb7d
XL
1443 fn next(&mut self) -> Option<T> {
1444 self.rx.recv().ok()
1445 }
d9579d0f
AL
1446}
1447
1448#[stable(feature = "receiver_into_iter", since = "1.1.0")]
60c5eb7d 1449impl<T> IntoIterator for Receiver<T> {
d9579d0f
AL
1450 type Item = T;
1451 type IntoIter = IntoIter<T>;
1452
1453 fn into_iter(self) -> IntoIter<T> {
1454 IntoIter { rx: self }
1455 }
1456}
1457
85aaf69f 1458#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 1459impl<T> Drop for Receiver<T> {
1a4d82fc 1460 fn drop(&mut self) {
476ff2be
SL
1461 match *unsafe { self.inner() } {
1462 Flavor::Oneshot(ref p) => p.drop_port(),
1463 Flavor::Stream(ref p) => p.drop_port(),
1464 Flavor::Shared(ref p) => p.drop_port(),
1465 Flavor::Sync(ref p) => p.drop_port(),
1a4d82fc
JJ
1466 }
1467 }
1468}
1469
7cac9316 1470#[stable(feature = "mpsc_debug", since = "1.8.0")]
7453a54e 1471impl<T> fmt::Debug for Receiver<T> {
532ac7d7 1472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
abe05a73 1473 f.debug_struct("Receiver").finish()
7453a54e
SL
1474 }
1475}
1476
85aaf69f
SL
1477#[stable(feature = "rust1", since = "1.0.0")]
1478impl<T> fmt::Debug for SendError<T> {
532ac7d7 1479 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85aaf69f 1480 "SendError(..)".fmt(f)
1a4d82fc 1481 }
85aaf69f 1482}
1a4d82fc 1483
85aaf69f
SL
1484#[stable(feature = "rust1", since = "1.0.0")]
1485impl<T> fmt::Display for SendError<T> {
532ac7d7 1486 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85aaf69f 1487 "sending on a closed channel".fmt(f)
1a4d82fc 1488 }
1a4d82fc
JJ
1489}
1490
c34b1796 1491#[stable(feature = "rust1", since = "1.0.0")]
9e0c209e 1492impl<T: Send> error::Error for SendError<T> {
dfeec247 1493 #[allow(deprecated)]
c34b1796
AL
1494 fn description(&self) -> &str {
1495 "sending on a closed channel"
1496 }
c34b1796
AL
1497}
1498
85aaf69f
SL
1499#[stable(feature = "rust1", since = "1.0.0")]
1500impl<T> fmt::Debug for TrySendError<T> {
532ac7d7 1501 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85aaf69f
SL
1502 match *self {
1503 TrySendError::Full(..) => "Full(..)".fmt(f),
1504 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1505 }
1a4d82fc
JJ
1506 }
1507}
1508
85aaf69f
SL
1509#[stable(feature = "rust1", since = "1.0.0")]
1510impl<T> fmt::Display for TrySendError<T> {
532ac7d7 1511 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1a4d82fc 1512 match *self {
60c5eb7d
XL
1513 TrySendError::Full(..) => "sending on a full channel".fmt(f),
1514 TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1a4d82fc
JJ
1515 }
1516 }
1517}
1518
c34b1796 1519#[stable(feature = "rust1", since = "1.0.0")]
9e0c209e 1520impl<T: Send> error::Error for TrySendError<T> {
dfeec247 1521 #[allow(deprecated)]
c34b1796
AL
1522 fn description(&self) -> &str {
1523 match *self {
60c5eb7d
XL
1524 TrySendError::Full(..) => "sending on a full channel",
1525 TrySendError::Disconnected(..) => "sending on a closed channel",
c34b1796
AL
1526 }
1527 }
c34b1796
AL
1528}
1529
ff7c6d11
XL
1530#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1531impl<T> From<SendError<T>> for TrySendError<T> {
1b1a35ee
XL
1532 /// Converts a `SendError<T>` into a `TrySendError<T>`.
1533 ///
1534 /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError<T>`.
1535 ///
1536 /// No data is allocated on the heap.
ff7c6d11
XL
1537 fn from(err: SendError<T>) -> TrySendError<T> {
1538 match err {
1539 SendError(t) => TrySendError::Disconnected(t),
1540 }
1541 }
1542}
1543
85aaf69f
SL
1544#[stable(feature = "rust1", since = "1.0.0")]
1545impl fmt::Display for RecvError {
532ac7d7 1546 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1a4d82fc
JJ
1547 "receiving on a closed channel".fmt(f)
1548 }
1549}
1550
c34b1796
AL
1551#[stable(feature = "rust1", since = "1.0.0")]
1552impl error::Error for RecvError {
dfeec247 1553 #[allow(deprecated)]
c34b1796
AL
1554 fn description(&self) -> &str {
1555 "receiving on a closed channel"
1556 }
c34b1796
AL
1557}
1558
85aaf69f
SL
1559#[stable(feature = "rust1", since = "1.0.0")]
1560impl fmt::Display for TryRecvError {
532ac7d7 1561 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1a4d82fc 1562 match *self {
60c5eb7d
XL
1563 TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1564 TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1a4d82fc
JJ
1565 }
1566 }
1567}
1568
c34b1796
AL
1569#[stable(feature = "rust1", since = "1.0.0")]
1570impl error::Error for TryRecvError {
dfeec247 1571 #[allow(deprecated)]
c34b1796
AL
1572 fn description(&self) -> &str {
1573 match *self {
60c5eb7d
XL
1574 TryRecvError::Empty => "receiving on an empty channel",
1575 TryRecvError::Disconnected => "receiving on a closed channel",
c34b1796
AL
1576 }
1577 }
c34b1796
AL
1578}
1579
ff7c6d11
XL
1580#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1581impl From<RecvError> for TryRecvError {
1b1a35ee
XL
1582 /// Converts a `RecvError` into a `TryRecvError`.
1583 ///
1584 /// This conversion always returns `TryRecvError::Disconnected`.
1585 ///
1586 /// No data is allocated on the heap.
ff7c6d11
XL
1587 fn from(err: RecvError) -> TryRecvError {
1588 match err {
1589 RecvError => TryRecvError::Disconnected,
1590 }
1591 }
1592}
1593
7cac9316 1594#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
476ff2be 1595impl fmt::Display for RecvTimeoutError {
532ac7d7 1596 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
476ff2be 1597 match *self {
60c5eb7d
XL
1598 RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1599 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
476ff2be
SL
1600 }
1601 }
1602}
1603
7cac9316 1604#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
476ff2be 1605impl error::Error for RecvTimeoutError {
dfeec247 1606 #[allow(deprecated)]
476ff2be
SL
1607 fn description(&self) -> &str {
1608 match *self {
60c5eb7d
XL
1609 RecvTimeoutError::Timeout => "timed out waiting on channel",
1610 RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
476ff2be
SL
1611 }
1612 }
476ff2be
SL
1613}
1614
ff7c6d11
XL
1615#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1616impl From<RecvError> for RecvTimeoutError {
1b1a35ee
XL
1617 /// Converts a `RecvError` into a `RecvTimeoutError`.
1618 ///
1619 /// This conversion always returns `RecvTimeoutError::Disconnected`.
1620 ///
1621 /// No data is allocated on the heap.
ff7c6d11
XL
1622 fn from(err: RecvError) -> RecvTimeoutError {
1623 match err {
1624 RecvError => RecvTimeoutError::Disconnected,
1625 }
1626 }
1627}