]>
Commit | Line | Data |
---|---|---|
85aaf69f | 1 | //! Multi-producer, single-consumer FIFO queue communication primitives. |
1a4d82fc JJ |
2 | //! |
3 | //! This module provides message-based communication over channels, concretely | |
4 | //! defined among three types: | |
5 | //! | |
cc61c64b XL |
6 | //! * [`Sender`] |
7 | //! * [`SyncSender`] | |
8 | //! * [`Receiver`] | |
1a4d82fc | 9 | //! |
cc61c64b | 10 | //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both |
1a4d82fc JJ |
11 | //! senders are clone-able (multi-producer) such that many threads can send |
12 | //! simultaneously to one receiver (single-consumer). | |
13 | //! | |
14 | //! These channels come in two flavors: | |
15 | //! | |
cc61c64b | 16 | //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function |
1a4d82fc JJ |
17 | //! will return a `(Sender, Receiver)` tuple where all sends will be |
18 | //! **asynchronous** (they never block). The channel conceptually has an | |
19 | //! infinite buffer. | |
20 | //! | |
cc61c64b XL |
21 | //! 2. A synchronous, bounded channel. The [`sync_channel`] function will |
22 | //! return a `(SyncSender, Receiver)` tuple where the storage for pending | |
23 | //! messages is a pre-allocated buffer of a fixed size. All sends will be | |
1a4d82fc | 24 | //! **synchronous** by blocking until there is buffer space available. Note |
cc61c64b XL |
25 | //! that a bound of 0 is allowed, causing the channel to become a "rendezvous" |
26 | //! channel where each sender atomically hands off a message to a receiver. | |
27 | //! | |
3dfed10e | 28 | //! [`send`]: Sender::send |
1a4d82fc JJ |
29 | //! |
30 | //! ## Disconnection | |
31 | //! | |
cc61c64b | 32 | //! The send and receive operations on channels will all return a [`Result`] |
1a4d82fc JJ |
33 | //! indicating whether the operation succeeded or not. An unsuccessful operation |
34 | //! is normally indicative of the other half of a channel having "hung up" by | |
35 | //! being dropped in its corresponding thread. | |
36 | //! | |
37 | //! Once half of a channel has been deallocated, most operations can no longer | |
cc61c64b XL |
38 | //! continue to make progress, so [`Err`] will be returned. Many applications |
39 | //! will continue to [`unwrap`] the results returned from this module, | |
40 | //! instigating a propagation of failure among threads if one unexpectedly dies. | |
41 | //! | |
3dfed10e | 42 | //! [`unwrap`]: Result::unwrap |
1a4d82fc JJ |
43 | //! |
44 | //! # Examples | |
45 | //! | |
46 | //! Simple usage: | |
47 | //! | |
48 | //! ``` | |
85aaf69f | 49 | //! use std::thread; |
1a4d82fc JJ |
50 | //! use std::sync::mpsc::channel; |
51 | //! | |
52 | //! // Create a simple streaming channel | |
53 | //! let (tx, rx) = channel(); | |
85aaf69f SL |
54 | //! thread::spawn(move|| { |
55 | //! tx.send(10).unwrap(); | |
1a4d82fc | 56 | //! }); |
85aaf69f | 57 | //! assert_eq!(rx.recv().unwrap(), 10); |
1a4d82fc JJ |
58 | //! ``` |
59 | //! | |
60 | //! Shared usage: | |
61 | //! | |
62 | //! ``` | |
85aaf69f | 63 | //! use std::thread; |
1a4d82fc JJ |
64 | //! use std::sync::mpsc::channel; |
65 | //! | |
66 | //! // Create a shared channel that can be sent along from many threads | |
67 | //! // where tx is the sending half (tx for transmission), and rx is the receiving | |
68 | //! // half (rx for receiving). | |
69 | //! let (tx, rx) = channel(); | |
85aaf69f | 70 | //! for i in 0..10 { |
1a4d82fc | 71 | //! let tx = tx.clone(); |
85aaf69f | 72 | //! thread::spawn(move|| { |
1a4d82fc JJ |
73 | //! tx.send(i).unwrap(); |
74 | //! }); | |
75 | //! } | |
76 | //! | |
85aaf69f | 77 | //! for _ in 0..10 { |
1a4d82fc JJ |
78 | //! let j = rx.recv().unwrap(); |
79 | //! assert!(0 <= j && j < 10); | |
80 | //! } | |
81 | //! ``` | |
82 | //! | |
83 | //! Propagating panics: | |
84 | //! | |
85 | //! ``` | |
86 | //! use std::sync::mpsc::channel; | |
87 | //! | |
88 | //! // The call to recv() will return an error because the channel has already | |
89 | //! // hung up (or been deallocated) | |
c34b1796 | 90 | //! let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
91 | //! drop(tx); |
92 | //! assert!(rx.recv().is_err()); | |
93 | //! ``` | |
94 | //! | |
95 | //! Synchronous channels: | |
96 | //! | |
97 | //! ``` | |
85aaf69f | 98 | //! use std::thread; |
1a4d82fc JJ |
99 | //! use std::sync::mpsc::sync_channel; |
100 | //! | |
c34b1796 | 101 | //! let (tx, rx) = sync_channel::<i32>(0); |
85aaf69f | 102 | //! thread::spawn(move|| { |
bd371182 | 103 | //! // This will wait for the parent thread to start receiving |
1a4d82fc JJ |
104 | //! tx.send(53).unwrap(); |
105 | //! }); | |
106 | //! rx.recv().unwrap(); | |
107 | //! ``` | |
1a4d82fc | 108 | |
85aaf69f | 109 | #![stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc | 110 | |
1b1a35ee XL |
111 | #[cfg(all(test, not(target_os = "emscripten")))] |
112 | mod tests; | |
113 | ||
114 | #[cfg(all(test, not(target_os = "emscripten")))] | |
115 | mod sync_tests; | |
116 | ||
1a4d82fc JJ |
117 | // A description of how Rust's channel implementation works |
118 | // | |
119 | // Channels are supposed to be the basic building block for all other | |
120 | // concurrent primitives that are used in Rust. As a result, the channel type | |
121 | // needs to be highly optimized, flexible, and broad enough for use everywhere. | |
122 | // | |
123 | // The choice of implementation of all channels is to be built on lock-free data | |
124 | // structures. The channels themselves are then consequently also lock-free data | |
125 | // structures. As always with lock-free code, this is a very "here be dragons" | |
126 | // territory, especially because I'm unaware of any academic papers that have | |
127 | // gone into great length about channels of these flavors. | |
128 | // | |
129 | // ## Flavors of channels | |
130 | // | |
131 | // From the perspective of a consumer of this library, there is only one flavor | |
132 | // of channel. This channel can be used as a stream and cloned to allow multiple | |
133 | // senders. Under the hood, however, there are actually three flavors of | |
134 | // channels in play. | |
135 | // | |
3157f602 XL |
136 | // * Oneshots - these channels are highly optimized for the one-send use |
137 | // case. They contain as few atomics as possible and | |
138 | // involve one and exactly one allocation. | |
1a4d82fc JJ |
139 | // * Streams - these channels are optimized for the non-shared use case. They |
140 | // use a different concurrent queue that is more tailored for this | |
141 | // use case. The initial allocation of this flavor of channel is not | |
142 | // optimized. | |
143 | // * Shared - this is the most general form of channel that this module offers, | |
144 | // a channel with multiple senders. This type is as optimized as it | |
145 | // can be, but the previous two types mentioned are much faster for | |
146 | // their use-cases. | |
147 | // | |
148 | // ## Concurrent queues | |
149 | // | |
3157f602 XL |
150 | // The basic idea of Rust's Sender/Receiver types is that send() never blocks, |
151 | // but recv() obviously blocks. This means that under the hood there must be | |
152 | // some shared and concurrent queue holding all of the actual data. | |
1a4d82fc JJ |
153 | // |
154 | // With two flavors of channels, two flavors of queues are also used. We have | |
155 | // chosen to use queues from a well-known author that are abbreviated as SPSC | |
156 | // and MPSC (single producer, single consumer and multiple producer, single | |
157 | // consumer). SPSC queues are used for streams while MPSC queues are used for | |
158 | // shared channels. | |
159 | // | |
160 | // ### SPSC optimizations | |
161 | // | |
162 | // The SPSC queue found online is essentially a linked list of nodes where one | |
163 | // half of the nodes are the "queue of data" and the other half of nodes are a | |
164 | // cache of unused nodes. The unused nodes are used such that an allocation is | |
165 | // not required on every push() and a free doesn't need to happen on every | |
166 | // pop(). | |
167 | // | |
168 | // As found online, however, the cache of nodes is of an infinite size. This | |
169 | // means that if a channel at one point in its life had 50k items in the queue, | |
170 | // then the queue will always have the capacity for 50k items. I believed that | |
171 | // this was an unnecessary limitation of the implementation, so I have altered | |
172 | // the queue to optionally have a bound on the cache size. | |
173 | // | |
174 | // By default, streams will have an unbounded SPSC queue with a small-ish cache | |
175 | // size. The hope is that the cache is still large enough to have very fast | |
176 | // send() operations while not too large such that millions of channels can | |
177 | // coexist at once. | |
178 | // | |
179 | // ### MPSC optimizations | |
180 | // | |
181 | // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses | |
182 | // a linked list under the hood to earn its unboundedness, but I have not put | |
183 | // forth much effort into having a cache of nodes similar to the SPSC queue. | |
184 | // | |
185 | // For now, I believe that this is "ok" because shared channels are not the most | |
186 | // common type, but soon we may wish to revisit this queue choice and determine | |
187 | // another candidate for backend storage of shared channels. | |
188 | // | |
189 | // ## Overview of the Implementation | |
190 | // | |
191 | // Now that there's a little background on the concurrent queues used, it's | |
192 | // worth going into much more detail about the channels themselves. The basic | |
193 | // pseudocode for a send/recv are: | |
194 | // | |
195 | // | |
196 | // send(t) recv() | |
197 | // queue.push(t) return if queue.pop() | |
198 | // if increment() == -1 deschedule { | |
199 | // wakeup() if decrement() > 0 | |
200 | // cancel_deschedule() | |
201 | // } | |
202 | // queue.pop() | |
203 | // | |
204 | // As mentioned before, there are no locks in this implementation, only atomic | |
205 | // instructions are used. | |
206 | // | |
207 | // ### The internal atomic counter | |
208 | // | |
209 | // Every channel has a shared counter with each half to keep track of the size | |
210 | // of the queue. This counter is used to abort descheduling by the receiver and | |
211 | // to know when to wake up on the sending side. | |
212 | // | |
213 | // As seen in the pseudocode, senders will increment this count and receivers | |
214 | // will decrement the count. The theory behind this is that if a sender sees a | |
215 | // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, | |
216 | // then it doesn't need to block. | |
217 | // | |
218 | // The recv() method has a beginning call to pop(), and if successful, it needs | |
219 | // to decrement the count. It is a crucial implementation detail that this | |
220 | // decrement does *not* happen to the shared counter. If this were the case, | |
221 | // then it would be possible for the counter to be very negative when there were | |
222 | // no receivers waiting, in which case the senders would have to determine when | |
223 | // it was actually appropriate to wake up a receiver. | |
224 | // | |
225 | // Instead, the "steal count" is kept track of separately (not atomically | |
226 | // because it's only used by receivers), and then the decrement() call when | |
227 | // descheduling will lump in all of the recent steals into one large decrement. | |
228 | // | |
229 | // The implication of this is that if a sender sees a -1 count, then there's | |
230 | // guaranteed to be a waiter waiting! | |
231 | // | |
232 | // ## Native Implementation | |
233 | // | |
234 | // A major goal of these channels is to work seamlessly on and off the runtime. | |
235 | // All of the previous race conditions have been worded in terms of | |
236 | // scheduler-isms (which are obviously not available without the runtime). | |
237 | // | |
238 | // For now, native usage of channels (off the runtime) will fall back onto | |
239 | // mutexes/cond vars for descheduling/atomic decisions. The no-contention path | |
240 | // is still entirely lock-free, the "deschedule" blocks above are surrounded by | |
241 | // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a | |
242 | // condition variable. | |
243 | // | |
244 | // ## Select | |
245 | // | |
246 | // Being able to support selection over channels has greatly influenced this | |
247 | // design, and not only does selection need to work inside the runtime, but also | |
248 | // outside the runtime. | |
249 | // | |
250 | // The implementation is fairly straightforward. The goal of select() is not to | |
251 | // return some data, but only to return which channel can receive data without | |
252 | // blocking. The implementation is essentially the entire blocking procedure | |
253 | // followed by an increment as soon as it's woken up. The cancellation procedure | |
254 | // involves an increment and swapping out of to_wake to acquire ownership of the | |
bd371182 | 255 | // thread to unblock. |
1a4d82fc JJ |
256 | // |
257 | // Sadly this current implementation requires multiple allocations, so I have | |
258 | // seen the throughput of select() be much worse than it should be. I do not | |
259 | // believe that there is anything fundamental that needs to change about these | |
260 | // channels, however, in order to support a more efficient select(). | |
261 | // | |
48663c56 XL |
262 | // FIXME: Select is now removed, so these factors are ready to be cleaned up! |
263 | // | |
1a4d82fc JJ |
264 | // # Conclusion |
265 | // | |
266 | // And now that you've seen all the races that I found and attempted to fix, | |
267 | // here's the code for you to find some more! | |
268 | ||
60c5eb7d | 269 | use crate::cell::UnsafeCell; |
532ac7d7 XL |
270 | use crate::error; |
271 | use crate::fmt; | |
272 | use crate::mem; | |
60c5eb7d | 273 | use crate::sync::Arc; |
532ac7d7 | 274 | use crate::time::{Duration, Instant}; |
1a4d82fc | 275 | |
1a4d82fc | 276 | mod blocking; |
60c5eb7d | 277 | mod mpsc_queue; |
1a4d82fc | 278 | mod oneshot; |
1a4d82fc | 279 | mod shared; |
60c5eb7d | 280 | mod spsc_queue; |
1a4d82fc JJ |
281 | mod stream; |
282 | mod sync; | |
1a4d82fc | 283 | |
abe05a73 XL |
284 | mod cache_aligned; |
285 | ||
dfeec247 | 286 | /// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type. |
7cac9316 | 287 | /// This half can only be owned by one thread. |
cc61c64b XL |
288 | /// |
289 | /// Messages sent to the channel can be retrieved using [`recv`]. | |
290 | /// | |
3dfed10e | 291 | /// [`recv`]: Receiver::recv |
cc61c64b XL |
292 | /// |
293 | /// # Examples | |
294 | /// | |
295 | /// ```rust | |
296 | /// use std::sync::mpsc::channel; | |
297 | /// use std::thread; | |
298 | /// use std::time::Duration; | |
299 | /// | |
300 | /// let (send, recv) = channel(); | |
301 | /// | |
302 | /// thread::spawn(move || { | |
303 | /// send.send("Hello world!").unwrap(); | |
304 | /// thread::sleep(Duration::from_secs(2)); // block for two seconds | |
305 | /// send.send("Delayed for 2 seconds").unwrap(); | |
306 | /// }); | |
307 | /// | |
308 | /// println!("{}", recv.recv().unwrap()); // Received immediately | |
309 | /// println!("Waiting..."); | |
310 | /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds | |
311 | /// ``` | |
85aaf69f | 312 | #[stable(feature = "rust1", since = "1.0.0")] |
6a06907d | 313 | #[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")] |
1a4d82fc JJ |
314 | pub struct Receiver<T> { |
315 | inner: UnsafeCell<Flavor<T>>, | |
316 | } | |
317 | ||
318 | // The receiver port can be sent from place to place, so long as it | |
319 | // is not used to receive non-sendable things. | |
92a42be0 | 320 | #[stable(feature = "rust1", since = "1.0.0")] |
60c5eb7d | 321 | unsafe impl<T: Send> Send for Receiver<T> {} |
1a4d82fc | 322 | |
54a0048b | 323 | #[stable(feature = "rust1", since = "1.0.0")] |
60c5eb7d | 324 | impl<T> !Sync for Receiver<T> {} |
54a0048b | 325 | |
7cac9316 XL |
326 | /// An iterator over messages on a [`Receiver`], created by [`iter`]. |
327 | /// | |
328 | /// This iterator will block whenever [`next`] is called, | |
329 | /// waiting for a new message, and [`None`] will be returned | |
cc61c64b XL |
330 | /// when the corresponding channel has hung up. |
331 | /// | |
3dfed10e XL |
332 | /// [`iter`]: Receiver::iter |
333 | /// [`next`]: Iterator::next | |
7cac9316 XL |
334 | /// |
335 | /// # Examples | |
336 | /// | |
337 | /// ```rust | |
338 | /// use std::sync::mpsc::channel; | |
339 | /// use std::thread; | |
340 | /// | |
341 | /// let (send, recv) = channel(); | |
342 | /// | |
343 | /// thread::spawn(move || { | |
344 | /// send.send(1u8).unwrap(); | |
345 | /// send.send(2u8).unwrap(); | |
346 | /// send.send(3u8).unwrap(); | |
347 | /// }); | |
348 | /// | |
349 | /// for x in recv.iter() { | |
350 | /// println!("Got: {}", x); | |
351 | /// } | |
352 | /// ``` | |
85aaf69f | 353 | #[stable(feature = "rust1", since = "1.0.0")] |
32a655c1 | 354 | #[derive(Debug)] |
c34b1796 | 355 | pub struct Iter<'a, T: 'a> { |
60c5eb7d | 356 | rx: &'a Receiver<T>, |
1a4d82fc JJ |
357 | } |
358 | ||
7cac9316 XL |
359 | /// An iterator that attempts to yield all pending values for a [`Receiver`], |
360 | /// created by [`try_iter`]. | |
5bcae85e | 361 | /// |
7cac9316 XL |
362 | /// [`None`] will be returned when there are no pending values remaining or |
363 | /// if the corresponding channel has hung up. | |
364 | /// | |
365 | /// This iterator will never block the caller in order to wait for data to | |
cc61c64b XL |
366 | /// become available. Instead, it will return [`None`]. |
367 | /// | |
3dfed10e | 368 | /// [`try_iter`]: Receiver::try_iter |
7cac9316 XL |
369 | /// |
370 | /// # Examples | |
371 | /// | |
372 | /// ```rust | |
373 | /// use std::sync::mpsc::channel; | |
374 | /// use std::thread; | |
375 | /// use std::time::Duration; | |
376 | /// | |
377 | /// let (sender, receiver) = channel(); | |
378 | /// | |
379 | /// // Nothing is in the buffer yet | |
380 | /// assert!(receiver.try_iter().next().is_none()); | |
381 | /// println!("Nothing in the buffer..."); | |
382 | /// | |
383 | /// thread::spawn(move || { | |
384 | /// sender.send(1).unwrap(); | |
385 | /// sender.send(2).unwrap(); | |
386 | /// sender.send(3).unwrap(); | |
387 | /// }); | |
388 | /// | |
389 | /// println!("Going to sleep..."); | |
390 | /// thread::sleep(Duration::from_secs(2)); // block for two seconds | |
391 | /// | |
392 | /// for x in receiver.try_iter() { | |
393 | /// println!("Got: {}", x); | |
394 | /// } | |
395 | /// ``` | |
476ff2be | 396 | #[stable(feature = "receiver_try_iter", since = "1.15.0")] |
32a655c1 | 397 | #[derive(Debug)] |
5bcae85e | 398 | pub struct TryIter<'a, T: 'a> { |
60c5eb7d | 399 | rx: &'a Receiver<T>, |
5bcae85e SL |
400 | } |
401 | ||
7cac9316 XL |
402 | /// An owning iterator over messages on a [`Receiver`], |
403 | /// created by **Receiver::into_iter**. | |
cc61c64b | 404 | /// |
7cac9316 XL |
405 | /// This iterator will block whenever [`next`] |
406 | /// is called, waiting for a new message, and [`None`] will be | |
407 | /// returned if the corresponding channel has hung up. | |
408 | /// | |
3dfed10e | 409 | /// [`next`]: Iterator::next |
cc61c64b | 410 | /// |
7cac9316 XL |
411 | /// # Examples |
412 | /// | |
413 | /// ```rust | |
414 | /// use std::sync::mpsc::channel; | |
415 | /// use std::thread; | |
416 | /// | |
417 | /// let (send, recv) = channel(); | |
418 | /// | |
419 | /// thread::spawn(move || { | |
420 | /// send.send(1u8).unwrap(); | |
421 | /// send.send(2u8).unwrap(); | |
422 | /// send.send(3u8).unwrap(); | |
423 | /// }); | |
424 | /// | |
425 | /// for x in recv.into_iter() { | |
426 | /// println!("Got: {}", x); | |
427 | /// } | |
428 | /// ``` | |
d9579d0f | 429 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] |
32a655c1 | 430 | #[derive(Debug)] |
d9579d0f | 431 | pub struct IntoIter<T> { |
60c5eb7d | 432 | rx: Receiver<T>, |
d9579d0f AL |
433 | } |
434 | ||
7cac9316 | 435 | /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be |
bd371182 | 436 | /// owned by one thread, but it can be cloned to send to other threads. |
cc61c64b XL |
437 | /// |
438 | /// Messages can be sent through this channel with [`send`]. | |
439 | /// | |
3dfed10e | 440 | /// [`send`]: Sender::send |
cc61c64b XL |
441 | /// |
442 | /// # Examples | |
443 | /// | |
444 | /// ```rust | |
445 | /// use std::sync::mpsc::channel; | |
446 | /// use std::thread; | |
447 | /// | |
448 | /// let (sender, receiver) = channel(); | |
449 | /// let sender2 = sender.clone(); | |
450 | /// | |
451 | /// // First thread owns sender | |
452 | /// thread::spawn(move || { | |
453 | /// sender.send(1).unwrap(); | |
454 | /// }); | |
455 | /// | |
456 | /// // Second thread owns sender2 | |
457 | /// thread::spawn(move || { | |
458 | /// sender2.send(2).unwrap(); | |
459 | /// }); | |
460 | /// | |
461 | /// let msg = receiver.recv().unwrap(); | |
462 | /// let msg2 = receiver.recv().unwrap(); | |
463 | /// | |
464 | /// assert_eq!(3, msg + msg2); | |
465 | /// ``` | |
85aaf69f | 466 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
467 | pub struct Sender<T> { |
468 | inner: UnsafeCell<Flavor<T>>, | |
469 | } | |
470 | ||
471 | // The send port can be sent from place to place, so long as it | |
472 | // is not used to send non-sendable things. | |
92a42be0 | 473 | #[stable(feature = "rust1", since = "1.0.0")] |
60c5eb7d | 474 | unsafe impl<T: Send> Send for Sender<T> {} |
1a4d82fc | 475 | |
54a0048b | 476 | #[stable(feature = "rust1", since = "1.0.0")] |
60c5eb7d | 477 | impl<T> !Sync for Sender<T> {} |
54a0048b | 478 | |
7cac9316 | 479 | /// The sending-half of Rust's synchronous [`sync_channel`] type. |
cc61c64b | 480 | /// |
7cac9316 XL |
481 | /// Messages can be sent through this channel with [`send`] or [`try_send`]. |
482 | /// | |
483 | /// [`send`] will block if there is no space in the internal buffer. | |
484 | /// | |
3dfed10e XL |
485 | /// [`send`]: SyncSender::send |
486 | /// [`try_send`]: SyncSender::try_send | |
cc61c64b | 487 | /// |
7cac9316 XL |
488 | /// # Examples |
489 | /// | |
490 | /// ```rust | |
491 | /// use std::sync::mpsc::sync_channel; | |
492 | /// use std::thread; | |
493 | /// | |
494 | /// // Create a sync_channel with buffer size 2 | |
495 | /// let (sync_sender, receiver) = sync_channel(2); | |
496 | /// let sync_sender2 = sync_sender.clone(); | |
497 | /// | |
498 | /// // First thread owns sync_sender | |
499 | /// thread::spawn(move || { | |
500 | /// sync_sender.send(1).unwrap(); | |
501 | /// sync_sender.send(2).unwrap(); | |
502 | /// }); | |
503 | /// | |
504 | /// // Second thread owns sync_sender2 | |
505 | /// thread::spawn(move || { | |
506 | /// sync_sender2.send(3).unwrap(); | |
507 | /// // thread will now block since the buffer is full | |
508 | /// println!("Thread unblocked!"); | |
509 | /// }); | |
510 | /// | |
511 | /// let mut msg; | |
512 | /// | |
513 | /// msg = receiver.recv().unwrap(); | |
514 | /// println!("message {} received", msg); | |
515 | /// | |
516 | /// // "Thread unblocked!" will be printed now | |
517 | /// | |
518 | /// msg = receiver.recv().unwrap(); | |
519 | /// println!("message {} received", msg); | |
520 | /// | |
521 | /// msg = receiver.recv().unwrap(); | |
522 | /// | |
523 | /// println!("message {} received", msg); | |
524 | /// ``` | |
85aaf69f | 525 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc | 526 | pub struct SyncSender<T> { |
476ff2be | 527 | inner: Arc<sync::Packet<T>>, |
1a4d82fc JJ |
528 | } |
529 | ||
92a42be0 | 530 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 531 | unsafe impl<T: Send> Send for SyncSender<T> {} |
85aaf69f | 532 | |
cc61c64b XL |
533 | /// An error returned from the [`Sender::send`] or [`SyncSender::send`] |
534 | /// function on **channel**s. | |
1a4d82fc | 535 | /// |
cc61c64b | 536 | /// A **send** operation can only fail if the receiving end of a channel is |
1a4d82fc JJ |
537 | /// disconnected, implying that the data could never be received. The error |
538 | /// contains the data being sent as a payload so it can be recovered. | |
85aaf69f SL |
539 | #[stable(feature = "rust1", since = "1.0.0")] |
540 | #[derive(PartialEq, Eq, Clone, Copy)] | |
c34b1796 | 541 | pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T); |
1a4d82fc | 542 | |
cc61c64b XL |
543 | /// An error returned from the [`recv`] function on a [`Receiver`]. |
544 | /// | |
545 | /// The [`recv`] operation can only fail if the sending half of a | |
dfeec247 | 546 | /// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further |
cc61c64b | 547 | /// messages will ever be received. |
1a4d82fc | 548 | /// |
3dfed10e | 549 | /// [`recv`]: Receiver::recv |
85aaf69f SL |
550 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
551 | #[stable(feature = "rust1", since = "1.0.0")] | |
1a4d82fc JJ |
552 | pub struct RecvError; |
553 | ||
cc61c64b XL |
554 | /// This enumeration is the list of the possible reasons that [`try_recv`] could |
555 | /// not return data when called. This can occur with both a [`channel`] and | |
556 | /// a [`sync_channel`]. | |
557 | /// | |
3dfed10e | 558 | /// [`try_recv`]: Receiver::try_recv |
85aaf69f SL |
559 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
560 | #[stable(feature = "rust1", since = "1.0.0")] | |
1a4d82fc | 561 | pub enum TryRecvError { |
cc61c64b | 562 | /// This **channel** is currently empty, but the **Sender**(s) have not yet |
1a4d82fc | 563 | /// disconnected, so data may yet become available. |
85aaf69f | 564 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
565 | Empty, |
566 | ||
cc61c64b XL |
567 | /// The **channel**'s sending half has become disconnected, and there will |
568 | /// never be any more data received on it. | |
85aaf69f | 569 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
570 | Disconnected, |
571 | } | |
572 | ||
cc61c64b XL |
573 | /// This enumeration is the list of possible errors that made [`recv_timeout`] |
574 | /// unable to return data when called. This can occur with both a [`channel`] and | |
575 | /// a [`sync_channel`]. | |
576 | /// | |
3dfed10e | 577 | /// [`recv_timeout`]: Receiver::recv_timeout |
3157f602 | 578 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
5bcae85e | 579 | #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] |
3157f602 | 580 | pub enum RecvTimeoutError { |
cc61c64b | 581 | /// This **channel** is currently empty, but the **Sender**(s) have not yet |
3157f602 | 582 | /// disconnected, so data may yet become available. |
5bcae85e | 583 | #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] |
3157f602 | 584 | Timeout, |
cc61c64b XL |
585 | /// The **channel**'s sending half has become disconnected, and there will |
586 | /// never be any more data received on it. | |
5bcae85e | 587 | #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] |
3157f602 XL |
588 | Disconnected, |
589 | } | |
590 | ||
1a4d82fc | 591 | /// This enumeration is the list of the possible error outcomes for the |
cc61c64b XL |
592 | /// [`try_send`] method. |
593 | /// | |
3dfed10e | 594 | /// [`try_send`]: SyncSender::try_send |
85aaf69f SL |
595 | #[stable(feature = "rust1", since = "1.0.0")] |
596 | #[derive(PartialEq, Eq, Clone, Copy)] | |
1a4d82fc | 597 | pub enum TrySendError<T> { |
cc61c64b | 598 | /// The data could not be sent on the [`sync_channel`] because it would require that |
1a4d82fc JJ |
599 | /// the caller block to send the data. |
600 | /// | |
601 | /// If this is a buffered channel, then the buffer is full at this time. If | |
cc61c64b | 602 | /// this is not a buffered channel, then there is no [`Receiver`] available to |
1a4d82fc | 603 | /// acquire the data. |
85aaf69f | 604 | #[stable(feature = "rust1", since = "1.0.0")] |
7453a54e | 605 | Full(#[stable(feature = "rust1", since = "1.0.0")] T), |
1a4d82fc | 606 | |
cc61c64b | 607 | /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be |
1a4d82fc | 608 | /// sent. The data is returned back to the caller in this case. |
85aaf69f | 609 | #[stable(feature = "rust1", since = "1.0.0")] |
7453a54e | 610 | Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T), |
1a4d82fc JJ |
611 | } |
612 | ||
613 | enum Flavor<T> { | |
476ff2be SL |
614 | Oneshot(Arc<oneshot::Packet<T>>), |
615 | Stream(Arc<stream::Packet<T>>), | |
616 | Shared(Arc<shared::Packet<T>>), | |
617 | Sync(Arc<sync::Packet<T>>), | |
1a4d82fc JJ |
618 | } |
619 | ||
620 | #[doc(hidden)] | |
621 | trait UnsafeFlavor<T> { | |
e9174d1e | 622 | fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>; |
7453a54e | 623 | unsafe fn inner_mut(&self) -> &mut Flavor<T> { |
1a4d82fc JJ |
624 | &mut *self.inner_unsafe().get() |
625 | } | |
7453a54e | 626 | unsafe fn inner(&self) -> &Flavor<T> { |
1a4d82fc JJ |
627 | &*self.inner_unsafe().get() |
628 | } | |
629 | } | |
630 | impl<T> UnsafeFlavor<T> for Sender<T> { | |
e9174d1e | 631 | fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> { |
1a4d82fc JJ |
632 | &self.inner |
633 | } | |
634 | } | |
635 | impl<T> UnsafeFlavor<T> for Receiver<T> { | |
e9174d1e | 636 | fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> { |
1a4d82fc JJ |
637 | &self.inner |
638 | } | |
639 | } | |
640 | ||
641 | /// Creates a new asynchronous channel, returning the sender/receiver halves. | |
cc61c64b XL |
642 | /// All data sent on the [`Sender`] will become available on the [`Receiver`] in |
643 | /// the same order as it was sent, and no [`send`] will block the calling thread | |
644 | /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will | |
645 | /// block after its buffer limit is reached). [`recv`] will block until a message | |
646 | /// is available. | |
647 | /// | |
648 | /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but | |
649 | /// only one [`Receiver`] is supported. | |
1a4d82fc | 650 | /// |
cc61c64b | 651 | /// If the [`Receiver`] is disconnected while trying to [`send`] with the |
8faf50e0 | 652 | /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the |
cc61c64b XL |
653 | /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will |
654 | /// return a [`RecvError`]. | |
476ff2be | 655 | /// |
3dfed10e XL |
656 | /// [`send`]: Sender::send |
657 | /// [`recv`]: Receiver::recv | |
476ff2be | 658 | /// |
c34b1796 | 659 | /// # Examples |
1a4d82fc JJ |
660 | /// |
661 | /// ``` | |
662 | /// use std::sync::mpsc::channel; | |
85aaf69f | 663 | /// use std::thread; |
1a4d82fc | 664 | /// |
cc61c64b | 665 | /// let (sender, receiver) = channel(); |
1a4d82fc JJ |
666 | /// |
667 | /// // Spawn off an expensive computation | |
85aaf69f | 668 | /// thread::spawn(move|| { |
1a4d82fc | 669 | /// # fn expensive_computation() {} |
cc61c64b | 670 | /// sender.send(expensive_computation()).unwrap(); |
1a4d82fc JJ |
671 | /// }); |
672 | /// | |
673 | /// // Do some useful work for a while | |
674 | /// | |
675 | /// // Let's see what that answer was | |
cc61c64b | 676 | /// println!("{:?}", receiver.recv().unwrap()); |
1a4d82fc | 677 | /// ``` |
85aaf69f | 678 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 679 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
476ff2be | 680 | let a = Arc::new(oneshot::Packet::new()); |
1a4d82fc JJ |
681 | (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) |
682 | } | |
683 | ||
684 | /// Creates a new synchronous, bounded channel. | |
cc61c64b XL |
685 | /// All data sent on the [`SyncSender`] will become available on the [`Receiver`] |
686 | /// in the same order as it was sent. Like asynchronous [`channel`]s, the | |
687 | /// [`Receiver`] will block until a message becomes available. `sync_channel` | |
688 | /// differs greatly in the semantics of the sender, however. | |
1a4d82fc | 689 | /// |
476ff2be SL |
690 | /// This channel has an internal buffer on which messages will be queued. |
691 | /// `bound` specifies the buffer size. When the internal buffer becomes full, | |
692 | /// future sends will *block* waiting for the buffer to open up. Note that a | |
693 | /// buffer size of 0 is valid, in which case this becomes a "rendezvous channel" | |
cc61c64b XL |
694 | /// where each [`send`] will not return until a [`recv`] is paired with it. |
695 | /// | |
696 | /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple | |
697 | /// times, but only one [`Receiver`] is supported. | |
1a4d82fc | 698 | /// |
cc61c64b XL |
699 | /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying |
700 | /// to [`send`] with the [`SyncSender`], the [`send`] method will return a | |
701 | /// [`SendError`]. Similarly, if the [`SyncSender`] is disconnected while trying | |
702 | /// to [`recv`], the [`recv`] method will return a [`RecvError`]. | |
476ff2be | 703 | /// |
3dfed10e XL |
704 | /// [`send`]: SyncSender::send |
705 | /// [`recv`]: Receiver::recv | |
1a4d82fc | 706 | /// |
c34b1796 | 707 | /// # Examples |
1a4d82fc JJ |
708 | /// |
709 | /// ``` | |
710 | /// use std::sync::mpsc::sync_channel; | |
85aaf69f | 711 | /// use std::thread; |
1a4d82fc | 712 | /// |
cc61c64b | 713 | /// let (sender, receiver) = sync_channel(1); |
1a4d82fc JJ |
714 | /// |
715 | /// // this returns immediately | |
cc61c64b | 716 | /// sender.send(1).unwrap(); |
1a4d82fc | 717 | /// |
85aaf69f | 718 | /// thread::spawn(move|| { |
1a4d82fc | 719 | /// // this will block until the previous message has been received |
cc61c64b | 720 | /// sender.send(2).unwrap(); |
1a4d82fc JJ |
721 | /// }); |
722 | /// | |
cc61c64b XL |
723 | /// assert_eq!(receiver.recv().unwrap(), 1); |
724 | /// assert_eq!(receiver.recv().unwrap(), 2); | |
1a4d82fc | 725 | /// ``` |
85aaf69f | 726 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 727 | pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { |
476ff2be | 728 | let a = Arc::new(sync::Packet::new(bound)); |
1a4d82fc JJ |
729 | (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) |
730 | } | |
731 | ||
732 | //////////////////////////////////////////////////////////////////////////////// | |
733 | // Sender | |
734 | //////////////////////////////////////////////////////////////////////////////// | |
735 | ||
c34b1796 | 736 | impl<T> Sender<T> { |
1a4d82fc | 737 | fn new(inner: Flavor<T>) -> Sender<T> { |
60c5eb7d | 738 | Sender { inner: UnsafeCell::new(inner) } |
1a4d82fc JJ |
739 | } |
740 | ||
741 | /// Attempts to send a value on this channel, returning it back if it could | |
742 | /// not be sent. | |
743 | /// | |
744 | /// A successful send occurs when it is determined that the other end of | |
745 | /// the channel has not hung up already. An unsuccessful send would be one | |
746 | /// where the corresponding receiver has already been deallocated. Note | |
cc61c64b XL |
747 | /// that a return value of [`Err`] means that the data will never be |
748 | /// received, but a return value of [`Ok`] does *not* mean that the data | |
9fa01778 | 749 | /// will be received. It is possible for the corresponding receiver to |
cc61c64b XL |
750 | /// hang up immediately after this function returns [`Ok`]. |
751 | /// | |
1a4d82fc JJ |
752 | /// This method will never block the current thread. |
753 | /// | |
c34b1796 | 754 | /// # Examples |
1a4d82fc JJ |
755 | /// |
756 | /// ``` | |
757 | /// use std::sync::mpsc::channel; | |
758 | /// | |
759 | /// let (tx, rx) = channel(); | |
760 | /// | |
761 | /// // This send is always successful | |
85aaf69f | 762 | /// tx.send(1).unwrap(); |
1a4d82fc JJ |
763 | /// |
764 | /// // This send will fail because the receiver is gone | |
765 | /// drop(rx); | |
a7813a04 | 766 | /// assert_eq!(tx.send(1).unwrap_err().0, 1); |
1a4d82fc | 767 | /// ``` |
85aaf69f | 768 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
769 | pub fn send(&self, t: T) -> Result<(), SendError<T>> { |
770 | let (new_inner, ret) = match *unsafe { self.inner() } { | |
771 | Flavor::Oneshot(ref p) => { | |
476ff2be SL |
772 | if !p.sent() { |
773 | return p.send(t).map_err(SendError); | |
774 | } else { | |
775 | let a = Arc::new(stream::Packet::new()); | |
776 | let rx = Receiver::new(Flavor::Stream(a.clone())); | |
777 | match p.upgrade(rx) { | |
778 | oneshot::UpSuccess => { | |
779 | let ret = a.send(t); | |
780 | (a, ret) | |
781 | } | |
782 | oneshot::UpDisconnected => (a, Err(t)), | |
783 | oneshot::UpWoke(token) => { | |
784 | // This send cannot panic because the thread is | |
785 | // asleep (we're looking at it), so the receiver | |
786 | // can't go away. | |
787 | a.send(t).ok().unwrap(); | |
788 | token.signal(); | |
789 | (a, Ok(())) | |
1a4d82fc JJ |
790 | } |
791 | } | |
792 | } | |
793 | } | |
476ff2be SL |
794 | Flavor::Stream(ref p) => return p.send(t).map_err(SendError), |
795 | Flavor::Shared(ref p) => return p.send(t).map_err(SendError), | |
1a4d82fc JJ |
796 | Flavor::Sync(..) => unreachable!(), |
797 | }; | |
798 | ||
799 | unsafe { | |
800 | let tmp = Sender::new(Flavor::Stream(new_inner)); | |
801 | mem::swap(self.inner_mut(), tmp.inner_mut()); | |
802 | } | |
803 | ret.map_err(SendError) | |
804 | } | |
805 | } | |
806 | ||
85aaf69f | 807 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 808 | impl<T> Clone for Sender<T> { |
1a4d82fc | 809 | fn clone(&self) -> Sender<T> { |
476ff2be | 810 | let packet = match *unsafe { self.inner() } { |
1a4d82fc | 811 | Flavor::Oneshot(ref p) => { |
476ff2be SL |
812 | let a = Arc::new(shared::Packet::new()); |
813 | { | |
814 | let guard = a.postinit_lock(); | |
1a4d82fc | 815 | let rx = Receiver::new(Flavor::Shared(a.clone())); |
476ff2be | 816 | let sleeper = match p.upgrade(rx) { |
60c5eb7d | 817 | oneshot::UpSuccess | oneshot::UpDisconnected => None, |
476ff2be SL |
818 | oneshot::UpWoke(task) => Some(task), |
819 | }; | |
820 | a.inherit_blocker(sleeper, guard); | |
1a4d82fc | 821 | } |
476ff2be | 822 | a |
1a4d82fc JJ |
823 | } |
824 | Flavor::Stream(ref p) => { | |
476ff2be SL |
825 | let a = Arc::new(shared::Packet::new()); |
826 | { | |
827 | let guard = a.postinit_lock(); | |
1a4d82fc | 828 | let rx = Receiver::new(Flavor::Shared(a.clone())); |
476ff2be | 829 | let sleeper = match p.upgrade(rx) { |
60c5eb7d | 830 | stream::UpSuccess | stream::UpDisconnected => None, |
476ff2be SL |
831 | stream::UpWoke(task) => Some(task), |
832 | }; | |
833 | a.inherit_blocker(sleeper, guard); | |
1a4d82fc | 834 | } |
476ff2be | 835 | a |
1a4d82fc JJ |
836 | } |
837 | Flavor::Shared(ref p) => { | |
476ff2be | 838 | p.clone_chan(); |
1a4d82fc JJ |
839 | return Sender::new(Flavor::Shared(p.clone())); |
840 | } | |
841 | Flavor::Sync(..) => unreachable!(), | |
842 | }; | |
843 | ||
844 | unsafe { | |
1a4d82fc JJ |
845 | let tmp = Sender::new(Flavor::Shared(packet.clone())); |
846 | mem::swap(self.inner_mut(), tmp.inner_mut()); | |
847 | } | |
848 | Sender::new(Flavor::Shared(packet)) | |
849 | } | |
850 | } | |
851 | ||
85aaf69f | 852 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 853 | impl<T> Drop for Sender<T> { |
1a4d82fc | 854 | fn drop(&mut self) { |
476ff2be SL |
855 | match *unsafe { self.inner() } { |
856 | Flavor::Oneshot(ref p) => p.drop_chan(), | |
857 | Flavor::Stream(ref p) => p.drop_chan(), | |
858 | Flavor::Shared(ref p) => p.drop_chan(), | |
1a4d82fc JJ |
859 | Flavor::Sync(..) => unreachable!(), |
860 | } | |
861 | } | |
862 | } | |
863 | ||
7cac9316 | 864 | #[stable(feature = "mpsc_debug", since = "1.8.0")] |
7453a54e | 865 | impl<T> fmt::Debug for Sender<T> { |
532ac7d7 | 866 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
abe05a73 | 867 | f.debug_struct("Sender").finish() |
7453a54e SL |
868 | } |
869 | } | |
870 | ||
1a4d82fc JJ |
871 | //////////////////////////////////////////////////////////////////////////////// |
872 | // SyncSender | |
873 | //////////////////////////////////////////////////////////////////////////////// | |
874 | ||
c34b1796 | 875 | impl<T> SyncSender<T> { |
476ff2be | 876 | fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> { |
a1dfa0c6 | 877 | SyncSender { inner } |
1a4d82fc JJ |
878 | } |
879 | ||
880 | /// Sends a value on this synchronous channel. | |
881 | /// | |
882 | /// This function will *block* until space in the internal buffer becomes | |
883 | /// available or a receiver is available to hand off the message to. | |
884 | /// | |
885 | /// Note that a successful send does *not* guarantee that the receiver will | |
886 | /// ever see the data if there is a buffer on this channel. Items may be | |
887 | /// enqueued in the internal buffer for the receiver to receive at a later | |
7cac9316 XL |
888 | /// time. If the buffer size is 0, however, the channel becomes a rendezvous |
889 | /// channel and it guarantees that the receiver has indeed received | |
890 | /// the data if this function returns success. | |
1a4d82fc | 891 | /// |
cc61c64b XL |
892 | /// This function will never panic, but it may return [`Err`] if the |
893 | /// [`Receiver`] has disconnected and is no longer able to receive | |
1a4d82fc | 894 | /// information. |
cc61c64b | 895 | /// |
7cac9316 XL |
896 | /// # Examples |
897 | /// | |
898 | /// ```rust | |
899 | /// use std::sync::mpsc::sync_channel; | |
900 | /// use std::thread; | |
901 | /// | |
902 | /// // Create a rendezvous sync_channel with buffer size 0 | |
903 | /// let (sync_sender, receiver) = sync_channel(0); | |
904 | /// | |
905 | /// thread::spawn(move || { | |
906 | /// println!("sending message..."); | |
907 | /// sync_sender.send(1).unwrap(); | |
908 | /// // The `send` above blocks this thread until the message is received | |
909 | /// | |
910 | /// println!("...message received!"); | |
911 | /// }); | |
912 | /// | |
913 | /// let msg = receiver.recv().unwrap(); | |
914 | /// assert_eq!(1, msg); | |
915 | /// ``` | |
85aaf69f | 916 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc | 917 | pub fn send(&self, t: T) -> Result<(), SendError<T>> { |
476ff2be | 918 | self.inner.send(t).map_err(SendError) |
1a4d82fc JJ |
919 | } |
920 | ||
921 | /// Attempts to send a value on this channel without blocking. | |
922 | /// | |
cc61c64b | 923 | /// This method differs from [`send`] by returning immediately if the |
1a4d82fc | 924 | /// channel's buffer is full or no receiver is waiting to acquire some |
cc61c64b | 925 | /// data. Compared with [`send`], this function has two failure cases |
1a4d82fc JJ |
926 | /// instead of one (one for disconnection, one for a full buffer). |
927 | /// | |
7cac9316 | 928 | /// See [`send`] for notes about guarantees of whether the |
1a4d82fc | 929 | /// receiver has received the data or not if this function is successful. |
cc61c64b | 930 | /// |
3dfed10e | 931 | /// [`send`]: Self::send |
7cac9316 XL |
932 | /// |
933 | /// # Examples | |
934 | /// | |
935 | /// ```rust | |
936 | /// use std::sync::mpsc::sync_channel; | |
937 | /// use std::thread; | |
938 | /// | |
939 | /// // Create a sync_channel with buffer size 1 | |
940 | /// let (sync_sender, receiver) = sync_channel(1); | |
941 | /// let sync_sender2 = sync_sender.clone(); | |
942 | /// | |
943 | /// // First thread owns sync_sender | |
944 | /// thread::spawn(move || { | |
945 | /// sync_sender.send(1).unwrap(); | |
946 | /// sync_sender.send(2).unwrap(); | |
947 | /// // Thread blocked | |
948 | /// }); | |
949 | /// | |
950 | /// // Second thread owns sync_sender2 | |
951 | /// thread::spawn(move || { | |
952 | /// // This will return an error and send | |
953 | /// // no message if the buffer is full | |
48663c56 | 954 | /// let _ = sync_sender2.try_send(3); |
7cac9316 XL |
955 | /// }); |
956 | /// | |
957 | /// let mut msg; | |
958 | /// msg = receiver.recv().unwrap(); | |
959 | /// println!("message {} received", msg); | |
960 | /// | |
961 | /// msg = receiver.recv().unwrap(); | |
962 | /// println!("message {} received", msg); | |
963 | /// | |
964 | /// // Third message may have never been sent | |
965 | /// match receiver.try_recv() { | |
966 | /// Ok(msg) => println!("message {} received", msg), | |
967 | /// Err(_) => println!("the third message was never sent"), | |
968 | /// } | |
969 | /// ``` | |
85aaf69f | 970 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc | 971 | pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { |
476ff2be | 972 | self.inner.try_send(t) |
1a4d82fc JJ |
973 | } |
974 | } | |
975 | ||
85aaf69f | 976 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 977 | impl<T> Clone for SyncSender<T> { |
1a4d82fc | 978 | fn clone(&self) -> SyncSender<T> { |
476ff2be | 979 | self.inner.clone_chan(); |
e9174d1e | 980 | SyncSender::new(self.inner.clone()) |
1a4d82fc JJ |
981 | } |
982 | } | |
983 | ||
85aaf69f | 984 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 985 | impl<T> Drop for SyncSender<T> { |
1a4d82fc | 986 | fn drop(&mut self) { |
476ff2be | 987 | self.inner.drop_chan(); |
1a4d82fc JJ |
988 | } |
989 | } | |
990 | ||
7cac9316 | 991 | #[stable(feature = "mpsc_debug", since = "1.8.0")] |
7453a54e | 992 | impl<T> fmt::Debug for SyncSender<T> { |
532ac7d7 | 993 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
abe05a73 | 994 | f.debug_struct("SyncSender").finish() |
7453a54e SL |
995 | } |
996 | } | |
997 | ||
1a4d82fc JJ |
998 | //////////////////////////////////////////////////////////////////////////////// |
999 | // Receiver | |
1000 | //////////////////////////////////////////////////////////////////////////////// | |
1001 | ||
c34b1796 | 1002 | impl<T> Receiver<T> { |
1a4d82fc JJ |
1003 | fn new(inner: Flavor<T>) -> Receiver<T> { |
1004 | Receiver { inner: UnsafeCell::new(inner) } | |
1005 | } | |
1006 | ||
7cac9316 | 1007 | /// Attempts to return a pending value on this receiver without blocking. |
1a4d82fc JJ |
1008 | /// |
1009 | /// This method will never block the caller in order to wait for data to | |
1010 | /// become available. Instead, this will always return immediately with a | |
1011 | /// possible option of pending data on the channel. | |
1012 | /// | |
1013 | /// This is useful for a flavor of "optimistic check" before deciding to | |
1014 | /// block on a receiver. | |
7cac9316 XL |
1015 | /// |
1016 | /// Compared with [`recv`], this function has two failure cases instead of one | |
1017 | /// (one for disconnection, one for an empty buffer). | |
1018 | /// | |
3dfed10e | 1019 | /// [`recv`]: Self::recv |
7cac9316 XL |
1020 | /// |
1021 | /// # Examples | |
1022 | /// | |
1023 | /// ```rust | |
1024 | /// use std::sync::mpsc::{Receiver, channel}; | |
1025 | /// | |
1026 | /// let (_, receiver): (_, Receiver<i32>) = channel(); | |
1027 | /// | |
1028 | /// assert!(receiver.try_recv().is_err()); | |
1029 | /// ``` | |
85aaf69f | 1030 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
1031 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
1032 | loop { | |
1033 | let new_port = match *unsafe { self.inner() } { | |
60c5eb7d XL |
1034 | Flavor::Oneshot(ref p) => match p.try_recv() { |
1035 | Ok(t) => return Ok(t), | |
1036 | Err(oneshot::Empty) => return Err(TryRecvError::Empty), | |
1037 | Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected), | |
1038 | Err(oneshot::Upgraded(rx)) => rx, | |
1039 | }, | |
1040 | Flavor::Stream(ref p) => match p.try_recv() { | |
1041 | Ok(t) => return Ok(t), | |
1042 | Err(stream::Empty) => return Err(TryRecvError::Empty), | |
1043 | Err(stream::Disconnected) => return Err(TryRecvError::Disconnected), | |
1044 | Err(stream::Upgraded(rx)) => rx, | |
1045 | }, | |
1046 | Flavor::Shared(ref p) => match p.try_recv() { | |
1047 | Ok(t) => return Ok(t), | |
1048 | Err(shared::Empty) => return Err(TryRecvError::Empty), | |
1049 | Err(shared::Disconnected) => return Err(TryRecvError::Disconnected), | |
1050 | }, | |
1051 | Flavor::Sync(ref p) => match p.try_recv() { | |
1052 | Ok(t) => return Ok(t), | |
1053 | Err(sync::Empty) => return Err(TryRecvError::Empty), | |
1054 | Err(sync::Disconnected) => return Err(TryRecvError::Disconnected), | |
1055 | }, | |
1a4d82fc JJ |
1056 | }; |
1057 | unsafe { | |
60c5eb7d | 1058 | mem::swap(self.inner_mut(), new_port.inner_mut()); |
1a4d82fc JJ |
1059 | } |
1060 | } | |
1061 | } | |
1062 | ||
9346a6ac | 1063 | /// Attempts to wait for a value on this receiver, returning an error if the |
1a4d82fc JJ |
1064 | /// corresponding channel has hung up. |
1065 | /// | |
1066 | /// This function will always block the current thread if there is no data | |
1067 | /// available and it's possible for more data to be sent. Once a message is | |
dfeec247 | 1068 | /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this |
7cac9316 | 1069 | /// receiver will wake up and return that message. |
1a4d82fc | 1070 | /// |
cc61c64b XL |
1071 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
1072 | /// this call is blocking, this call will wake up and return [`Err`] to | |
1a4d82fc | 1073 | /// indicate that no more messages can ever be received on this channel. |
c1a9b12d SL |
1074 | /// However, since channels are buffered, messages sent before the disconnect |
1075 | /// will still be properly received. | |
1076 | /// | |
1077 | /// # Examples | |
1078 | /// | |
1079 | /// ``` | |
1080 | /// use std::sync::mpsc; | |
1081 | /// use std::thread; | |
1082 | /// | |
1083 | /// let (send, recv) = mpsc::channel(); | |
1084 | /// let handle = thread::spawn(move || { | |
1085 | /// send.send(1u8).unwrap(); | |
1086 | /// }); | |
1087 | /// | |
1088 | /// handle.join().unwrap(); | |
1089 | /// | |
1090 | /// assert_eq!(Ok(1), recv.recv()); | |
1091 | /// ``` | |
1092 | /// | |
1093 | /// Buffering behavior: | |
1094 | /// | |
1095 | /// ``` | |
1096 | /// use std::sync::mpsc; | |
1097 | /// use std::thread; | |
1098 | /// use std::sync::mpsc::RecvError; | |
1099 | /// | |
1100 | /// let (send, recv) = mpsc::channel(); | |
1101 | /// let handle = thread::spawn(move || { | |
1102 | /// send.send(1u8).unwrap(); | |
1103 | /// send.send(2).unwrap(); | |
1104 | /// send.send(3).unwrap(); | |
1105 | /// drop(send); | |
1106 | /// }); | |
1107 | /// | |
1108 | /// // wait for the thread to join so we ensure the sender is dropped | |
1109 | /// handle.join().unwrap(); | |
1110 | /// | |
1111 | /// assert_eq!(Ok(1), recv.recv()); | |
1112 | /// assert_eq!(Ok(2), recv.recv()); | |
1113 | /// assert_eq!(Ok(3), recv.recv()); | |
1114 | /// assert_eq!(Err(RecvError), recv.recv()); | |
1115 | /// ``` | |
85aaf69f | 1116 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
1117 | pub fn recv(&self) -> Result<T, RecvError> { |
1118 | loop { | |
1119 | let new_port = match *unsafe { self.inner() } { | |
60c5eb7d XL |
1120 | Flavor::Oneshot(ref p) => match p.recv(None) { |
1121 | Ok(t) => return Ok(t), | |
1122 | Err(oneshot::Disconnected) => return Err(RecvError), | |
1123 | Err(oneshot::Upgraded(rx)) => rx, | |
1124 | Err(oneshot::Empty) => unreachable!(), | |
1125 | }, | |
1126 | Flavor::Stream(ref p) => match p.recv(None) { | |
1127 | Ok(t) => return Ok(t), | |
1128 | Err(stream::Disconnected) => return Err(RecvError), | |
1129 | Err(stream::Upgraded(rx)) => rx, | |
1130 | Err(stream::Empty) => unreachable!(), | |
1131 | }, | |
1132 | Flavor::Shared(ref p) => match p.recv(None) { | |
1133 | Ok(t) => return Ok(t), | |
1134 | Err(shared::Disconnected) => return Err(RecvError), | |
1135 | Err(shared::Empty) => unreachable!(), | |
1136 | }, | |
476ff2be | 1137 | Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), |
1a4d82fc JJ |
1138 | }; |
1139 | unsafe { | |
1140 | mem::swap(self.inner_mut(), new_port.inner_mut()); | |
1141 | } | |
1142 | } | |
1143 | } | |
1144 | ||
3157f602 XL |
1145 | /// Attempts to wait for a value on this receiver, returning an error if the |
1146 | /// corresponding channel has hung up, or if it waits more than `timeout`. | |
1147 | /// | |
1148 | /// This function will always block the current thread if there is no data | |
1149 | /// available and it's possible for more data to be sent. Once a message is | |
dfeec247 | 1150 | /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this |
7cac9316 | 1151 | /// receiver will wake up and return that message. |
3157f602 | 1152 | /// |
cc61c64b XL |
1153 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
1154 | /// this call is blocking, this call will wake up and return [`Err`] to | |
3157f602 XL |
1155 | /// indicate that no more messages can ever be received on this channel. |
1156 | /// However, since channels are buffered, messages sent before the disconnect | |
1157 | /// will still be properly received. | |
1158 | /// | |
b7449926 XL |
1159 | /// # Known Issues |
1160 | /// | |
1161 | /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout` | |
1162 | /// to panic unexpectedly with the following example: | |
1163 | /// | |
1164 | /// ```no_run | |
1165 | /// use std::sync::mpsc::channel; | |
1166 | /// use std::thread; | |
1167 | /// use std::time::Duration; | |
1168 | /// | |
1169 | /// let (tx, rx) = channel::<String>(); | |
1170 | /// | |
1171 | /// thread::spawn(move || { | |
1172 | /// let d = Duration::from_millis(10); | |
1173 | /// loop { | |
1174 | /// println!("recv"); | |
1175 | /// let _r = rx.recv_timeout(d); | |
1176 | /// } | |
1177 | /// }); | |
1178 | /// | |
1179 | /// thread::sleep(Duration::from_millis(100)); | |
1180 | /// let _c1 = tx.clone(); | |
1181 | /// | |
1182 | /// thread::sleep(Duration::from_secs(1)); | |
1183 | /// ``` | |
1184 | /// | |
1185 | /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364 | |
1186 | /// | |
3157f602 XL |
1187 | /// # Examples |
1188 | /// | |
7cac9316 XL |
1189 | /// Successfully receiving value before encountering timeout: |
1190 | /// | |
3157f602 | 1191 | /// ```no_run |
7cac9316 XL |
1192 | /// use std::thread; |
1193 | /// use std::time::Duration; | |
1194 | /// use std::sync::mpsc; | |
1195 | /// | |
1196 | /// let (send, recv) = mpsc::channel(); | |
1197 | /// | |
1198 | /// thread::spawn(move || { | |
1199 | /// send.send('a').unwrap(); | |
1200 | /// }); | |
1201 | /// | |
1202 | /// assert_eq!( | |
1203 | /// recv.recv_timeout(Duration::from_millis(400)), | |
1204 | /// Ok('a') | |
1205 | /// ); | |
1206 | /// ``` | |
1207 | /// | |
1208 | /// Receiving an error upon reaching timeout: | |
1209 | /// | |
1210 | /// ```no_run | |
1211 | /// use std::thread; | |
3157f602 | 1212 | /// use std::time::Duration; |
7cac9316 | 1213 | /// use std::sync::mpsc; |
3157f602 | 1214 | /// |
7cac9316 | 1215 | /// let (send, recv) = mpsc::channel(); |
3157f602 | 1216 | /// |
7cac9316 XL |
1217 | /// thread::spawn(move || { |
1218 | /// thread::sleep(Duration::from_millis(800)); | |
1219 | /// send.send('a').unwrap(); | |
1220 | /// }); | |
1221 | /// | |
1222 | /// assert_eq!( | |
1223 | /// recv.recv_timeout(Duration::from_millis(400)), | |
1224 | /// Err(mpsc::RecvTimeoutError::Timeout) | |
1225 | /// ); | |
3157f602 | 1226 | /// ``` |
5bcae85e | 1227 | #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] |
3157f602 XL |
1228 | pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { |
1229 | // Do an optimistic try_recv to avoid the performance impact of | |
1230 | // Instant::now() in the full-channel case. | |
1231 | match self.try_recv() { | |
0731742a XL |
1232 | Ok(result) => Ok(result), |
1233 | Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected), | |
1234 | Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) { | |
1235 | Some(deadline) => self.recv_deadline(deadline), | |
1236 | // So far in the future that it's practically the same as waiting indefinitely. | |
1237 | None => self.recv().map_err(RecvTimeoutError::from), | |
1238 | }, | |
3157f602 XL |
1239 | } |
1240 | } | |
1241 | ||
ff7c6d11 XL |
1242 | /// Attempts to wait for a value on this receiver, returning an error if the |
1243 | /// corresponding channel has hung up, or if `deadline` is reached. | |
1244 | /// | |
1245 | /// This function will always block the current thread if there is no data | |
1246 | /// available and it's possible for more data to be sent. Once a message is | |
dfeec247 | 1247 | /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this |
ff7c6d11 XL |
1248 | /// receiver will wake up and return that message. |
1249 | /// | |
1250 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while | |
1251 | /// this call is blocking, this call will wake up and return [`Err`] to | |
1252 | /// indicate that no more messages can ever be received on this channel. | |
1253 | /// However, since channels are buffered, messages sent before the disconnect | |
1254 | /// will still be properly received. | |
1255 | /// | |
ff7c6d11 XL |
1256 | /// # Examples |
1257 | /// | |
1258 | /// Successfully receiving value before reaching deadline: | |
1259 | /// | |
1260 | /// ```no_run | |
1261 | /// #![feature(deadline_api)] | |
1262 | /// use std::thread; | |
1263 | /// use std::time::{Duration, Instant}; | |
1264 | /// use std::sync::mpsc; | |
1265 | /// | |
1266 | /// let (send, recv) = mpsc::channel(); | |
1267 | /// | |
1268 | /// thread::spawn(move || { | |
1269 | /// send.send('a').unwrap(); | |
1270 | /// }); | |
1271 | /// | |
1272 | /// assert_eq!( | |
1273 | /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), | |
1274 | /// Ok('a') | |
1275 | /// ); | |
1276 | /// ``` | |
1277 | /// | |
1278 | /// Receiving an error upon reaching deadline: | |
1279 | /// | |
1280 | /// ```no_run | |
1281 | /// #![feature(deadline_api)] | |
1282 | /// use std::thread; | |
1283 | /// use std::time::{Duration, Instant}; | |
1284 | /// use std::sync::mpsc; | |
1285 | /// | |
1286 | /// let (send, recv) = mpsc::channel(); | |
1287 | /// | |
1288 | /// thread::spawn(move || { | |
1289 | /// thread::sleep(Duration::from_millis(800)); | |
1290 | /// send.send('a').unwrap(); | |
1291 | /// }); | |
1292 | /// | |
1293 | /// assert_eq!( | |
1294 | /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), | |
1295 | /// Err(mpsc::RecvTimeoutError::Timeout) | |
1296 | /// ); | |
1297 | /// ``` | |
1298 | #[unstable(feature = "deadline_api", issue = "46316")] | |
1299 | pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { | |
3157f602 XL |
1300 | use self::RecvTimeoutError::*; |
1301 | ||
1302 | loop { | |
1303 | let port_or_empty = match *unsafe { self.inner() } { | |
60c5eb7d XL |
1304 | Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) { |
1305 | Ok(t) => return Ok(t), | |
1306 | Err(oneshot::Disconnected) => return Err(Disconnected), | |
1307 | Err(oneshot::Upgraded(rx)) => Some(rx), | |
1308 | Err(oneshot::Empty) => None, | |
1309 | }, | |
1310 | Flavor::Stream(ref p) => match p.recv(Some(deadline)) { | |
1311 | Ok(t) => return Ok(t), | |
1312 | Err(stream::Disconnected) => return Err(Disconnected), | |
1313 | Err(stream::Upgraded(rx)) => Some(rx), | |
1314 | Err(stream::Empty) => None, | |
1315 | }, | |
1316 | Flavor::Shared(ref p) => match p.recv(Some(deadline)) { | |
1317 | Ok(t) => return Ok(t), | |
1318 | Err(shared::Disconnected) => return Err(Disconnected), | |
1319 | Err(shared::Empty) => None, | |
1320 | }, | |
1321 | Flavor::Sync(ref p) => match p.recv(Some(deadline)) { | |
1322 | Ok(t) => return Ok(t), | |
1323 | Err(sync::Disconnected) => return Err(Disconnected), | |
1324 | Err(sync::Empty) => None, | |
1325 | }, | |
3157f602 XL |
1326 | }; |
1327 | ||
1328 | if let Some(new_port) = port_or_empty { | |
1329 | unsafe { | |
1330 | mem::swap(self.inner_mut(), new_port.inner_mut()); | |
1331 | } | |
1332 | } | |
1333 | ||
1334 | // If we're already past the deadline and still have no data, | |
1335 | // return a timeout; otherwise go around the loop and try again. | |
1336 | if Instant::now() >= deadline { | |
1337 | return Err(Timeout); | |
1338 | } | |
1339 | } | |
1340 | } | |
1341 | ||
1a4d82fc | 1342 | /// Returns an iterator that will block waiting for messages, but never |
cc61c64b XL |
1343 | /// [`panic!`]. It will return [`None`] when the channel has hung up. |
1344 | /// | |
cc61c64b XL |
1345 | /// # Examples |
1346 | /// | |
1347 | /// ```rust | |
1348 | /// use std::sync::mpsc::channel; | |
1349 | /// use std::thread; | |
1350 | /// | |
1351 | /// let (send, recv) = channel(); | |
1352 | /// | |
1353 | /// thread::spawn(move || { | |
7cac9316 XL |
1354 | /// send.send(1).unwrap(); |
1355 | /// send.send(2).unwrap(); | |
1356 | /// send.send(3).unwrap(); | |
cc61c64b XL |
1357 | /// }); |
1358 | /// | |
7cac9316 XL |
1359 | /// let mut iter = recv.iter(); |
1360 | /// assert_eq!(iter.next(), Some(1)); | |
1361 | /// assert_eq!(iter.next(), Some(2)); | |
1362 | /// assert_eq!(iter.next(), Some(3)); | |
1363 | /// assert_eq!(iter.next(), None); | |
cc61c64b | 1364 | /// ``` |
85aaf69f | 1365 | #[stable(feature = "rust1", since = "1.0.0")] |
532ac7d7 | 1366 | pub fn iter(&self) -> Iter<'_, T> { |
1a4d82fc JJ |
1367 | Iter { rx: self } |
1368 | } | |
5bcae85e SL |
1369 | |
1370 | /// Returns an iterator that will attempt to yield all pending values. | |
1371 | /// It will return `None` if there are no more pending values or if the | |
cc61c64b | 1372 | /// channel has hung up. The iterator will never [`panic!`] or block the |
5bcae85e | 1373 | /// user by waiting for values. |
cc61c64b | 1374 | /// |
7cac9316 XL |
1375 | /// # Examples |
1376 | /// | |
1377 | /// ```no_run | |
1378 | /// use std::sync::mpsc::channel; | |
1379 | /// use std::thread; | |
1380 | /// use std::time::Duration; | |
1381 | /// | |
1382 | /// let (sender, receiver) = channel(); | |
1383 | /// | |
1384 | /// // nothing is in the buffer yet | |
1385 | /// assert!(receiver.try_iter().next().is_none()); | |
1386 | /// | |
1387 | /// thread::spawn(move || { | |
1388 | /// thread::sleep(Duration::from_secs(1)); | |
1389 | /// sender.send(1).unwrap(); | |
1390 | /// sender.send(2).unwrap(); | |
1391 | /// sender.send(3).unwrap(); | |
1392 | /// }); | |
1393 | /// | |
1394 | /// // nothing is in the buffer yet | |
1395 | /// assert!(receiver.try_iter().next().is_none()); | |
1396 | /// | |
1397 | /// // block for two seconds | |
1398 | /// thread::sleep(Duration::from_secs(2)); | |
1399 | /// | |
1400 | /// let mut iter = receiver.try_iter(); | |
1401 | /// assert_eq!(iter.next(), Some(1)); | |
1402 | /// assert_eq!(iter.next(), Some(2)); | |
1403 | /// assert_eq!(iter.next(), Some(3)); | |
1404 | /// assert_eq!(iter.next(), None); | |
1405 | /// ``` | |
476ff2be | 1406 | #[stable(feature = "receiver_try_iter", since = "1.15.0")] |
532ac7d7 | 1407 | pub fn try_iter(&self) -> TryIter<'_, T> { |
5bcae85e SL |
1408 | TryIter { rx: self } |
1409 | } | |
1a4d82fc JJ |
1410 | } |
1411 | ||
85aaf69f | 1412 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 1413 | impl<'a, T> Iterator for Iter<'a, T> { |
1a4d82fc JJ |
1414 | type Item = T; |
1415 | ||
60c5eb7d XL |
1416 | fn next(&mut self) -> Option<T> { |
1417 | self.rx.recv().ok() | |
1418 | } | |
1a4d82fc JJ |
1419 | } |
1420 | ||
476ff2be | 1421 | #[stable(feature = "receiver_try_iter", since = "1.15.0")] |
5bcae85e SL |
1422 | impl<'a, T> Iterator for TryIter<'a, T> { |
1423 | type Item = T; | |
1424 | ||
60c5eb7d XL |
1425 | fn next(&mut self) -> Option<T> { |
1426 | self.rx.try_recv().ok() | |
1427 | } | |
5bcae85e SL |
1428 | } |
1429 | ||
d9579d0f AL |
1430 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] |
1431 | impl<'a, T> IntoIterator for &'a Receiver<T> { | |
1432 | type Item = T; | |
1433 | type IntoIter = Iter<'a, T>; | |
1434 | ||
60c5eb7d XL |
1435 | fn into_iter(self) -> Iter<'a, T> { |
1436 | self.iter() | |
1437 | } | |
d9579d0f AL |
1438 | } |
1439 | ||
92a42be0 | 1440 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] |
d9579d0f AL |
1441 | impl<T> Iterator for IntoIter<T> { |
1442 | type Item = T; | |
60c5eb7d XL |
1443 | fn next(&mut self) -> Option<T> { |
1444 | self.rx.recv().ok() | |
1445 | } | |
d9579d0f AL |
1446 | } |
1447 | ||
1448 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] | |
60c5eb7d | 1449 | impl<T> IntoIterator for Receiver<T> { |
d9579d0f AL |
1450 | type Item = T; |
1451 | type IntoIter = IntoIter<T>; | |
1452 | ||
1453 | fn into_iter(self) -> IntoIter<T> { | |
1454 | IntoIter { rx: self } | |
1455 | } | |
1456 | } | |
1457 | ||
85aaf69f | 1458 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 1459 | impl<T> Drop for Receiver<T> { |
1a4d82fc | 1460 | fn drop(&mut self) { |
476ff2be SL |
1461 | match *unsafe { self.inner() } { |
1462 | Flavor::Oneshot(ref p) => p.drop_port(), | |
1463 | Flavor::Stream(ref p) => p.drop_port(), | |
1464 | Flavor::Shared(ref p) => p.drop_port(), | |
1465 | Flavor::Sync(ref p) => p.drop_port(), | |
1a4d82fc JJ |
1466 | } |
1467 | } | |
1468 | } | |
1469 | ||
7cac9316 | 1470 | #[stable(feature = "mpsc_debug", since = "1.8.0")] |
7453a54e | 1471 | impl<T> fmt::Debug for Receiver<T> { |
532ac7d7 | 1472 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
abe05a73 | 1473 | f.debug_struct("Receiver").finish() |
7453a54e SL |
1474 | } |
1475 | } | |
1476 | ||
85aaf69f SL |
1477 | #[stable(feature = "rust1", since = "1.0.0")] |
1478 | impl<T> fmt::Debug for SendError<T> { | |
532ac7d7 | 1479 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
85aaf69f | 1480 | "SendError(..)".fmt(f) |
1a4d82fc | 1481 | } |
85aaf69f | 1482 | } |
1a4d82fc | 1483 | |
85aaf69f SL |
1484 | #[stable(feature = "rust1", since = "1.0.0")] |
1485 | impl<T> fmt::Display for SendError<T> { | |
532ac7d7 | 1486 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
85aaf69f | 1487 | "sending on a closed channel".fmt(f) |
1a4d82fc | 1488 | } |
1a4d82fc JJ |
1489 | } |
1490 | ||
c34b1796 | 1491 | #[stable(feature = "rust1", since = "1.0.0")] |
9e0c209e | 1492 | impl<T: Send> error::Error for SendError<T> { |
dfeec247 | 1493 | #[allow(deprecated)] |
c34b1796 AL |
1494 | fn description(&self) -> &str { |
1495 | "sending on a closed channel" | |
1496 | } | |
c34b1796 AL |
1497 | } |
1498 | ||
85aaf69f SL |
1499 | #[stable(feature = "rust1", since = "1.0.0")] |
1500 | impl<T> fmt::Debug for TrySendError<T> { | |
532ac7d7 | 1501 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
85aaf69f SL |
1502 | match *self { |
1503 | TrySendError::Full(..) => "Full(..)".fmt(f), | |
1504 | TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), | |
1505 | } | |
1a4d82fc JJ |
1506 | } |
1507 | } | |
1508 | ||
85aaf69f SL |
1509 | #[stable(feature = "rust1", since = "1.0.0")] |
1510 | impl<T> fmt::Display for TrySendError<T> { | |
532ac7d7 | 1511 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1a4d82fc | 1512 | match *self { |
60c5eb7d XL |
1513 | TrySendError::Full(..) => "sending on a full channel".fmt(f), |
1514 | TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), | |
1a4d82fc JJ |
1515 | } |
1516 | } | |
1517 | } | |
1518 | ||
c34b1796 | 1519 | #[stable(feature = "rust1", since = "1.0.0")] |
9e0c209e | 1520 | impl<T: Send> error::Error for TrySendError<T> { |
dfeec247 | 1521 | #[allow(deprecated)] |
c34b1796 AL |
1522 | fn description(&self) -> &str { |
1523 | match *self { | |
60c5eb7d XL |
1524 | TrySendError::Full(..) => "sending on a full channel", |
1525 | TrySendError::Disconnected(..) => "sending on a closed channel", | |
c34b1796 AL |
1526 | } |
1527 | } | |
c34b1796 AL |
1528 | } |
1529 | ||
ff7c6d11 XL |
1530 | #[stable(feature = "mpsc_error_conversions", since = "1.24.0")] |
1531 | impl<T> From<SendError<T>> for TrySendError<T> { | |
1b1a35ee XL |
1532 | /// Converts a `SendError<T>` into a `TrySendError<T>`. |
1533 | /// | |
1534 | /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError<T>`. | |
1535 | /// | |
1536 | /// No data is allocated on the heap. | |
ff7c6d11 XL |
1537 | fn from(err: SendError<T>) -> TrySendError<T> { |
1538 | match err { | |
1539 | SendError(t) => TrySendError::Disconnected(t), | |
1540 | } | |
1541 | } | |
1542 | } | |
1543 | ||
85aaf69f SL |
1544 | #[stable(feature = "rust1", since = "1.0.0")] |
1545 | impl fmt::Display for RecvError { | |
532ac7d7 | 1546 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1a4d82fc JJ |
1547 | "receiving on a closed channel".fmt(f) |
1548 | } | |
1549 | } | |
1550 | ||
c34b1796 AL |
1551 | #[stable(feature = "rust1", since = "1.0.0")] |
1552 | impl error::Error for RecvError { | |
dfeec247 | 1553 | #[allow(deprecated)] |
c34b1796 AL |
1554 | fn description(&self) -> &str { |
1555 | "receiving on a closed channel" | |
1556 | } | |
c34b1796 AL |
1557 | } |
1558 | ||
85aaf69f SL |
1559 | #[stable(feature = "rust1", since = "1.0.0")] |
1560 | impl fmt::Display for TryRecvError { | |
532ac7d7 | 1561 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1a4d82fc | 1562 | match *self { |
60c5eb7d XL |
1563 | TryRecvError::Empty => "receiving on an empty channel".fmt(f), |
1564 | TryRecvError::Disconnected => "receiving on a closed channel".fmt(f), | |
1a4d82fc JJ |
1565 | } |
1566 | } | |
1567 | } | |
1568 | ||
c34b1796 AL |
1569 | #[stable(feature = "rust1", since = "1.0.0")] |
1570 | impl error::Error for TryRecvError { | |
dfeec247 | 1571 | #[allow(deprecated)] |
c34b1796 AL |
1572 | fn description(&self) -> &str { |
1573 | match *self { | |
60c5eb7d XL |
1574 | TryRecvError::Empty => "receiving on an empty channel", |
1575 | TryRecvError::Disconnected => "receiving on a closed channel", | |
c34b1796 AL |
1576 | } |
1577 | } | |
c34b1796 AL |
1578 | } |
1579 | ||
ff7c6d11 XL |
1580 | #[stable(feature = "mpsc_error_conversions", since = "1.24.0")] |
1581 | impl From<RecvError> for TryRecvError { | |
1b1a35ee XL |
1582 | /// Converts a `RecvError` into a `TryRecvError`. |
1583 | /// | |
1584 | /// This conversion always returns `TryRecvError::Disconnected`. | |
1585 | /// | |
1586 | /// No data is allocated on the heap. | |
ff7c6d11 XL |
1587 | fn from(err: RecvError) -> TryRecvError { |
1588 | match err { | |
1589 | RecvError => TryRecvError::Disconnected, | |
1590 | } | |
1591 | } | |
1592 | } | |
1593 | ||
7cac9316 | 1594 | #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] |
476ff2be | 1595 | impl fmt::Display for RecvTimeoutError { |
532ac7d7 | 1596 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
476ff2be | 1597 | match *self { |
60c5eb7d XL |
1598 | RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f), |
1599 | RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f), | |
476ff2be SL |
1600 | } |
1601 | } | |
1602 | } | |
1603 | ||
7cac9316 | 1604 | #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] |
476ff2be | 1605 | impl error::Error for RecvTimeoutError { |
dfeec247 | 1606 | #[allow(deprecated)] |
476ff2be SL |
1607 | fn description(&self) -> &str { |
1608 | match *self { | |
60c5eb7d XL |
1609 | RecvTimeoutError::Timeout => "timed out waiting on channel", |
1610 | RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", | |
476ff2be SL |
1611 | } |
1612 | } | |
476ff2be SL |
1613 | } |
1614 | ||
ff7c6d11 XL |
1615 | #[stable(feature = "mpsc_error_conversions", since = "1.24.0")] |
1616 | impl From<RecvError> for RecvTimeoutError { | |
1b1a35ee XL |
1617 | /// Converts a `RecvError` into a `RecvTimeoutError`. |
1618 | /// | |
1619 | /// This conversion always returns `RecvTimeoutError::Disconnected`. | |
1620 | /// | |
1621 | /// No data is allocated on the heap. | |
ff7c6d11 XL |
1622 | fn from(err: RecvError) -> RecvTimeoutError { |
1623 | match err { | |
1624 | RecvError => RecvTimeoutError::Disconnected, | |
1625 | } | |
1626 | } | |
1627 | } |