]>
Commit | Line | Data |
---|---|---|
85aaf69f | 1 | //! Multi-producer, single-consumer FIFO queue communication primitives. |
1a4d82fc JJ |
2 | //! |
3 | //! This module provides message-based communication over channels, concretely | |
4 | //! defined among three types: | |
5 | //! | |
cc61c64b XL |
6 | //! * [`Sender`] |
7 | //! * [`SyncSender`] | |
8 | //! * [`Receiver`] | |
1a4d82fc | 9 | //! |
cc61c64b | 10 | //! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both |
1a4d82fc JJ |
11 | //! senders are clone-able (multi-producer) such that many threads can send |
12 | //! simultaneously to one receiver (single-consumer). | |
13 | //! | |
14 | //! These channels come in two flavors: | |
15 | //! | |
cc61c64b | 16 | //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function |
1a4d82fc JJ |
17 | //! will return a `(Sender, Receiver)` tuple where all sends will be |
18 | //! **asynchronous** (they never block). The channel conceptually has an | |
19 | //! infinite buffer. | |
20 | //! | |
cc61c64b XL |
21 | //! 2. A synchronous, bounded channel. The [`sync_channel`] function will |
22 | //! return a `(SyncSender, Receiver)` tuple where the storage for pending | |
23 | //! messages is a pre-allocated buffer of a fixed size. All sends will be | |
1a4d82fc | 24 | //! **synchronous** by blocking until there is buffer space available. Note |
cc61c64b XL |
25 | //! that a bound of 0 is allowed, causing the channel to become a "rendezvous" |
26 | //! channel where each sender atomically hands off a message to a receiver. | |
27 | //! | |
28 | //! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html | |
29 | //! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html | |
30 | //! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html | |
31 | //! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send | |
32 | //! [`channel`]: ../../../std/sync/mpsc/fn.channel.html | |
33 | //! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html | |
1a4d82fc JJ |
34 | //! |
35 | //! ## Disconnection | |
36 | //! | |
cc61c64b | 37 | //! The send and receive operations on channels will all return a [`Result`] |
1a4d82fc JJ |
38 | //! indicating whether the operation succeeded or not. An unsuccessful operation |
39 | //! is normally indicative of the other half of a channel having "hung up" by | |
40 | //! being dropped in its corresponding thread. | |
41 | //! | |
42 | //! Once half of a channel has been deallocated, most operations can no longer | |
cc61c64b XL |
43 | //! continue to make progress, so [`Err`] will be returned. Many applications |
44 | //! will continue to [`unwrap`] the results returned from this module, | |
45 | //! instigating a propagation of failure among threads if one unexpectedly dies. | |
46 | //! | |
47 | //! [`Result`]: ../../../std/result/enum.Result.html | |
48 | //! [`Err`]: ../../../std/result/enum.Result.html#variant.Err | |
49 | //! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap | |
1a4d82fc JJ |
50 | //! |
51 | //! # Examples | |
52 | //! | |
53 | //! Simple usage: | |
54 | //! | |
55 | //! ``` | |
85aaf69f | 56 | //! use std::thread; |
1a4d82fc JJ |
57 | //! use std::sync::mpsc::channel; |
58 | //! | |
59 | //! // Create a simple streaming channel | |
60 | //! let (tx, rx) = channel(); | |
85aaf69f SL |
61 | //! thread::spawn(move|| { |
62 | //! tx.send(10).unwrap(); | |
1a4d82fc | 63 | //! }); |
85aaf69f | 64 | //! assert_eq!(rx.recv().unwrap(), 10); |
1a4d82fc JJ |
65 | //! ``` |
66 | //! | |
67 | //! Shared usage: | |
68 | //! | |
69 | //! ``` | |
85aaf69f | 70 | //! use std::thread; |
1a4d82fc JJ |
71 | //! use std::sync::mpsc::channel; |
72 | //! | |
73 | //! // Create a shared channel that can be sent along from many threads | |
74 | //! // where tx is the sending half (tx for transmission), and rx is the receiving | |
75 | //! // half (rx for receiving). | |
76 | //! let (tx, rx) = channel(); | |
85aaf69f | 77 | //! for i in 0..10 { |
1a4d82fc | 78 | //! let tx = tx.clone(); |
85aaf69f | 79 | //! thread::spawn(move|| { |
1a4d82fc JJ |
80 | //! tx.send(i).unwrap(); |
81 | //! }); | |
82 | //! } | |
83 | //! | |
85aaf69f | 84 | //! for _ in 0..10 { |
1a4d82fc JJ |
85 | //! let j = rx.recv().unwrap(); |
86 | //! assert!(0 <= j && j < 10); | |
87 | //! } | |
88 | //! ``` | |
89 | //! | |
90 | //! Propagating panics: | |
91 | //! | |
92 | //! ``` | |
93 | //! use std::sync::mpsc::channel; | |
94 | //! | |
95 | //! // The call to recv() will return an error because the channel has already | |
96 | //! // hung up (or been deallocated) | |
c34b1796 | 97 | //! let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
98 | //! drop(tx); |
99 | //! assert!(rx.recv().is_err()); | |
100 | //! ``` | |
101 | //! | |
102 | //! Synchronous channels: | |
103 | //! | |
104 | //! ``` | |
85aaf69f | 105 | //! use std::thread; |
1a4d82fc JJ |
106 | //! use std::sync::mpsc::sync_channel; |
107 | //! | |
c34b1796 | 108 | //! let (tx, rx) = sync_channel::<i32>(0); |
85aaf69f | 109 | //! thread::spawn(move|| { |
bd371182 | 110 | //! // This will wait for the parent thread to start receiving |
1a4d82fc JJ |
111 | //! tx.send(53).unwrap(); |
112 | //! }); | |
113 | //! rx.recv().unwrap(); | |
114 | //! ``` | |
1a4d82fc | 115 | |
85aaf69f | 116 | #![stable(feature = "rust1", since = "1.0.0")] |
a1dfa0c6 | 117 | #![allow(deprecated)] // for mpsc_select |
1a4d82fc JJ |
118 | |
119 | // A description of how Rust's channel implementation works | |
120 | // | |
121 | // Channels are supposed to be the basic building block for all other | |
122 | // concurrent primitives that are used in Rust. As a result, the channel type | |
123 | // needs to be highly optimized, flexible, and broad enough for use everywhere. | |
124 | // | |
125 | // The choice of implementation of all channels is to be built on lock-free data | |
126 | // structures. The channels themselves are then consequently also lock-free data | |
127 | // structures. As always with lock-free code, this is a very "here be dragons" | |
128 | // territory, especially because I'm unaware of any academic papers that have | |
129 | // gone into great length about channels of these flavors. | |
130 | // | |
131 | // ## Flavors of channels | |
132 | // | |
133 | // From the perspective of a consumer of this library, there is only one flavor | |
134 | // of channel. This channel can be used as a stream and cloned to allow multiple | |
135 | // senders. Under the hood, however, there are actually three flavors of | |
136 | // channels in play. | |
137 | // | |
3157f602 XL |
138 | // * Oneshots - these channels are highly optimized for the one-send use |
139 | // case. They contain as few atomics as possible and | |
140 | // involve one and exactly one allocation. | |
1a4d82fc JJ |
141 | // * Streams - these channels are optimized for the non-shared use case. They |
142 | // use a different concurrent queue that is more tailored for this | |
143 | // use case. The initial allocation of this flavor of channel is not | |
144 | // optimized. | |
145 | // * Shared - this is the most general form of channel that this module offers, | |
146 | // a channel with multiple senders. This type is as optimized as it | |
147 | // can be, but the previous two types mentioned are much faster for | |
148 | // their use-cases. | |
149 | // | |
150 | // ## Concurrent queues | |
151 | // | |
3157f602 XL |
152 | // The basic idea of Rust's Sender/Receiver types is that send() never blocks, |
153 | // but recv() obviously blocks. This means that under the hood there must be | |
154 | // some shared and concurrent queue holding all of the actual data. | |
1a4d82fc JJ |
155 | // |
156 | // With two flavors of channels, two flavors of queues are also used. We have | |
157 | // chosen to use queues from a well-known author that are abbreviated as SPSC | |
158 | // and MPSC (single producer, single consumer and multiple producer, single | |
159 | // consumer). SPSC queues are used for streams while MPSC queues are used for | |
160 | // shared channels. | |
161 | // | |
162 | // ### SPSC optimizations | |
163 | // | |
164 | // The SPSC queue found online is essentially a linked list of nodes where one | |
165 | // half of the nodes are the "queue of data" and the other half of nodes are a | |
166 | // cache of unused nodes. The unused nodes are used such that an allocation is | |
167 | // not required on every push() and a free doesn't need to happen on every | |
168 | // pop(). | |
169 | // | |
170 | // As found online, however, the cache of nodes is of an infinite size. This | |
171 | // means that if a channel at one point in its life had 50k items in the queue, | |
172 | // then the queue will always have the capacity for 50k items. I believed that | |
173 | // this was an unnecessary limitation of the implementation, so I have altered | |
174 | // the queue to optionally have a bound on the cache size. | |
175 | // | |
176 | // By default, streams will have an unbounded SPSC queue with a small-ish cache | |
177 | // size. The hope is that the cache is still large enough to have very fast | |
178 | // send() operations while not too large such that millions of channels can | |
179 | // coexist at once. | |
180 | // | |
181 | // ### MPSC optimizations | |
182 | // | |
183 | // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses | |
184 | // a linked list under the hood to earn its unboundedness, but I have not put | |
185 | // forth much effort into having a cache of nodes similar to the SPSC queue. | |
186 | // | |
187 | // For now, I believe that this is "ok" because shared channels are not the most | |
188 | // common type, but soon we may wish to revisit this queue choice and determine | |
189 | // another candidate for backend storage of shared channels. | |
190 | // | |
191 | // ## Overview of the Implementation | |
192 | // | |
193 | // Now that there's a little background on the concurrent queues used, it's | |
194 | // worth going into much more detail about the channels themselves. The basic | |
195 | // pseudocode for a send/recv are: | |
196 | // | |
197 | // | |
198 | //      send(t)                             recv() | |
199 | //        queue.push(t)                       return if queue.pop() | |
200 | //        if increment() == -1                deschedule { | |
201 | //          wakeup()                            if decrement() > 0 | |
202 | //                                                cancel_deschedule() | |
203 | //                                            } | |
204 | //                                            queue.pop() | |
205 | // | |
206 | // As mentioned before, there are no locks in this implementation, only atomic | |
207 | // instructions are used. | |
208 | // | |
209 | // ### The internal atomic counter | |
210 | // | |
211 | // Every channel has a shared counter with each half to keep track of the size | |
212 | // of the queue. This counter is used to abort descheduling by the receiver and | |
213 | // to know when to wake up on the sending side. | |
214 | // | |
215 | // As seen in the pseudocode, senders will increment this count and receivers | |
216 | // will decrement the count. The theory behind this is that if a sender sees a | |
217 | // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, | |
218 | // then it doesn't need to block. | |
219 | // | |
220 | // The recv() method has a beginning call to pop(), and if successful, it needs | |
221 | // to decrement the count. It is a crucial implementation detail that this | |
222 | // decrement does *not* happen to the shared counter. If this were the case, | |
223 | // then it would be possible for the counter to be very negative when there were | |
224 | // no receivers waiting, in which case the senders would have to determine when | |
225 | // it was actually appropriate to wake up a receiver. | |
226 | // | |
227 | // Instead, the "steal count" is kept track of separately (not atomically | |
228 | // because it's only used by receivers), and then the decrement() call when | |
229 | // descheduling will lump in all of the recent steals into one large decrement. | |
230 | // | |
231 | // The implication of this is that if a sender sees a -1 count, then there's | |
232 | // guaranteed to be a waiter waiting! | |
233 | // | |
234 | // ## Native Implementation | |
235 | // | |
236 | // A major goal of these channels is to work seamlessly on and off the runtime. | |
237 | // All of the previous race conditions have been worded in terms of | |
238 | // scheduler-isms (which are obviously not available without the runtime). | |
239 | // | |
240 | // For now, native usage of channels (off the runtime) will fall back onto | |
241 | // mutexes/cond vars for descheduling/atomic decisions. The no-contention path | |
242 | // is still entirely lock-free, the "deschedule" blocks above are surrounded by | |
243 | // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a | |
244 | // condition variable. | |
245 | // | |
246 | // ## Select | |
247 | // | |
248 | // Being able to support selection over channels has greatly influenced this | |
249 | // design, and not only does selection need to work inside the runtime, but also | |
250 | // outside the runtime. | |
251 | // | |
252 | // The implementation is fairly straightforward. The goal of select() is not to | |
253 | // return some data, but only to return which channel can receive data without | |
254 | // blocking. The implementation is essentially the entire blocking procedure | |
255 | // followed by an increment as soon as it's woken up. The cancellation procedure | |
256 | // involves an increment and swapping out of to_wake to acquire ownership of the | |
bd371182 | 257 | // thread to unblock. |
1a4d82fc JJ |
258 | // |
259 | // Sadly this current implementation requires multiple allocations, so I have | |
260 | // seen the throughput of select() be much worse than it should be. I do not | |
261 | // believe that there is anything fundamental that needs to change about these | |
262 | // channels, however, in order to support a more efficient select(). | |
263 | // | |
264 | // # Conclusion | |
265 | // | |
266 | // And now that you've seen all the races that I found and attempted to fix, | |
267 | // here's the code for you to find some more! | |
268 | ||
1a4d82fc | 269 | use sync::Arc; |
c34b1796 | 270 | use error; |
1a4d82fc | 271 | use fmt; |
1a4d82fc JJ |
272 | use mem; |
273 | use cell::UnsafeCell; | |
3157f602 | 274 | use time::{Duration, Instant}; |
1a4d82fc | 275 | |
92a42be0 | 276 | #[unstable(feature = "mpsc_select", issue = "27800")] |
1a4d82fc JJ |
277 | pub use self::select::{Select, Handle}; |
278 | use self::select::StartResult; | |
279 | use self::select::StartResult::*; | |
280 | use self::blocking::SignalToken; | |
281 | ||
9fa01778 XL |
282 | #[cfg(all(test, not(target_os = "emscripten")))] |
283 | mod select_tests; | |
284 | ||
1a4d82fc JJ |
285 | mod blocking; |
286 | mod oneshot; | |
287 | mod select; | |
288 | mod shared; | |
289 | mod stream; | |
290 | mod sync; | |
291 | mod mpsc_queue; | |
292 | mod spsc_queue; | |
293 | ||
abe05a73 XL |
294 | mod cache_aligned; |
295 | ||
7cac9316 XL |
296 | /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type. |
297 | /// This half can only be owned by one thread. | |
cc61c64b XL |
298 | /// |
299 | /// Messages sent to the channel can be retrieved using [`recv`]. | |
300 | /// | |
7cac9316 XL |
301 | /// [`channel`]: fn.channel.html |
302 | /// [`sync_channel`]: fn.sync_channel.html | |
303 | /// [`recv`]: struct.Receiver.html#method.recv | |
cc61c64b XL |
304 | /// |
305 | /// # Examples | |
306 | /// | |
307 | /// ```rust | |
308 | /// use std::sync::mpsc::channel; | |
309 | /// use std::thread; | |
310 | /// use std::time::Duration; | |
311 | /// | |
312 | /// let (send, recv) = channel(); | |
313 | /// | |
314 | /// thread::spawn(move || { | |
315 | /// send.send("Hello world!").unwrap(); | |
316 | /// thread::sleep(Duration::from_secs(2)); // block for two seconds | |
317 | /// send.send("Delayed for 2 seconds").unwrap(); | |
318 | /// }); | |
319 | /// | |
320 | /// println!("{}", recv.recv().unwrap()); // Received immediately | |
321 | /// println!("Waiting..."); | |
322 | /// println!("{}", recv.recv().unwrap()); // Received after 2 seconds | |
323 | /// ``` | |
85aaf69f | 324 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
325 | pub struct Receiver<T> { |
326 | inner: UnsafeCell<Flavor<T>>, | |
327 | } | |
328 | ||
329 | // The receiver port can be sent from place to place, so long as it | |
330 | // is not used to receive non-sendable things. | |
92a42be0 | 331 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 332 | unsafe impl<T: Send> Send for Receiver<T> { } |
1a4d82fc | 333 | |
54a0048b SL |
334 | #[stable(feature = "rust1", since = "1.0.0")] |
335 | impl<T> !Sync for Receiver<T> { } | |
336 | ||
7cac9316 XL |
337 | /// An iterator over messages on a [`Receiver`], created by [`iter`]. |
338 | /// | |
339 | /// This iterator will block whenever [`next`] is called, | |
340 | /// waiting for a new message, and [`None`] will be returned | |
cc61c64b XL |
341 | /// when the corresponding channel has hung up. |
342 | /// | |
7cac9316 XL |
343 | /// [`iter`]: struct.Receiver.html#method.iter |
344 | /// [`Receiver`]: struct.Receiver.html | |
cc61c64b XL |
345 | /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next |
346 | /// [`None`]: ../../../std/option/enum.Option.html#variant.None | |
7cac9316 XL |
347 | /// |
348 | /// # Examples | |
349 | /// | |
350 | /// ```rust | |
351 | /// use std::sync::mpsc::channel; | |
352 | /// use std::thread; | |
353 | /// | |
354 | /// let (send, recv) = channel(); | |
355 | /// | |
356 | /// thread::spawn(move || { | |
357 | /// send.send(1u8).unwrap(); | |
358 | /// send.send(2u8).unwrap(); | |
359 | /// send.send(3u8).unwrap(); | |
360 | /// }); | |
361 | /// | |
362 | /// for x in recv.iter() { | |
363 | /// println!("Got: {}", x); | |
364 | /// } | |
365 | /// ``` | |
85aaf69f | 366 | #[stable(feature = "rust1", since = "1.0.0")] |
32a655c1 | 367 | #[derive(Debug)] |
c34b1796 | 368 | pub struct Iter<'a, T: 'a> { |
1a4d82fc JJ |
369 | rx: &'a Receiver<T> |
370 | } | |
371 | ||
7cac9316 XL |
372 | /// An iterator that attempts to yield all pending values for a [`Receiver`], |
373 | /// created by [`try_iter`]. | |
5bcae85e | 374 | /// |
7cac9316 XL |
375 | /// [`None`] will be returned when there are no pending values remaining or |
376 | /// if the corresponding channel has hung up. | |
377 | /// | |
378 | /// This iterator will never block the caller in order to wait for data to | |
cc61c64b XL |
379 | /// become available. Instead, it will return [`None`]. |
380 | /// | |
7cac9316 XL |
381 | /// [`Receiver`]: struct.Receiver.html |
382 | /// [`try_iter`]: struct.Receiver.html#method.try_iter | |
cc61c64b | 383 | /// [`None`]: ../../../std/option/enum.Option.html#variant.None |
7cac9316 XL |
384 | /// |
385 | /// # Examples | |
386 | /// | |
387 | /// ```rust | |
388 | /// use std::sync::mpsc::channel; | |
389 | /// use std::thread; | |
390 | /// use std::time::Duration; | |
391 | /// | |
392 | /// let (sender, receiver) = channel(); | |
393 | /// | |
394 | /// // Nothing is in the buffer yet | |
395 | /// assert!(receiver.try_iter().next().is_none()); | |
396 | /// println!("Nothing in the buffer..."); | |
397 | /// | |
398 | /// thread::spawn(move || { | |
399 | /// sender.send(1).unwrap(); | |
400 | /// sender.send(2).unwrap(); | |
401 | /// sender.send(3).unwrap(); | |
402 | /// }); | |
403 | /// | |
404 | /// println!("Going to sleep..."); | |
405 | /// thread::sleep(Duration::from_secs(2)); // block for two seconds | |
406 | /// | |
407 | /// for x in receiver.try_iter() { | |
408 | /// println!("Got: {}", x); | |
409 | /// } | |
410 | /// ``` | |
476ff2be | 411 | #[stable(feature = "receiver_try_iter", since = "1.15.0")] |
32a655c1 | 412 | #[derive(Debug)] |
5bcae85e SL |
413 | pub struct TryIter<'a, T: 'a> { |
414 | rx: &'a Receiver<T> | |
415 | } | |
416 | ||
7cac9316 XL |
417 | /// An owning iterator over messages on a [`Receiver`], |
418 | /// created by **Receiver::into_iter**. | |
cc61c64b | 419 | /// |
7cac9316 XL |
420 | /// This iterator will block whenever [`next`] |
421 | /// is called, waiting for a new message, and [`None`] will be | |
422 | /// returned if the corresponding channel has hung up. | |
423 | /// | |
424 | /// [`Receiver`]: struct.Receiver.html | |
cc61c64b XL |
425 | /// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next |
426 | /// [`None`]: ../../../std/option/enum.Option.html#variant.None | |
427 | /// | |
7cac9316 XL |
428 | /// # Examples |
429 | /// | |
430 | /// ```rust | |
431 | /// use std::sync::mpsc::channel; | |
432 | /// use std::thread; | |
433 | /// | |
434 | /// let (send, recv) = channel(); | |
435 | /// | |
436 | /// thread::spawn(move || { | |
437 | /// send.send(1u8).unwrap(); | |
438 | /// send.send(2u8).unwrap(); | |
439 | /// send.send(3u8).unwrap(); | |
440 | /// }); | |
441 | /// | |
442 | /// for x in recv.into_iter() { | |
443 | /// println!("Got: {}", x); | |
444 | /// } | |
445 | /// ``` | |
d9579d0f | 446 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] |
32a655c1 | 447 | #[derive(Debug)] |
d9579d0f AL |
448 | pub struct IntoIter<T> { |
449 | rx: Receiver<T> | |
450 | } | |
451 | ||
7cac9316 | 452 | /// The sending-half of Rust's asynchronous [`channel`] type. This half can only be |
bd371182 | 453 | /// owned by one thread, but it can be cloned to send to other threads. |
cc61c64b XL |
454 | /// |
455 | /// Messages can be sent through this channel with [`send`]. | |
456 | /// | |
7cac9316 XL |
457 | /// [`channel`]: fn.channel.html |
458 | /// [`send`]: struct.Sender.html#method.send | |
cc61c64b XL |
459 | /// |
460 | /// # Examples | |
461 | /// | |
462 | /// ```rust | |
463 | /// use std::sync::mpsc::channel; | |
464 | /// use std::thread; | |
465 | /// | |
466 | /// let (sender, receiver) = channel(); | |
467 | /// let sender2 = sender.clone(); | |
468 | /// | |
469 | /// // First thread owns sender | |
470 | /// thread::spawn(move || { | |
471 | /// sender.send(1).unwrap(); | |
472 | /// }); | |
473 | /// | |
474 | /// // Second thread owns sender2 | |
475 | /// thread::spawn(move || { | |
476 | /// sender2.send(2).unwrap(); | |
477 | /// }); | |
478 | /// | |
479 | /// let msg = receiver.recv().unwrap(); | |
480 | /// let msg2 = receiver.recv().unwrap(); | |
481 | /// | |
482 | /// assert_eq!(3, msg + msg2); | |
483 | /// ``` | |
85aaf69f | 484 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
485 | pub struct Sender<T> { |
486 | inner: UnsafeCell<Flavor<T>>, | |
487 | } | |
488 | ||
489 | // The send port can be sent from place to place, so long as it | |
490 | // is not used to send non-sendable things. | |
92a42be0 | 491 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 492 | unsafe impl<T: Send> Send for Sender<T> { } |
1a4d82fc | 493 | |
54a0048b SL |
494 | #[stable(feature = "rust1", since = "1.0.0")] |
495 | impl<T> !Sync for Sender<T> { } | |
496 | ||
7cac9316 | 497 | /// The sending-half of Rust's synchronous [`sync_channel`] type. |
cc61c64b | 498 | /// |
7cac9316 XL |
499 | /// Messages can be sent through this channel with [`send`] or [`try_send`]. |
500 | /// | |
501 | /// [`send`] will block if there is no space in the internal buffer. | |
502 | /// | |
503 | /// [`sync_channel`]: fn.sync_channel.html | |
504 | /// [`send`]: struct.SyncSender.html#method.send | |
505 | /// [`try_send`]: struct.SyncSender.html#method.try_send | |
cc61c64b | 506 | /// |
7cac9316 XL |
507 | /// # Examples |
508 | /// | |
509 | /// ```rust | |
510 | /// use std::sync::mpsc::sync_channel; | |
511 | /// use std::thread; | |
512 | /// | |
513 | /// // Create a sync_channel with buffer size 2 | |
514 | /// let (sync_sender, receiver) = sync_channel(2); | |
515 | /// let sync_sender2 = sync_sender.clone(); | |
516 | /// | |
517 | /// // First thread owns sync_sender | |
518 | /// thread::spawn(move || { | |
519 | /// sync_sender.send(1).unwrap(); | |
520 | /// sync_sender.send(2).unwrap(); | |
521 | /// }); | |
522 | /// | |
523 | /// // Second thread owns sync_sender2 | |
524 | /// thread::spawn(move || { | |
525 | /// sync_sender2.send(3).unwrap(); | |
526 | /// // thread will now block since the buffer is full | |
527 | /// println!("Thread unblocked!"); | |
528 | /// }); | |
529 | /// | |
530 | /// let mut msg; | |
531 | /// | |
532 | /// msg = receiver.recv().unwrap(); | |
533 | /// println!("message {} received", msg); | |
534 | /// | |
535 | /// // "Thread unblocked!" will be printed now | |
536 | /// | |
537 | /// msg = receiver.recv().unwrap(); | |
538 | /// println!("message {} received", msg); | |
539 | /// | |
540 | /// msg = receiver.recv().unwrap(); | |
541 | /// | |
542 | /// println!("message {} received", msg); | |
543 | /// ``` | |
85aaf69f | 544 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc | 545 | pub struct SyncSender<T> { |
476ff2be | 546 | inner: Arc<sync::Packet<T>>, |
1a4d82fc JJ |
547 | } |
548 | ||
92a42be0 | 549 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 550 | unsafe impl<T: Send> Send for SyncSender<T> {} |
85aaf69f | 551 | |
cc61c64b XL |
552 | /// An error returned from the [`Sender::send`] or [`SyncSender::send`] |
553 | /// function on **channel**s. | |
1a4d82fc | 554 | /// |
cc61c64b | 555 | /// A **send** operation can only fail if the receiving end of a channel is |
1a4d82fc JJ |
556 | /// disconnected, implying that the data could never be received. The error |
557 | /// contains the data being sent as a payload so it can be recovered. | |
cc61c64b XL |
558 | /// |
559 | /// [`Sender::send`]: struct.Sender.html#method.send | |
560 | /// [`SyncSender::send`]: struct.SyncSender.html#method.send | |
85aaf69f SL |
561 | #[stable(feature = "rust1", since = "1.0.0")] |
562 | #[derive(PartialEq, Eq, Clone, Copy)] | |
c34b1796 | 563 | pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T); |
1a4d82fc | 564 | |
cc61c64b XL |
565 | /// An error returned from the [`recv`] function on a [`Receiver`]. |
566 | /// | |
567 | /// The [`recv`] operation can only fail if the sending half of a | |
568 | /// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further | |
569 | /// messages will ever be received. | |
1a4d82fc | 570 | /// |
cc61c64b XL |
571 | /// [`recv`]: struct.Receiver.html#method.recv |
572 | /// [`Receiver`]: struct.Receiver.html | |
573 | /// [`channel`]: fn.channel.html | |
574 | /// [`sync_channel`]: fn.sync_channel.html | |
85aaf69f SL |
575 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
576 | #[stable(feature = "rust1", since = "1.0.0")] | |
1a4d82fc JJ |
577 | pub struct RecvError; |
578 | ||
cc61c64b XL |
579 | /// This enumeration is the list of the possible reasons that [`try_recv`] could |
580 | /// not return data when called. This can occur with both a [`channel`] and | |
581 | /// a [`sync_channel`]. | |
582 | /// | |
583 | /// [`try_recv`]: struct.Receiver.html#method.try_recv | |
584 | /// [`channel`]: fn.channel.html | |
585 | /// [`sync_channel`]: fn.sync_channel.html | |
85aaf69f SL |
586 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
587 | #[stable(feature = "rust1", since = "1.0.0")] | |
1a4d82fc | 588 | pub enum TryRecvError { |
cc61c64b | 589 | /// This **channel** is currently empty, but the **Sender**(s) have not yet |
1a4d82fc | 590 | /// disconnected, so data may yet become available. |
85aaf69f | 591 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
592 | Empty, |
593 | ||
cc61c64b XL |
594 | /// The **channel**'s sending half has become disconnected, and there will |
595 | /// never be any more data received on it. | |
85aaf69f | 596 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
597 | Disconnected, |
598 | } | |
599 | ||
cc61c64b XL |
600 | /// This enumeration is the list of possible errors that made [`recv_timeout`] |
601 | /// unable to return data when called. This can occur with both a [`channel`] and | |
602 | /// a [`sync_channel`]. | |
603 | /// | |
604 | /// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout | |
605 | /// [`channel`]: fn.channel.html | |
606 | /// [`sync_channel`]: fn.sync_channel.html | |
3157f602 | 607 | #[derive(PartialEq, Eq, Clone, Copy, Debug)] |
5bcae85e | 608 | #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] |
3157f602 | 609 | pub enum RecvTimeoutError { |
cc61c64b | 610 | /// This **channel** is currently empty, but the **Sender**(s) have not yet |
3157f602 | 611 | /// disconnected, so data may yet become available. |
5bcae85e | 612 | #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] |
3157f602 | 613 | Timeout, |
cc61c64b XL |
614 | /// The **channel**'s sending half has become disconnected, and there will |
615 | /// never be any more data received on it. | |
5bcae85e | 616 | #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] |
3157f602 XL |
617 | Disconnected, |
618 | } | |
619 | ||
1a4d82fc | 620 | /// This enumeration is the list of the possible error outcomes for the |
cc61c64b XL |
621 | /// [`try_send`] method. |
622 | /// | |
623 | /// [`try_send`]: struct.SyncSender.html#method.try_send | |
85aaf69f SL |
624 | #[stable(feature = "rust1", since = "1.0.0")] |
625 | #[derive(PartialEq, Eq, Clone, Copy)] | |
1a4d82fc | 626 | pub enum TrySendError<T> { |
cc61c64b | 627 | /// The data could not be sent on the [`sync_channel`] because it would require that |
1a4d82fc JJ |
628 | /// the callee block to send the data. |
629 | /// | |
630 | /// If this is a buffered channel, then the buffer is full at this time. If | |
cc61c64b | 631 | /// this is not a buffered channel, then there is no [`Receiver`] available to |
1a4d82fc | 632 | /// acquire the data. |
cc61c64b XL |
633 | /// |
634 | /// [`sync_channel`]: fn.sync_channel.html | |
635 | /// [`Receiver`]: struct.Receiver.html | |
85aaf69f | 636 | #[stable(feature = "rust1", since = "1.0.0")] |
7453a54e | 637 | Full(#[stable(feature = "rust1", since = "1.0.0")] T), |
1a4d82fc | 638 | |
cc61c64b | 639 | /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be |
1a4d82fc | 640 | /// sent. The data is returned back to the caller in this case.
cc61c64b XL |
641 | /// |
642 | /// [`sync_channel`]: fn.sync_channel.html | |
85aaf69f | 643 | #[stable(feature = "rust1", since = "1.0.0")] |
7453a54e | 644 | Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T), |
1a4d82fc JJ |
645 | } |
646 | ||
647 | enum Flavor<T> { | |
476ff2be SL |
648 | Oneshot(Arc<oneshot::Packet<T>>), |
649 | Stream(Arc<stream::Packet<T>>), | |
650 | Shared(Arc<shared::Packet<T>>), | |
651 | Sync(Arc<sync::Packet<T>>), | |
1a4d82fc JJ |
652 | } |
653 | ||
654 | #[doc(hidden)] | |
655 | trait UnsafeFlavor<T> { | |
e9174d1e | 656 | fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>; |
7453a54e | 657 | unsafe fn inner_mut(&self) -> &mut Flavor<T> { |
1a4d82fc JJ |
658 | &mut *self.inner_unsafe().get() |
659 | } | |
7453a54e | 660 | unsafe fn inner(&self) -> &Flavor<T> { |
1a4d82fc JJ |
661 | &*self.inner_unsafe().get() |
662 | } | |
663 | } | |
664 | impl<T> UnsafeFlavor<T> for Sender<T> { | |
e9174d1e | 665 | fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> { |
1a4d82fc JJ |
666 | &self.inner |
667 | } | |
668 | } | |
669 | impl<T> UnsafeFlavor<T> for Receiver<T> { | |
e9174d1e | 670 | fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> { |
1a4d82fc JJ |
671 | &self.inner |
672 | } | |
673 | } | |
674 | ||
675 | /// Creates a new asynchronous channel, returning the sender/receiver halves. | |
cc61c64b XL |
676 | /// All data sent on the [`Sender`] will become available on the [`Receiver`] in |
677 | /// the same order as it was sent, and no [`send`] will block the calling thread | |
678 | /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will | |
679 | /// block after its buffer limit is reached). [`recv`] will block until a message | |
680 | /// is available. | |
681 | /// | |
682 | /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but | |
683 | /// only one [`Receiver`] is supported. | |
1a4d82fc | 684 | /// |
cc61c64b | 685 | /// If the [`Receiver`] is disconnected while trying to [`send`] with the |
8faf50e0 | 686 | /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the |
cc61c64b XL |
687 | /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will |
688 | /// return a [`RecvError`]. | |
476ff2be | 689 | /// |
cc61c64b XL |
690 | /// [`send`]: struct.Sender.html#method.send |
691 | /// [`recv`]: struct.Receiver.html#method.recv | |
692 | /// [`Sender`]: struct.Sender.html | |
693 | /// [`Receiver`]: struct.Receiver.html | |
694 | /// [`sync_channel`]: fn.sync_channel.html | |
695 | /// [`SendError`]: struct.SendError.html | |
696 | /// [`RecvError`]: struct.RecvError.html | |
476ff2be | 697 | /// |
c34b1796 | 698 | /// # Examples |
1a4d82fc JJ |
699 | /// |
700 | /// ``` | |
701 | /// use std::sync::mpsc::channel; | |
85aaf69f | 702 | /// use std::thread; |
1a4d82fc | 703 | /// |
cc61c64b | 704 | /// let (sender, receiver) = channel(); |
1a4d82fc JJ |
705 | /// |
706 | /// // Spawn off an expensive computation | |
85aaf69f | 707 | /// thread::spawn(move|| { |
1a4d82fc | 708 | /// # fn expensive_computation() {} |
cc61c64b | 709 | /// sender.send(expensive_computation()).unwrap(); |
1a4d82fc JJ |
710 | /// }); |
711 | /// | |
712 | /// // Do some useful work for a while | |
713 | /// | |
714 | /// // Let's see what that answer was | |
cc61c64b | 715 | /// println!("{:?}", receiver.recv().unwrap()); |
1a4d82fc | 716 | /// ``` |
85aaf69f | 717 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 718 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
476ff2be | 719 | let a = Arc::new(oneshot::Packet::new()); |
1a4d82fc JJ |
720 | (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) |
721 | } | |
722 | ||
723 | /// Creates a new synchronous, bounded channel. | |
cc61c64b XL |
724 | /// All data sent on the [`SyncSender`] will become available on the [`Receiver`] |
725 | /// in the same order as it was sent. Like asynchronous [`channel`]s, the | |
726 | /// [`Receiver`] will block until a message becomes available. `sync_channel` | |
727 | /// differs greatly in the semantics of the sender, however. | |
1a4d82fc | 728 | /// |
476ff2be SL |
729 | /// This channel has an internal buffer on which messages will be queued. |
730 | /// `bound` specifies the buffer size. When the internal buffer becomes full, | |
731 | /// future sends will *block* waiting for the buffer to open up. Note that a | |
732 | /// buffer size of 0 is valid, in which case this becomes a "rendezvous channel" | |
cc61c64b XL |
733 | /// where each [`send`] will not return until a [`recv`] is paired with it. |
734 | /// | |
735 | /// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple | |
736 | /// times, but only one [`Receiver`] is supported. | |
1a4d82fc | 737 | /// |
cc61c64b XL |
738 | /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying |
739 | /// to [`send`] with the [`SyncSender`], the [`send`] method will return a | |
740 | /// [`SendError`]. Similarly, if the [`SyncSender`] is disconnected while trying | |
741 | /// to [`recv`], the [`recv`] method will return a [`RecvError`]. | |
476ff2be | 742 | /// |
cc61c64b XL |
743 | /// [`channel`]: fn.channel.html |
744 | /// [`send`]: struct.SyncSender.html#method.send | |
745 | /// [`recv`]: struct.Receiver.html#method.recv | |
746 | /// [`SyncSender`]: struct.SyncSender.html | |
747 | /// [`Receiver`]: struct.Receiver.html | |
748 | /// [`SendError`]: struct.SendError.html | |
749 | /// [`RecvError`]: struct.RecvError.html | |
1a4d82fc | 750 | /// |
c34b1796 | 751 | /// # Examples |
1a4d82fc JJ |
752 | /// |
753 | /// ``` | |
754 | /// use std::sync::mpsc::sync_channel; | |
85aaf69f | 755 | /// use std::thread; |
1a4d82fc | 756 | /// |
cc61c64b | 757 | /// let (sender, receiver) = sync_channel(1); |
1a4d82fc JJ |
758 | /// |
759 | /// // this returns immediately | |
cc61c64b | 760 | /// sender.send(1).unwrap(); |
1a4d82fc | 761 | /// |
85aaf69f | 762 | /// thread::spawn(move|| { |
1a4d82fc | 763 | /// // this will block until the previous message has been received |
cc61c64b | 764 | /// sender.send(2).unwrap(); |
1a4d82fc JJ |
765 | /// }); |
766 | /// | |
cc61c64b XL |
767 | /// assert_eq!(receiver.recv().unwrap(), 1); |
768 | /// assert_eq!(receiver.recv().unwrap(), 2); | |
1a4d82fc | 769 | /// ``` |
85aaf69f | 770 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 771 | pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { |
476ff2be | 772 | let a = Arc::new(sync::Packet::new(bound)); |
1a4d82fc JJ |
773 | (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) |
774 | } | |
775 | ||
776 | //////////////////////////////////////////////////////////////////////////////// | |
777 | // Sender | |
778 | //////////////////////////////////////////////////////////////////////////////// | |
779 | ||
c34b1796 | 780 | impl<T> Sender<T> { |
1a4d82fc JJ |
781 | fn new(inner: Flavor<T>) -> Sender<T> { |
782 | Sender { | |
783 | inner: UnsafeCell::new(inner), | |
784 | } | |
785 | } | |
786 | ||
787 | /// Attempts to send a value on this channel, returning it back if it could | |
788 | /// not be sent. | |
789 | /// | |
790 | /// A successful send occurs when it is determined that the other end of | |
791 | /// the channel has not hung up already. An unsuccessful send would be one | |
792 | /// where the corresponding receiver has already been deallocated. Note | |
cc61c64b XL |
793 | /// that a return value of [`Err`] means that the data will never be |
794 | /// received, but a return value of [`Ok`] does *not* mean that the data | |
9fa01778 | 795 | /// will be received. It is possible for the corresponding receiver to |
cc61c64b XL |
796 | /// hang up immediately after this function returns [`Ok`]. |
797 | /// | |
798 | /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err | |
799 | /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok | |
1a4d82fc JJ |
800 | /// |
801 | /// This method will never block the current thread. | |
802 | /// | |
c34b1796 | 803 | /// # Examples |
1a4d82fc JJ |
804 | /// |
805 | /// ``` | |
806 | /// use std::sync::mpsc::channel; | |
807 | /// | |
808 | /// let (tx, rx) = channel(); | |
809 | /// | |
810 | /// // This send is always successful | |
85aaf69f | 811 | /// tx.send(1).unwrap(); |
1a4d82fc JJ |
812 | /// |
813 | /// // This send will fail because the receiver is gone | |
814 | /// drop(rx); | |
a7813a04 | 815 | /// assert_eq!(tx.send(1).unwrap_err().0, 1); |
1a4d82fc | 816 | /// ``` |
85aaf69f | 817 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
818 | pub fn send(&self, t: T) -> Result<(), SendError<T>> { |
819 | let (new_inner, ret) = match *unsafe { self.inner() } { | |
820 | Flavor::Oneshot(ref p) => { | |
476ff2be SL |
821 | if !p.sent() { |
822 | return p.send(t).map_err(SendError); | |
823 | } else { | |
824 | let a = Arc::new(stream::Packet::new()); | |
825 | let rx = Receiver::new(Flavor::Stream(a.clone())); | |
826 | match p.upgrade(rx) { | |
827 | oneshot::UpSuccess => { | |
828 | let ret = a.send(t); | |
829 | (a, ret) | |
830 | } | |
831 | oneshot::UpDisconnected => (a, Err(t)), | |
832 | oneshot::UpWoke(token) => { | |
833 | // This send cannot panic because the thread is | |
834 | // asleep (we're looking at it), so the receiver | |
835 | // can't go away. | |
836 | a.send(t).ok().unwrap(); | |
837 | token.signal(); | |
838 | (a, Ok(())) | |
1a4d82fc JJ |
839 | } |
840 | } | |
841 | } | |
842 | } | |
476ff2be SL |
843 | Flavor::Stream(ref p) => return p.send(t).map_err(SendError), |
844 | Flavor::Shared(ref p) => return p.send(t).map_err(SendError), | |
1a4d82fc JJ |
845 | Flavor::Sync(..) => unreachable!(), |
846 | }; | |
847 | ||
848 | unsafe { | |
849 | let tmp = Sender::new(Flavor::Stream(new_inner)); | |
850 | mem::swap(self.inner_mut(), tmp.inner_mut()); | |
851 | } | |
852 | ret.map_err(SendError) | |
853 | } | |
854 | } | |
855 | ||
85aaf69f | 856 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 857 | impl<T> Clone for Sender<T> { |
1a4d82fc | 858 | fn clone(&self) -> Sender<T> { |
476ff2be | 859 | let packet = match *unsafe { self.inner() } { |
1a4d82fc | 860 | Flavor::Oneshot(ref p) => { |
476ff2be SL |
861 | let a = Arc::new(shared::Packet::new()); |
862 | { | |
863 | let guard = a.postinit_lock(); | |
1a4d82fc | 864 | let rx = Receiver::new(Flavor::Shared(a.clone())); |
476ff2be | 865 | let sleeper = match p.upgrade(rx) { |
1a4d82fc | 866 | oneshot::UpSuccess | |
476ff2be SL |
867 | oneshot::UpDisconnected => None, |
868 | oneshot::UpWoke(task) => Some(task), | |
869 | }; | |
870 | a.inherit_blocker(sleeper, guard); | |
1a4d82fc | 871 | } |
476ff2be | 872 | a |
1a4d82fc JJ |
873 | } |
874 | Flavor::Stream(ref p) => { | |
476ff2be SL |
875 | let a = Arc::new(shared::Packet::new()); |
876 | { | |
877 | let guard = a.postinit_lock(); | |
1a4d82fc | 878 | let rx = Receiver::new(Flavor::Shared(a.clone())); |
476ff2be | 879 | let sleeper = match p.upgrade(rx) { |
1a4d82fc | 880 | stream::UpSuccess | |
476ff2be SL |
881 | stream::UpDisconnected => None, |
882 | stream::UpWoke(task) => Some(task), | |
883 | }; | |
884 | a.inherit_blocker(sleeper, guard); | |
1a4d82fc | 885 | } |
476ff2be | 886 | a |
1a4d82fc JJ |
887 | } |
888 | Flavor::Shared(ref p) => { | |
476ff2be | 889 | p.clone_chan(); |
1a4d82fc JJ |
890 | return Sender::new(Flavor::Shared(p.clone())); |
891 | } | |
892 | Flavor::Sync(..) => unreachable!(), | |
893 | }; | |
894 | ||
895 | unsafe { | |
1a4d82fc JJ |
896 | let tmp = Sender::new(Flavor::Shared(packet.clone())); |
897 | mem::swap(self.inner_mut(), tmp.inner_mut()); | |
898 | } | |
899 | Sender::new(Flavor::Shared(packet)) | |
900 | } | |
901 | } | |
902 | ||
85aaf69f | 903 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 904 | impl<T> Drop for Sender<T> { |
1a4d82fc | 905 | fn drop(&mut self) { |
476ff2be SL |
906 | match *unsafe { self.inner() } { |
907 | Flavor::Oneshot(ref p) => p.drop_chan(), | |
908 | Flavor::Stream(ref p) => p.drop_chan(), | |
909 | Flavor::Shared(ref p) => p.drop_chan(), | |
1a4d82fc JJ |
910 | Flavor::Sync(..) => unreachable!(), |
911 | } | |
912 | } | |
913 | } | |
914 | ||
7cac9316 | 915 | #[stable(feature = "mpsc_debug", since = "1.8.0")] |
7453a54e SL |
916 | impl<T> fmt::Debug for Sender<T> { |
917 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
abe05a73 | 918 | f.debug_struct("Sender").finish() |
7453a54e SL |
919 | } |
920 | } | |
921 | ||
1a4d82fc JJ |
922 | //////////////////////////////////////////////////////////////////////////////// |
923 | // SyncSender | |
924 | //////////////////////////////////////////////////////////////////////////////// | |
925 | ||
c34b1796 | 926 | impl<T> SyncSender<T> { |
476ff2be | 927 | fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> { |
a1dfa0c6 | 928 | SyncSender { inner } |
1a4d82fc JJ |
929 | } |
930 | ||
931 | /// Sends a value on this synchronous channel. | |
932 | /// | |
933 | /// This function will *block* until space in the internal buffer becomes | |
934 | /// available or a receiver is available to hand off the message to. | |
935 | /// | |
936 | /// Note that a successful send does *not* guarantee that the receiver will | |
937 | /// ever see the data if there is a buffer on this channel. Items may be | |
938 | /// enqueued in the internal buffer for the receiver to receive at a later | |
7cac9316 XL |
939 | /// time. If the buffer size is 0, however, the channel becomes a rendezvous |
940 | /// channel and it guarantees that the receiver has indeed received | |
941 | /// the data if this function returns success. | |
1a4d82fc | 942 | /// |
cc61c64b XL |
943 | /// This function will never panic, but it may return [`Err`] if the |
944 | /// [`Receiver`] has disconnected and is no longer able to receive | |
1a4d82fc | 945 | /// information. |
cc61c64b XL |
946 | /// |
947 | /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err | |
948 | /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html | |
7cac9316 XL |
949 | /// |
950 | /// # Examples | |
951 | /// | |
952 | /// ```rust | |
953 | /// use std::sync::mpsc::sync_channel; | |
954 | /// use std::thread; | |
955 | /// | |
956 | /// // Create a rendezvous sync_channel with buffer size 0 | |
957 | /// let (sync_sender, receiver) = sync_channel(0); | |
958 | /// | |
959 | /// thread::spawn(move || { | |
960 | /// println!("sending message..."); | |
961 | /// sync_sender.send(1).unwrap(); | |
962 | /// // Thread is now blocked until the message is received | |
963 | /// | |
964 | /// println!("...message received!"); | |
965 | /// }); | |
966 | /// | |
967 | /// let msg = receiver.recv().unwrap(); | |
968 | /// assert_eq!(1, msg); | |
969 | /// ``` | |
85aaf69f | 970 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc | 971 | pub fn send(&self, t: T) -> Result<(), SendError<T>> { |
476ff2be | 972 | self.inner.send(t).map_err(SendError) |
1a4d82fc JJ |
973 | } |
974 | ||
975 | /// Attempts to send a value on this channel without blocking. | |
976 | /// | |
cc61c64b | 977 | /// This method differs from [`send`] by returning immediately if the |
1a4d82fc | 978 | /// channel's buffer is full or no receiver is waiting to acquire some |
cc61c64b | 979 | /// data. Compared with [`send`], this function has two failure cases |
1a4d82fc JJ |
980 | /// instead of one (one for disconnection, one for a full buffer). |
981 | /// | |
7cac9316 | 982 | /// See [`send`] for notes about guarantees of whether the |
1a4d82fc | 983 | /// receiver has received the data or not if this function is successful. |
cc61c64b | 984 | /// |
7cac9316 XL |
985 | /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send |
986 | /// | |
987 | /// # Examples | |
988 | /// | |
989 | /// ```rust | |
990 | /// use std::sync::mpsc::sync_channel; | |
991 | /// use std::thread; | |
992 | /// | |
993 | /// // Create a sync_channel with buffer size 1 | |
994 | /// let (sync_sender, receiver) = sync_channel(1); | |
995 | /// let sync_sender2 = sync_sender.clone(); | |
996 | /// | |
997 | /// // First thread owns sync_sender | |
998 | /// thread::spawn(move || { | |
999 | /// sync_sender.send(1).unwrap(); | |
1000 | /// sync_sender.send(2).unwrap(); | |
1001 | /// // Thread blocked | |
1002 | /// }); | |
1003 | /// | |
1004 | /// // Second thread owns sync_sender2 | |
1005 | /// thread::spawn(move || { | |
1006 | /// // This will return an error and send | |
1007 | /// // no message if the buffer is full | |
1008 | /// sync_sender2.try_send(3).is_err(); | |
1009 | /// }); | |
1010 | /// | |
1011 | /// let mut msg; | |
1012 | /// msg = receiver.recv().unwrap(); | |
1013 | /// println!("message {} received", msg); | |
1014 | /// | |
1015 | /// msg = receiver.recv().unwrap(); | |
1016 | /// println!("message {} received", msg); | |
1017 | /// | |
1018 | /// // Third message may have never been sent | |
1019 | /// match receiver.try_recv() { | |
1020 | /// Ok(msg) => println!("message {} received", msg), | |
1021 | /// Err(_) => println!("the third message was never sent"), | |
1022 | /// } | |
1023 | /// ``` | |
85aaf69f | 1024 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc | 1025 | pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { |
476ff2be | 1026 | self.inner.try_send(t) |
1a4d82fc JJ |
1027 | } |
1028 | } | |
1029 | ||
85aaf69f | 1030 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 1031 | impl<T> Clone for SyncSender<T> { |
1a4d82fc | 1032 | fn clone(&self) -> SyncSender<T> { |
476ff2be | 1033 | self.inner.clone_chan(); |
e9174d1e | 1034 | SyncSender::new(self.inner.clone()) |
1a4d82fc JJ |
1035 | } |
1036 | } | |
1037 | ||
85aaf69f | 1038 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 1039 | impl<T> Drop for SyncSender<T> { |
1a4d82fc | 1040 | fn drop(&mut self) { |
476ff2be | 1041 | self.inner.drop_chan(); |
1a4d82fc JJ |
1042 | } |
1043 | } | |
1044 | ||
7cac9316 | 1045 | #[stable(feature = "mpsc_debug", since = "1.8.0")] |
7453a54e SL |
1046 | impl<T> fmt::Debug for SyncSender<T> { |
1047 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
abe05a73 | 1048 | f.debug_struct("SyncSender").finish() |
7453a54e SL |
1049 | } |
1050 | } | |
1051 | ||
1a4d82fc JJ |
1052 | //////////////////////////////////////////////////////////////////////////////// |
1053 | // Receiver | |
1054 | //////////////////////////////////////////////////////////////////////////////// | |
1055 | ||
c34b1796 | 1056 | impl<T> Receiver<T> { |
1a4d82fc JJ |
1057 | fn new(inner: Flavor<T>) -> Receiver<T> { |
1058 | Receiver { inner: UnsafeCell::new(inner) } | |
1059 | } | |
1060 | ||
7cac9316 | 1061 | /// Attempts to return a pending value on this receiver without blocking. |
1a4d82fc JJ |
1062 | /// |
1063 | /// This method will never block the caller in order to wait for data to | |
1064 | /// become available. Instead, this will always return immediately with a | |
1065 | /// possible option of pending data on the channel. | |
1066 | /// | |
1067 | /// This is useful for a flavor of "optimistic check" before deciding to | |
1068 | /// block on a receiver. | |
7cac9316 XL |
1069 | /// |
1070 | /// Compared with [`recv`], this function has two failure cases instead of one | |
1071 | /// (one for disconnection, one for an empty buffer). | |
1072 | /// | |
1073 | /// [`recv`]: struct.Receiver.html#method.recv | |
1074 | /// | |
1075 | /// # Examples | |
1076 | /// | |
1077 | /// ```rust | |
1078 | /// use std::sync::mpsc::{Receiver, channel}; | |
1079 | /// | |
1080 | /// let (_, receiver): (_, Receiver<i32>) = channel(); | |
1081 | /// | |
1082 | /// assert!(receiver.try_recv().is_err()); | |
1083 | /// ``` | |
85aaf69f | 1084 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
1085 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
1086 | loop { | |
1087 | let new_port = match *unsafe { self.inner() } { | |
1088 | Flavor::Oneshot(ref p) => { | |
476ff2be | 1089 | match p.try_recv() { |
1a4d82fc JJ |
1090 | Ok(t) => return Ok(t), |
1091 | Err(oneshot::Empty) => return Err(TryRecvError::Empty), | |
1092 | Err(oneshot::Disconnected) => { | |
1093 | return Err(TryRecvError::Disconnected) | |
1094 | } | |
1095 | Err(oneshot::Upgraded(rx)) => rx, | |
1096 | } | |
1097 | } | |
1098 | Flavor::Stream(ref p) => { | |
476ff2be | 1099 | match p.try_recv() { |
1a4d82fc JJ |
1100 | Ok(t) => return Ok(t), |
1101 | Err(stream::Empty) => return Err(TryRecvError::Empty), | |
1102 | Err(stream::Disconnected) => { | |
1103 | return Err(TryRecvError::Disconnected) | |
1104 | } | |
1105 | Err(stream::Upgraded(rx)) => rx, | |
1106 | } | |
1107 | } | |
1108 | Flavor::Shared(ref p) => { | |
476ff2be | 1109 | match p.try_recv() { |
1a4d82fc JJ |
1110 | Ok(t) => return Ok(t), |
1111 | Err(shared::Empty) => return Err(TryRecvError::Empty), | |
1112 | Err(shared::Disconnected) => { | |
1113 | return Err(TryRecvError::Disconnected) | |
1114 | } | |
1115 | } | |
1116 | } | |
1117 | Flavor::Sync(ref p) => { | |
476ff2be | 1118 | match p.try_recv() { |
1a4d82fc JJ |
1119 | Ok(t) => return Ok(t), |
1120 | Err(sync::Empty) => return Err(TryRecvError::Empty), | |
1121 | Err(sync::Disconnected) => { | |
1122 | return Err(TryRecvError::Disconnected) | |
1123 | } | |
1124 | } | |
1125 | } | |
1126 | }; | |
1127 | unsafe { | |
1128 | mem::swap(self.inner_mut(), | |
1129 | new_port.inner_mut()); | |
1130 | } | |
1131 | } | |
1132 | } | |
1133 | ||
9346a6ac | 1134 | /// Attempts to wait for a value on this receiver, returning an error if the |
1a4d82fc JJ |
1135 | /// corresponding channel has hung up. |
1136 | /// | |
1137 | /// This function will always block the current thread if there is no data | |
1138 | /// available and it's possible for more data to be sent. Once a message is | |
7cac9316 XL |
1139 | /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this |
1140 | /// receiver will wake up and return that message. | |
1a4d82fc | 1141 | /// |
cc61c64b XL |
1142 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
1143 | /// this call is blocking, this call will wake up and return [`Err`] to | |
1a4d82fc | 1144 | /// indicate that no more messages can ever be received on this channel. |
c1a9b12d SL |
1145 | /// However, since channels are buffered, messages sent before the disconnect |
1146 | /// will still be properly received. | |
1147 | /// | |
7cac9316 XL |
1148 | /// [`Sender`]: struct.Sender.html |
1149 | /// [`SyncSender`]: struct.SyncSender.html | |
cc61c64b XL |
1150 | /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err |
1151 | /// | |
c1a9b12d SL |
1152 | /// # Examples |
1153 | /// | |
1154 | /// ``` | |
1155 | /// use std::sync::mpsc; | |
1156 | /// use std::thread; | |
1157 | /// | |
1158 | /// let (send, recv) = mpsc::channel(); | |
1159 | /// let handle = thread::spawn(move || { | |
1160 | /// send.send(1u8).unwrap(); | |
1161 | /// }); | |
1162 | /// | |
1163 | /// handle.join().unwrap(); | |
1164 | /// | |
1165 | /// assert_eq!(Ok(1), recv.recv()); | |
1166 | /// ``` | |
1167 | /// | |
1168 | /// Buffering behavior: | |
1169 | /// | |
1170 | /// ``` | |
1171 | /// use std::sync::mpsc; | |
1172 | /// use std::thread; | |
1173 | /// use std::sync::mpsc::RecvError; | |
1174 | /// | |
1175 | /// let (send, recv) = mpsc::channel(); | |
1176 | /// let handle = thread::spawn(move || { | |
1177 | /// send.send(1u8).unwrap(); | |
1178 | /// send.send(2).unwrap(); | |
1179 | /// send.send(3).unwrap(); | |
1180 | /// drop(send); | |
1181 | /// }); | |
1182 | /// | |
1183 | /// // wait for the thread to join so we ensure the sender is dropped | |
1184 | /// handle.join().unwrap(); | |
1185 | /// | |
1186 | /// assert_eq!(Ok(1), recv.recv()); | |
1187 | /// assert_eq!(Ok(2), recv.recv()); | |
1188 | /// assert_eq!(Ok(3), recv.recv()); | |
1189 | /// assert_eq!(Err(RecvError), recv.recv()); | |
1190 | /// ``` | |
85aaf69f | 1191 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
1192 | pub fn recv(&self) -> Result<T, RecvError> { |
1193 | loop { | |
1194 | let new_port = match *unsafe { self.inner() } { | |
1195 | Flavor::Oneshot(ref p) => { | |
476ff2be | 1196 | match p.recv(None) { |
1a4d82fc | 1197 | Ok(t) => return Ok(t), |
1a4d82fc JJ |
1198 | Err(oneshot::Disconnected) => return Err(RecvError), |
1199 | Err(oneshot::Upgraded(rx)) => rx, | |
3157f602 | 1200 | Err(oneshot::Empty) => unreachable!(), |
1a4d82fc JJ |
1201 | } |
1202 | } | |
1203 | Flavor::Stream(ref p) => { | |
476ff2be | 1204 | match p.recv(None) { |
1a4d82fc | 1205 | Ok(t) => return Ok(t), |
1a4d82fc JJ |
1206 | Err(stream::Disconnected) => return Err(RecvError), |
1207 | Err(stream::Upgraded(rx)) => rx, | |
3157f602 | 1208 | Err(stream::Empty) => unreachable!(), |
1a4d82fc JJ |
1209 | } |
1210 | } | |
1211 | Flavor::Shared(ref p) => { | |
476ff2be | 1212 | match p.recv(None) { |
1a4d82fc | 1213 | Ok(t) => return Ok(t), |
1a4d82fc | 1214 | Err(shared::Disconnected) => return Err(RecvError), |
3157f602 | 1215 | Err(shared::Empty) => unreachable!(), |
1a4d82fc JJ |
1216 | } |
1217 | } | |
476ff2be | 1218 | Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), |
1a4d82fc JJ |
1219 | }; |
1220 | unsafe { | |
1221 | mem::swap(self.inner_mut(), new_port.inner_mut()); | |
1222 | } | |
1223 | } | |
1224 | } | |
1225 | ||
3157f602 XL |
1226 | /// Attempts to wait for a value on this receiver, returning an error if the |
1227 | /// corresponding channel has hung up, or if it waits more than `timeout`. | |
1228 | /// | |
1229 | /// This function will always block the current thread if there is no data | |
1230 | /// available and it's possible for more data to be sent. Once a message is | |
7cac9316 XL |
1231 | /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this |
1232 | /// receiver will wake up and return that message. | |
3157f602 | 1233 | /// |
cc61c64b XL |
1234 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
1235 | /// this call is blocking, this call will wake up and return [`Err`] to | |
3157f602 XL |
1236 | /// indicate that no more messages can ever be received on this channel. |
1237 | /// However, since channels are buffered, messages sent before the disconnect | |
1238 | /// will still be properly received. | |
1239 | /// | |
7cac9316 XL |
1240 | /// [`Sender`]: struct.Sender.html |
1241 | /// [`SyncSender`]: struct.SyncSender.html | |
cc61c64b XL |
1242 | /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err |
1243 | /// | |
b7449926 XL |
1244 | /// # Known Issues |
1245 | /// | |
1246 | /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout` | |
1247 | /// to panic unexpectedly with the following example: | |
1248 | /// | |
1249 | /// ```no_run | |
1250 | /// use std::sync::mpsc::channel; | |
1251 | /// use std::thread; | |
1252 | /// use std::time::Duration; | |
1253 | /// | |
1254 | /// let (tx, rx) = channel::<String>(); | |
1255 | /// | |
1256 | /// thread::spawn(move || { | |
1257 | /// let d = Duration::from_millis(10); | |
1258 | /// loop { | |
1259 | /// println!("recv"); | |
1260 | /// let _r = rx.recv_timeout(d); | |
1261 | /// } | |
1262 | /// }); | |
1263 | /// | |
1264 | /// thread::sleep(Duration::from_millis(100)); | |
1265 | /// let _c1 = tx.clone(); | |
1266 | /// | |
1267 | /// thread::sleep(Duration::from_secs(1)); | |
1268 | /// ``` | |
1269 | /// | |
1270 | /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364 | |
1271 | /// | |
3157f602 XL |
1272 | /// # Examples |
1273 | /// | |
7cac9316 XL |
1274 | /// Successfully receiving value before encountering timeout: |
1275 | /// | |
3157f602 | 1276 | /// ```no_run |
7cac9316 XL |
1277 | /// use std::thread; |
1278 | /// use std::time::Duration; | |
1279 | /// use std::sync::mpsc; | |
1280 | /// | |
1281 | /// let (send, recv) = mpsc::channel(); | |
1282 | /// | |
1283 | /// thread::spawn(move || { | |
1284 | /// send.send('a').unwrap(); | |
1285 | /// }); | |
1286 | /// | |
1287 | /// assert_eq!( | |
1288 | /// recv.recv_timeout(Duration::from_millis(400)), | |
1289 | /// Ok('a') | |
1290 | /// ); | |
1291 | /// ``` | |
1292 | /// | |
1293 | /// Receiving an error upon reaching timeout: | |
1294 | /// | |
1295 | /// ```no_run | |
1296 | /// use std::thread; | |
3157f602 | 1297 | /// use std::time::Duration; |
7cac9316 | 1298 | /// use std::sync::mpsc; |
3157f602 | 1299 | /// |
7cac9316 | 1300 | /// let (send, recv) = mpsc::channel(); |
3157f602 | 1301 | /// |
7cac9316 XL |
1302 | /// thread::spawn(move || { |
1303 | /// thread::sleep(Duration::from_millis(800)); | |
1304 | /// send.send('a').unwrap(); | |
1305 | /// }); | |
1306 | /// | |
1307 | /// assert_eq!( | |
1308 | /// recv.recv_timeout(Duration::from_millis(400)), | |
1309 | /// Err(mpsc::RecvTimeoutError::Timeout) | |
1310 | /// ); | |
3157f602 | 1311 | /// ``` |
5bcae85e | 1312 | #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] |
3157f602 XL |
1313 | pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { |
1314 | // Do an optimistic try_recv to avoid the performance impact of | |
1315 | // Instant::now() in the full-channel case. | |
1316 | match self.try_recv() { | |
0731742a XL |
1317 | Ok(result) => Ok(result), |
1318 | Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected), | |
1319 | Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) { | |
1320 | Some(deadline) => self.recv_deadline(deadline), | |
1321 | // So far in the future that it's practically the same as waiting indefinitely. | |
1322 | None => self.recv().map_err(RecvTimeoutError::from), | |
1323 | }, | |
3157f602 XL |
1324 | } |
1325 | } | |
1326 | ||
ff7c6d11 XL |
1327 | /// Attempts to wait for a value on this receiver, returning an error if the |
1328 | /// corresponding channel has hung up, or if `deadline` is reached. | |
1329 | /// | |
1330 | /// This function will always block the current thread if there is no data | |
1331 | /// available and it's possible for more data to be sent. Once a message is | |
1332 | /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this | |
1333 | /// receiver will wake up and return that message. | |
1334 | /// | |
1335 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while | |
1336 | /// this call is blocking, this call will wake up and return [`Err`] to | |
1337 | /// indicate that no more messages can ever be received on this channel. | |
1338 | /// However, since channels are buffered, messages sent before the disconnect | |
1339 | /// will still be properly received. | |
1340 | /// | |
1341 | /// [`Sender`]: struct.Sender.html | |
1342 | /// [`SyncSender`]: struct.SyncSender.html | |
1343 | /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err | |
1344 | /// | |
1345 | /// # Examples | |
1346 | /// | |
1347 | /// Successfully receiving value before reaching deadline: | |
1348 | /// | |
1349 | /// ```no_run | |
1350 | /// #![feature(deadline_api)] | |
1351 | /// use std::thread; | |
1352 | /// use std::time::{Duration, Instant}; | |
1353 | /// use std::sync::mpsc; | |
1354 | /// | |
1355 | /// let (send, recv) = mpsc::channel(); | |
1356 | /// | |
1357 | /// thread::spawn(move || { | |
1358 | /// send.send('a').unwrap(); | |
1359 | /// }); | |
1360 | /// | |
1361 | /// assert_eq!( | |
1362 | /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), | |
1363 | /// Ok('a') | |
1364 | /// ); | |
1365 | /// ``` | |
1366 | /// | |
1367 | /// Receiving an error upon reaching deadline: | |
1368 | /// | |
1369 | /// ```no_run | |
1370 | /// #![feature(deadline_api)] | |
1371 | /// use std::thread; | |
1372 | /// use std::time::{Duration, Instant}; | |
1373 | /// use std::sync::mpsc; | |
1374 | /// | |
1375 | /// let (send, recv) = mpsc::channel(); | |
1376 | /// | |
1377 | /// thread::spawn(move || { | |
1378 | /// thread::sleep(Duration::from_millis(800)); | |
1379 | /// send.send('a').unwrap(); | |
1380 | /// }); | |
1381 | /// | |
1382 | /// assert_eq!( | |
1383 | /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), | |
1384 | /// Err(mpsc::RecvTimeoutError::Timeout) | |
1385 | /// ); | |
1386 | /// ``` | |
1387 | #[unstable(feature = "deadline_api", issue = "46316")] | |
1388 | pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { | |
3157f602 XL |
1389 | use self::RecvTimeoutError::*; |
1390 | ||
1391 | loop { | |
1392 | let port_or_empty = match *unsafe { self.inner() } { | |
1393 | Flavor::Oneshot(ref p) => { | |
476ff2be | 1394 | match p.recv(Some(deadline)) { |
3157f602 XL |
1395 | Ok(t) => return Ok(t), |
1396 | Err(oneshot::Disconnected) => return Err(Disconnected), | |
1397 | Err(oneshot::Upgraded(rx)) => Some(rx), | |
1398 | Err(oneshot::Empty) => None, | |
1399 | } | |
1400 | } | |
1401 | Flavor::Stream(ref p) => { | |
476ff2be | 1402 | match p.recv(Some(deadline)) { |
3157f602 XL |
1403 | Ok(t) => return Ok(t), |
1404 | Err(stream::Disconnected) => return Err(Disconnected), | |
1405 | Err(stream::Upgraded(rx)) => Some(rx), | |
1406 | Err(stream::Empty) => None, | |
1407 | } | |
1408 | } | |
1409 | Flavor::Shared(ref p) => { | |
476ff2be | 1410 | match p.recv(Some(deadline)) { |
3157f602 XL |
1411 | Ok(t) => return Ok(t), |
1412 | Err(shared::Disconnected) => return Err(Disconnected), | |
1413 | Err(shared::Empty) => None, | |
1414 | } | |
1415 | } | |
1416 | Flavor::Sync(ref p) => { | |
476ff2be | 1417 | match p.recv(Some(deadline)) { |
3157f602 XL |
1418 | Ok(t) => return Ok(t), |
1419 | Err(sync::Disconnected) => return Err(Disconnected), | |
1420 | Err(sync::Empty) => None, | |
1421 | } | |
1422 | } | |
1423 | }; | |
1424 | ||
1425 | if let Some(new_port) = port_or_empty { | |
1426 | unsafe { | |
1427 | mem::swap(self.inner_mut(), new_port.inner_mut()); | |
1428 | } | |
1429 | } | |
1430 | ||
1431 | // If we're already passed the deadline, and we're here without | |
1432 | // data, return a timeout, else try again. | |
1433 | if Instant::now() >= deadline { | |
1434 | return Err(Timeout); | |
1435 | } | |
1436 | } | |
1437 | } | |
1438 | ||
1a4d82fc | 1439 | /// Returns an iterator that will block waiting for messages, but never |
cc61c64b XL |
1440 | /// [`panic!`]. It will return [`None`] when the channel has hung up. |
1441 | /// | |
1442 | /// [`panic!`]: ../../../std/macro.panic.html | |
1443 | /// [`None`]: ../../../std/option/enum.Option.html#variant.None | |
1444 | /// | |
1445 | /// # Examples | |
1446 | /// | |
1447 | /// ```rust | |
1448 | /// use std::sync::mpsc::channel; | |
1449 | /// use std::thread; | |
1450 | /// | |
1451 | /// let (send, recv) = channel(); | |
1452 | /// | |
1453 | /// thread::spawn(move || { | |
7cac9316 XL |
1454 | /// send.send(1).unwrap(); |
1455 | /// send.send(2).unwrap(); | |
1456 | /// send.send(3).unwrap(); | |
cc61c64b XL |
1457 | /// }); |
1458 | /// | |
7cac9316 XL |
1459 | /// let mut iter = recv.iter(); |
1460 | /// assert_eq!(iter.next(), Some(1)); | |
1461 | /// assert_eq!(iter.next(), Some(2)); | |
1462 | /// assert_eq!(iter.next(), Some(3)); | |
1463 | /// assert_eq!(iter.next(), None); | |
cc61c64b | 1464 | /// ``` |
85aaf69f | 1465 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
1466 | pub fn iter(&self) -> Iter<T> { |
1467 | Iter { rx: self } | |
1468 | } | |
5bcae85e SL |
1469 | |
1470 | /// Returns an iterator that will attempt to yield all pending values. | |
1471 | /// It will return `None` if there are no more pending values or if the | |
cc61c64b | 1472 | /// channel has hung up. The iterator will never [`panic!`] or block the |
5bcae85e | 1473 | /// user by waiting for values. |
cc61c64b XL |
1474 | /// |
1475 | /// [`panic!`]: ../../../std/macro.panic.html | |
7cac9316 XL |
1476 | /// |
1477 | /// # Examples | |
1478 | /// | |
1479 | /// ```no_run | |
1480 | /// use std::sync::mpsc::channel; | |
1481 | /// use std::thread; | |
1482 | /// use std::time::Duration; | |
1483 | /// | |
1484 | /// let (sender, receiver) = channel(); | |
1485 | /// | |
1486 | /// // nothing is in the buffer yet | |
1487 | /// assert!(receiver.try_iter().next().is_none()); | |
1488 | /// | |
1489 | /// thread::spawn(move || { | |
1490 | /// thread::sleep(Duration::from_secs(1)); | |
1491 | /// sender.send(1).unwrap(); | |
1492 | /// sender.send(2).unwrap(); | |
1493 | /// sender.send(3).unwrap(); | |
1494 | /// }); | |
1495 | /// | |
1496 | /// // nothing is in the buffer yet | |
1497 | /// assert!(receiver.try_iter().next().is_none()); | |
1498 | /// | |
1499 | /// // block for two seconds | |
1500 | /// thread::sleep(Duration::from_secs(2)); | |
1501 | /// | |
1502 | /// let mut iter = receiver.try_iter(); | |
1503 | /// assert_eq!(iter.next(), Some(1)); | |
1504 | /// assert_eq!(iter.next(), Some(2)); | |
1505 | /// assert_eq!(iter.next(), Some(3)); | |
1506 | /// assert_eq!(iter.next(), None); | |
1507 | /// ``` | |
476ff2be | 1508 | #[stable(feature = "receiver_try_iter", since = "1.15.0")] |
5bcae85e SL |
1509 | pub fn try_iter(&self) -> TryIter<T> { |
1510 | TryIter { rx: self } | |
1511 | } | |
1512 | ||
1a4d82fc JJ |
1513 | } |
1514 | ||
c34b1796 | 1515 | impl<T> select::Packet for Receiver<T> { |
1a4d82fc JJ |
1516 | fn can_recv(&self) -> bool { |
1517 | loop { | |
1518 | let new_port = match *unsafe { self.inner() } { | |
1519 | Flavor::Oneshot(ref p) => { | |
476ff2be | 1520 | match p.can_recv() { |
1a4d82fc JJ |
1521 | Ok(ret) => return ret, |
1522 | Err(upgrade) => upgrade, | |
1523 | } | |
1524 | } | |
1525 | Flavor::Stream(ref p) => { | |
476ff2be | 1526 | match p.can_recv() { |
1a4d82fc JJ |
1527 | Ok(ret) => return ret, |
1528 | Err(upgrade) => upgrade, | |
1529 | } | |
1530 | } | |
476ff2be SL |
1531 | Flavor::Shared(ref p) => return p.can_recv(), |
1532 | Flavor::Sync(ref p) => return p.can_recv(), | |
1a4d82fc JJ |
1533 | }; |
1534 | unsafe { | |
1535 | mem::swap(self.inner_mut(), | |
1536 | new_port.inner_mut()); | |
1537 | } | |
1538 | } | |
1539 | } | |
1540 | ||
1541 | fn start_selection(&self, mut token: SignalToken) -> StartResult { | |
1542 | loop { | |
1543 | let (t, new_port) = match *unsafe { self.inner() } { | |
1544 | Flavor::Oneshot(ref p) => { | |
476ff2be | 1545 | match p.start_selection(token) { |
1a4d82fc JJ |
1546 | oneshot::SelSuccess => return Installed, |
1547 | oneshot::SelCanceled => return Abort, | |
1548 | oneshot::SelUpgraded(t, rx) => (t, rx), | |
1549 | } | |
1550 | } | |
1551 | Flavor::Stream(ref p) => { | |
476ff2be | 1552 | match p.start_selection(token) { |
1a4d82fc JJ |
1553 | stream::SelSuccess => return Installed, |
1554 | stream::SelCanceled => return Abort, | |
1555 | stream::SelUpgraded(t, rx) => (t, rx), | |
1556 | } | |
1557 | } | |
476ff2be SL |
1558 | Flavor::Shared(ref p) => return p.start_selection(token), |
1559 | Flavor::Sync(ref p) => return p.start_selection(token), | |
1a4d82fc JJ |
1560 | }; |
1561 | token = t; | |
1562 | unsafe { | |
1563 | mem::swap(self.inner_mut(), new_port.inner_mut()); | |
1564 | } | |
1565 | } | |
1566 | } | |
1567 | ||
1568 | fn abort_selection(&self) -> bool { | |
1569 | let mut was_upgrade = false; | |
1570 | loop { | |
1571 | let result = match *unsafe { self.inner() } { | |
476ff2be SL |
1572 | Flavor::Oneshot(ref p) => p.abort_selection(), |
1573 | Flavor::Stream(ref p) => p.abort_selection(was_upgrade), | |
1574 | Flavor::Shared(ref p) => return p.abort_selection(was_upgrade), | |
1575 | Flavor::Sync(ref p) => return p.abort_selection(), | |
1a4d82fc JJ |
1576 | }; |
1577 | let new_port = match result { Ok(b) => return b, Err(p) => p }; | |
1578 | was_upgrade = true; | |
1579 | unsafe { | |
1580 | mem::swap(self.inner_mut(), | |
1581 | new_port.inner_mut()); | |
1582 | } | |
1583 | } | |
1584 | } | |
1585 | } | |
1586 | ||
85aaf69f | 1587 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 1588 | impl<'a, T> Iterator for Iter<'a, T> { |
1a4d82fc JJ |
1589 | type Item = T; |
1590 | ||
1591 | fn next(&mut self) -> Option<T> { self.rx.recv().ok() } | |
1592 | } | |
1593 | ||
476ff2be | 1594 | #[stable(feature = "receiver_try_iter", since = "1.15.0")] |
5bcae85e SL |
1595 | impl<'a, T> Iterator for TryIter<'a, T> { |
1596 | type Item = T; | |
1597 | ||
1598 | fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() } | |
1599 | } | |
1600 | ||
d9579d0f AL |
1601 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] |
1602 | impl<'a, T> IntoIterator for &'a Receiver<T> { | |
1603 | type Item = T; | |
1604 | type IntoIter = Iter<'a, T>; | |
1605 | ||
1606 | fn into_iter(self) -> Iter<'a, T> { self.iter() } | |
1607 | } | |
1608 | ||
92a42be0 | 1609 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] |
d9579d0f AL |
1610 | impl<T> Iterator for IntoIter<T> { |
1611 | type Item = T; | |
1612 | fn next(&mut self) -> Option<T> { self.rx.recv().ok() } | |
1613 | } | |
1614 | ||
1615 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] | |
1616 | impl <T> IntoIterator for Receiver<T> { | |
1617 | type Item = T; | |
1618 | type IntoIter = IntoIter<T>; | |
1619 | ||
1620 | fn into_iter(self) -> IntoIter<T> { | |
1621 | IntoIter { rx: self } | |
1622 | } | |
1623 | } | |
1624 | ||
85aaf69f | 1625 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 1626 | impl<T> Drop for Receiver<T> { |
1a4d82fc | 1627 | fn drop(&mut self) { |
476ff2be SL |
1628 | match *unsafe { self.inner() } { |
1629 | Flavor::Oneshot(ref p) => p.drop_port(), | |
1630 | Flavor::Stream(ref p) => p.drop_port(), | |
1631 | Flavor::Shared(ref p) => p.drop_port(), | |
1632 | Flavor::Sync(ref p) => p.drop_port(), | |
1a4d82fc JJ |
1633 | } |
1634 | } | |
1635 | } | |
1636 | ||
7cac9316 | 1637 | #[stable(feature = "mpsc_debug", since = "1.8.0")] |
7453a54e SL |
1638 | impl<T> fmt::Debug for Receiver<T> { |
1639 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
abe05a73 | 1640 | f.debug_struct("Receiver").finish() |
7453a54e SL |
1641 | } |
1642 | } | |
1643 | ||
85aaf69f SL |
1644 | #[stable(feature = "rust1", since = "1.0.0")] |
1645 | impl<T> fmt::Debug for SendError<T> { | |
1646 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
1647 | "SendError(..)".fmt(f) | |
1a4d82fc | 1648 | } |
85aaf69f | 1649 | } |
1a4d82fc | 1650 | |
85aaf69f SL |
1651 | #[stable(feature = "rust1", since = "1.0.0")] |
1652 | impl<T> fmt::Display for SendError<T> { | |
1653 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
1654 | "sending on a closed channel".fmt(f) | |
1a4d82fc | 1655 | } |
1a4d82fc JJ |
1656 | } |
1657 | ||
c34b1796 | 1658 | #[stable(feature = "rust1", since = "1.0.0")] |
9e0c209e | 1659 | impl<T: Send> error::Error for SendError<T> { |
c34b1796 AL |
1660 | fn description(&self) -> &str { |
1661 | "sending on a closed channel" | |
1662 | } | |
1663 | ||
8faf50e0 | 1664 | fn cause(&self) -> Option<&dyn error::Error> { |
c34b1796 AL |
1665 | None |
1666 | } | |
1667 | } | |
1668 | ||
85aaf69f SL |
1669 | #[stable(feature = "rust1", since = "1.0.0")] |
1670 | impl<T> fmt::Debug for TrySendError<T> { | |
1a4d82fc | 1671 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
85aaf69f SL |
1672 | match *self { |
1673 | TrySendError::Full(..) => "Full(..)".fmt(f), | |
1674 | TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), | |
1675 | } | |
1a4d82fc JJ |
1676 | } |
1677 | } | |
1678 | ||
85aaf69f SL |
1679 | #[stable(feature = "rust1", since = "1.0.0")] |
1680 | impl<T> fmt::Display for TrySendError<T> { | |
1a4d82fc JJ |
1681 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1682 | match *self { | |
1683 | TrySendError::Full(..) => { | |
1684 | "sending on a full channel".fmt(f) | |
1685 | } | |
1686 | TrySendError::Disconnected(..) => { | |
1687 | "sending on a closed channel".fmt(f) | |
1688 | } | |
1689 | } | |
1690 | } | |
1691 | } | |
1692 | ||
c34b1796 | 1693 | #[stable(feature = "rust1", since = "1.0.0")] |
9e0c209e | 1694 | impl<T: Send> error::Error for TrySendError<T> { |
c34b1796 AL |
1695 | |
1696 | fn description(&self) -> &str { | |
1697 | match *self { | |
1698 | TrySendError::Full(..) => { | |
1699 | "sending on a full channel" | |
1700 | } | |
1701 | TrySendError::Disconnected(..) => { | |
1702 | "sending on a closed channel" | |
1703 | } | |
1704 | } | |
1705 | } | |
1706 | ||
8faf50e0 | 1707 | fn cause(&self) -> Option<&dyn error::Error> { |
c34b1796 AL |
1708 | None |
1709 | } | |
1710 | } | |
1711 | ||
ff7c6d11 XL |
1712 | #[stable(feature = "mpsc_error_conversions", since = "1.24.0")] |
1713 | impl<T> From<SendError<T>> for TrySendError<T> { | |
1714 | fn from(err: SendError<T>) -> TrySendError<T> { | |
1715 | match err { | |
1716 | SendError(t) => TrySendError::Disconnected(t), | |
1717 | } | |
1718 | } | |
1719 | } | |
1720 | ||
85aaf69f SL |
1721 | #[stable(feature = "rust1", since = "1.0.0")] |
1722 | impl fmt::Display for RecvError { | |
1a4d82fc JJ |
1723 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1724 | "receiving on a closed channel".fmt(f) | |
1725 | } | |
1726 | } | |
1727 | ||
c34b1796 AL |
1728 | #[stable(feature = "rust1", since = "1.0.0")] |
1729 | impl error::Error for RecvError { | |
1730 | ||
1731 | fn description(&self) -> &str { | |
1732 | "receiving on a closed channel" | |
1733 | } | |
1734 | ||
8faf50e0 | 1735 | fn cause(&self) -> Option<&dyn error::Error> { |
c34b1796 AL |
1736 | None |
1737 | } | |
1738 | } | |
1739 | ||
85aaf69f SL |
1740 | #[stable(feature = "rust1", since = "1.0.0")] |
1741 | impl fmt::Display for TryRecvError { | |
1a4d82fc JJ |
1742 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1743 | match *self { | |
1744 | TryRecvError::Empty => { | |
1745 | "receiving on an empty channel".fmt(f) | |
1746 | } | |
1747 | TryRecvError::Disconnected => { | |
1748 | "receiving on a closed channel".fmt(f) | |
1749 | } | |
1750 | } | |
1751 | } | |
1752 | } | |
1753 | ||
c34b1796 AL |
1754 | #[stable(feature = "rust1", since = "1.0.0")] |
1755 | impl error::Error for TryRecvError { | |
1756 | ||
1757 | fn description(&self) -> &str { | |
1758 | match *self { | |
1759 | TryRecvError::Empty => { | |
1760 | "receiving on an empty channel" | |
1761 | } | |
1762 | TryRecvError::Disconnected => { | |
1763 | "receiving on a closed channel" | |
1764 | } | |
1765 | } | |
1766 | } | |
1767 | ||
8faf50e0 | 1768 | fn cause(&self) -> Option<&dyn error::Error> { |
c34b1796 AL |
1769 | None |
1770 | } | |
1771 | } | |
1772 | ||
ff7c6d11 XL |
1773 | #[stable(feature = "mpsc_error_conversions", since = "1.24.0")] |
1774 | impl From<RecvError> for TryRecvError { | |
1775 | fn from(err: RecvError) -> TryRecvError { | |
1776 | match err { | |
1777 | RecvError => TryRecvError::Disconnected, | |
1778 | } | |
1779 | } | |
1780 | } | |
1781 | ||
7cac9316 | 1782 | #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] |
476ff2be SL |
1783 | impl fmt::Display for RecvTimeoutError { |
1784 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
1785 | match *self { | |
1786 | RecvTimeoutError::Timeout => { | |
1787 | "timed out waiting on channel".fmt(f) | |
1788 | } | |
1789 | RecvTimeoutError::Disconnected => { | |
1790 | "channel is empty and sending half is closed".fmt(f) | |
1791 | } | |
1792 | } | |
1793 | } | |
1794 | } | |
1795 | ||
7cac9316 | 1796 | #[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] |
476ff2be SL |
1797 | impl error::Error for RecvTimeoutError { |
1798 | fn description(&self) -> &str { | |
1799 | match *self { | |
1800 | RecvTimeoutError::Timeout => { | |
1801 | "timed out waiting on channel" | |
1802 | } | |
1803 | RecvTimeoutError::Disconnected => { | |
1804 | "channel is empty and sending half is closed" | |
1805 | } | |
1806 | } | |
1807 | } | |
1808 | ||
8faf50e0 | 1809 | fn cause(&self) -> Option<&dyn error::Error> { |
476ff2be SL |
1810 | None |
1811 | } | |
1812 | } | |
1813 | ||
ff7c6d11 XL |
1814 | #[stable(feature = "mpsc_error_conversions", since = "1.24.0")] |
1815 | impl From<RecvError> for RecvTimeoutError { | |
1816 | fn from(err: RecvError) -> RecvTimeoutError { | |
1817 | match err { | |
1818 | RecvError => RecvTimeoutError::Disconnected, | |
1819 | } | |
1820 | } | |
1821 | } | |
1822 | ||
c30ab7b3 | 1823 | #[cfg(all(test, not(target_os = "emscripten")))] |
d9579d0f | 1824 | mod tests { |
c1a9b12d | 1825 | use env; |
1a4d82fc | 1826 | use super::*; |
85aaf69f | 1827 | use thread; |
3157f602 | 1828 | use time::{Duration, Instant}; |
1a4d82fc | 1829 | |
// Iteration multiplier for the stress tests: the value of the
// `RUST_TEST_STRESS` environment variable, or 1 when it cannot be read.
// Panics if the variable is present but is not a valid `usize`.
pub fn stress_factor() -> usize {
    env::var("RUST_TEST_STRESS")
        .map(|raw| raw.parse::<usize>().unwrap())
        .unwrap_or(1)
}
1836 | ||
// A single value sent on a fresh channel is received unchanged.
#[test]
fn smoke() {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
1843 | ||
// Dropping a channel that still holds an unreceived boxed value must be fine.
#[test]
fn drop_full() {
    let (sender, _receiver) = channel::<Box<isize>>();
    sender.send(Box::new(1)).unwrap();
}
1849 | ||
// Same as `drop_full`, but after the sender has been cloned (and the clones
// dropped) twice.
#[test]
fn drop_full_shared() {
    let (sender, _receiver) = channel::<Box<isize>>();
    for _ in 0..2 {
        drop(sender.clone());
    }
    sender.send(Box::new(1)).unwrap();
}
1857 | ||
// A round trip works both before and after the sender is replaced by a clone.
#[test]
fn smoke_shared() {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);

    let sender = sender.clone();
    sender.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);
}
1867 | ||
// A value sent from a spawned thread reaches the receiving thread.
#[test]
fn smoke_threads() {
    let (sender, receiver) = channel::<i32>();
    let _producer = thread::spawn(move || {
        sender.send(1).unwrap();
    });
    assert_eq!(receiver.recv().unwrap(), 1);
}
1876 | ||
// Sending after the receiver has been dropped reports an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
1883 | ||
// Sending on a channel with several senders fails once the receiver has been
// dropped.
//
// The previous body was identical to `smoke_port_gone` and never cloned the
// sender, so it did not cover the multi-sender case its name refers to. The
// sender is now cloned *before* the receiver is dropped, which complements
// `smoke_shared_port_gone2` (clone *after* the drop). Both handles must
// observe the disconnect.
#[test]
fn smoke_shared_port_gone() {
    let (tx, rx) = channel::<i32>();
    let tx2 = tx.clone();
    drop(rx);
    assert!(tx.send(1).is_err());
    assert!(tx2.send(1).is_err());
}
1890 | ||
// A clone made after the receiver was dropped also fails to send, even once
// the original sender is gone.
#[test]
fn smoke_shared_port_gone2() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let cloned = sender.clone();
    drop(sender);
    let outcome = cloned.send(1);
    assert!(outcome.is_err());
}
1899 | ||
// Keep sending until the receiving thread, which takes one value and exits,
// has dropped its end.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = channel::<i32>();
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
}
1908 | ||
// Same as `port_gone_concurrent`, alternating between two sender handles;
// the loop stops at the first failed send.
#[test]
fn port_gone_concurrent_shared() {
    let (first, receiver) = channel::<i32>();
    let second = first.clone();
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if first.send(1).is_err() {
            break;
        }
        if second.send(1).is_err() {
            break;
        }
    }
}
1918 | ||
// Receiving after the only sender has been dropped reports an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1925 | ||
// Receiving after every sender handle has been dropped reports an error.
#[test]
fn smoke_chan_gone_shared() {
    let (sender, receiver) = channel::<()>();
    let cloned = sender.clone();
    drop(sender);
    drop(cloned);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1934 | ||
// Keep receiving until the sending thread, which sends two values and exits,
// has dropped its end.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = channel::<i32>();
    let _producer = thread::spawn(move || {
        sender.send(1).unwrap();
        sender.send(1).unwrap();
    });
    loop {
        if receiver.recv().is_err() {
            break;
        }
    }
}
1944 | ||
// One producer thread pushes 10000 values; the test thread receives them all.
#[test]
fn stress() {
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || {
        for _ in 0..10000 {
            sender.send(1).unwrap();
        }
    });
    for _ in 0..10000 {
        let got = receiver.recv().unwrap();
        assert_eq!(got, 1);
    }
    producer.join().ok().unwrap();
}
1956 | ||
// NTHREADS producers each push AMT values through clones of one sender; a
// consumer thread receives exactly AMT * NTHREADS of them and then checks
// that no extra value is pending.
#[test]
fn stress_shared() {
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;
    let (sender, receiver) = channel::<i32>();

    let consumer = thread::spawn(move || {
        for _ in 0..AMT * NTHREADS {
            assert_eq!(receiver.recv().unwrap(), 1);
        }
        if let Ok(..) = receiver.try_recv() {
            panic!()
        }
    });

    for _ in 0..NTHREADS {
        let sender = sender.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                sender.send(1).unwrap();
            }
        });
    }
    drop(sender);
    consumer.join().ok().unwrap();
}
1982 | ||
// The consumer thread signals that it is running before a second thread
// starts sending it 40 values.
#[test]
fn send_from_outside_runtime() {
    let (ready_tx, ready_rx) = channel::<()>();
    let (data_tx, data_rx) = channel::<i32>();
    let consumer = thread::spawn(move || {
        ready_tx.send(()).unwrap();
        for _ in 0..40 {
            assert_eq!(data_rx.recv().unwrap(), 1);
        }
    });
    ready_rx.recv().unwrap();
    let producer = thread::spawn(move || {
        for _ in 0..40 {
            data_tx.send(1).unwrap();
        }
    });
    consumer.join().ok().unwrap();
    producer.join().ok().unwrap();
}
2002 | ||
// The test thread sends 40 values to a spawned consumer and waits for it.
#[test]
fn recv_from_outside_runtime() {
    let (sender, receiver) = channel::<i32>();
    let consumer = thread::spawn(move || {
        for _ in 0..40 {
            assert_eq!(receiver.recv().unwrap(), 1);
        }
    });
    for _ in 0..40 {
        sender.send(1).unwrap();
    }
    consumer.join().ok().unwrap();
}
2016 | ||
// Two threads ping-pong over a pair of channels: 1 goes one way, 2 comes back.
#[test]
fn no_runtime() {
    let (ping_tx, ping_rx) = channel::<i32>();
    let (pong_tx, pong_rx) = channel::<i32>();
    let responder = thread::spawn(move || {
        assert_eq!(ping_rx.recv().unwrap(), 1);
        pong_tx.send(2).unwrap();
    });
    let initiator = thread::spawn(move || {
        ping_tx.send(1).unwrap();
        assert_eq!(pong_rx.recv().unwrap(), 2);
    });
    responder.join().ok().unwrap();
    initiator.join().ok().unwrap();
}
2032 | ||
// Closing without sending: the receiver goes away first.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = channel::<i32>();
    drop(receiver);
}
2039 | ||
// Closing without sending: the sender goes away first.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = channel::<i32>();
    drop(sender);
}
2046 | ||
// With the receiver closed, sending a boxed payload fails (and the sender
// side is left to clean the payload up).
#[test]
fn oneshot_single_thread_send_port_close() {
    let (sender, receiver) = channel::<Box<i32>>();
    drop(receiver);
    let outcome = sender.send(Box::new(0));
    assert!(outcome.is_err());
}
2054 | ||
// Unwrapping a receive on a closed channel panics; run it on a helper thread
// and check that the join reports the panic.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let worker = thread::spawn(move || {
        let (sender, receiver) = channel::<i32>();
        drop(sender);
        receiver.recv().unwrap();
    });
    let joined = worker.join();
    assert!(joined.is_err());
}
2066 | ||
// A boxed value sent first is received intact afterwards.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (sender, receiver) = channel::<Box<i32>>();
    sender.send(Box::new(10)).unwrap();
    let got = receiver.recv().unwrap();
    assert!(*got == 10);
}
2073 | ||
// Sending on an open channel succeeds and the value can be received.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = channel::<i32>();
    let outcome = sender.send(10);
    assert!(outcome.is_ok());
    assert!(receiver.recv().unwrap() == 10);
}
2080 | ||
// Sending after the receiver was dropped fails.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(10);
    assert!(outcome.is_err());
}
2087 | ||
// A value already sent is returned as `Ok` by `recv`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = channel::<i32>();
    sender.send(10).unwrap();
    let outcome = receiver.recv();
    assert!(outcome == Ok(10));
}
2094 | ||
// Receiving after the sender was dropped fails.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
2101 | ||
// `try_recv` reports `Empty` before anything is sent and the value afterwards.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = channel::<i32>();
    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
2109 | ||
// Once the sender is gone, `try_recv` keeps reporting `Disconnected`.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
2117 | ||
// With a live sender and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = channel::<i32>();
    let outcome = receiver.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
2123 | ||
// A spawned thread receives a boxed value that the test thread sends.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (sender, receiver) = channel::<Box<i32>>();
    let _consumer = thread::spawn(move || {
        let got = receiver.recv().unwrap();
        assert!(*got == 10);
    });

    sender.send(Box::new(10)).unwrap();
}
2133 | ||
// One thread drops the sender while another unwraps a receive; the receiving
// thread must end in a panic.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = channel::<Box<i32>>();
    let _closer = thread::spawn(move || {
        drop(sender);
    });
    let consumer = thread::spawn(move || {
        let got = receiver.recv().unwrap();
        assert!(*got == 10);
    });
    let joined = consumer.join();
    assert!(joined.is_err());
}
2145 | ||
// Repeatedly drop the two ends of a channel on different threads.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        drop(sender);
    }
}
2156 | ||
// Repeatedly race a send against the receiver being dropped on another
// thread. The sending thread may panic on `unwrap`; its join result is
// deliberately ignored.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        let producer = thread::spawn(move || {
            sender.send(1).unwrap();
        });
        let _ = producer.join();
    }
}
2169 | ||
// Repeatedly race an unwrapped receive against the sender being dropped, with
// both sides running on nested threads. The receive must always panic.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        thread::spawn(move || {
            let inner = thread::spawn(move || {
                receiver.recv().unwrap();
            });
            let joined = inner.join();
            assert!(joined.is_err());
        });
        let _closer = thread::spawn(move || {
            thread::spawn(move || {
                drop(sender);
            });
        });
    }
}
2187 | ||
// Repeatedly send one boxed value from a spawned thread and receive it here.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = channel::<Box<isize>>();
        let _producer = thread::spawn(move || {
            sender.send(Box::new(10)).unwrap();
        });
        let got = receiver.recv().unwrap();
        assert!(*got == 10);
    }
}
2198 | ||
// Passes the sender and the receiver down two chains of ten threads each:
// every link sends (or receives and checks) the value `i` and then hands its
// end of the channel to a freshly spawned thread for `i + 1`.
#[test]
fn stream_send_recv_stress() {
    // Sends `i` from a new thread, then recurses with `i + 1` until 10.
    fn send(tx: Sender<Box<i32>>, i: i32) {
        if i == 10 {
            return;
        }

        thread::spawn(move || {
            tx.send(Box::new(i)).unwrap();
            send(tx, i + 1);
        });
    }

    // Receives on a new thread, checks for `i`, then recurses until 10.
    fn recv(rx: Receiver<Box<i32>>, i: i32) {
        if i == 10 {
            return;
        }

        thread::spawn(move || {
            assert!(*rx.recv().unwrap() == i);
            recv(rx, i + 1);
        });
    }

    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = channel();

        send(sender, 0);
        recv(receiver, 0);
    }
}
2226 | ||
3157f602 XL |
// `recv_timeout` returns a pending value, times out on an empty channel, and
// works again after the next send.
#[test]
fn oneshot_single_thread_recv_timeout() {
    let wait = Duration::from_millis(1);
    let (sender, receiver) = channel();
    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
    assert_eq!(receiver.recv_timeout(wait), Err(RecvTimeoutError::Timeout));
    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
}
2236 | ||
// A producer that pauses for twice the timeout before every other send forces
// the consumer through repeated timeouts; every value must still be counted.
#[test]
fn stress_recv_timeout_two_threads() {
    let (sender, receiver) = channel();
    let stress = stress_factor() + 100;
    let timeout = Duration::from_millis(100);

    thread::spawn(move || {
        for round in 0..stress {
            if round % 2 == 0 {
                thread::sleep(timeout * 2);
            }
            sender.send(1usize).unwrap();
        }
    });

    let mut recv_count = 0;
    loop {
        match receiver.recv_timeout(timeout) {
            Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => continue,
            Ok(value) => {
                assert_eq!(value, 1usize);
                recv_count += 1;
            }
        }
    }

    assert_eq!(recv_count, stress);
}
2266 | ||
// After the sender has been cloned, `recv_timeout` on the empty channel still
// times out, and not before the full timeout has elapsed.
#[test]
fn recv_timeout_upgrade() {
    let (sender, receiver) = channel::<()>();
    let timeout = Duration::from_millis(1);
    let _cloned = sender.clone();

    let started = Instant::now();
    let outcome = receiver.recv_timeout(timeout);
    assert_eq!(outcome, Err(RecvTimeoutError::Timeout));
    assert!(Instant::now() >= started + timeout);
}
2277 | ||
// Many producers, each holding a clone of the sender, send one value after a
// staggered delay; the consumer polls with a short timeout until every sender
// is gone and must have counted every value.
#[test]
fn stress_recv_timeout_shared() {
    let (sender, receiver) = channel();
    let stress = stress_factor() + 100;

    for index in 0..stress {
        let sender = sender.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(index as u64 * 10));
            sender.send(1usize).unwrap();
        });
    }

    drop(sender);

    let mut recv_count = 0;
    loop {
        match receiver.recv_timeout(Duration::from_millis(10)) {
            Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => continue,
            Ok(value) => {
                assert_eq!(value, 1usize);
                recv_count += 1;
            }
        }
    }

    assert_eq!(recv_count, stress);
}
2307 | ||
0731742a XL |
// A timeout of `u64::max_value()` seconds must not panic: the receiving
// thread simply waits and gets the value sent one second later.
#[test]
fn very_long_recv_timeout_wont_panic() {
    let (sender, receiver) = channel::<()>();
    let waiter = thread::spawn(move || {
        let forever = Duration::from_secs(u64::max_value());
        receiver.recv_timeout(forever)
    });
    thread::sleep(Duration::from_secs(1));
    assert!(sender.send(()).is_ok());
    let received = waiter.join().unwrap();
    assert_eq!(received, Ok(()));
}
2318 | ||
1a4d82fc JJ |
// Regression test that we don't run out of stack in scheduler context:
// queue 10000 messages, then drain them all.
#[test]
fn recv_a_lot() {
    const COUNT: usize = 10000;
    let (sender, receiver) = channel();
    for _ in 0..COUNT {
        sender.send(()).unwrap();
    }
    for _ in 0..COUNT {
        receiver.recv().unwrap();
    }
}
2326 | ||
3157f602 XL |
// After five cloned senders have each delivered one message, `recv_timeout`
// times out on the drained channel and succeeds again after another send.
#[test]
fn shared_recv_timeout() {
    let (sender, receiver) = channel();
    let total = 5;
    for _ in 0..total {
        let sender = sender.clone();
        thread::spawn(move || {
            sender.send(()).unwrap();
        });
    }

    for _ in 0..total {
        receiver.recv().unwrap();
    }

    let wait = Duration::from_millis(1);
    assert_eq!(receiver.recv_timeout(wait), Err(RecvTimeoutError::Timeout));
    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
}
2344 | ||
1a4d82fc JJ |
// Many threads each send one message through a clone of the sender; the test
// thread receives exactly that many.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = channel();
    let total = stress_factor() + 100;
    for _ in 0..total {
        let sender = sender.clone();
        thread::spawn(move || {
            sender.send(()).unwrap();
        });
    }

    for _ in 0..total {
        receiver.recv().unwrap();
    }
}
2360 | ||
// A helper thread sums everything yielded by `rx.iter()` and reports the
// total once the sender is dropped.
#[test]
fn test_nested_recv_iter() {
    let (tx, rx) = channel::<i32>();
    let (sum_tx, sum_rx) = channel::<i32>();

    let _summer = thread::spawn(move || {
        let sum: i32 = rx.iter().sum();
        sum_tx.send(sum).unwrap();
    });

    for value in &[3, 1, 2] {
        tx.send(*value).unwrap();
    }
    // Dropping the sender ends the helper's iteration.
    drop(tx);
    assert_eq!(sum_rx.recv().unwrap(), 6);
}
2380 | ||
// The helper thread stops iterating as soon as its running total reaches 3,
// so with inputs of 2 it reports a total of 4.
#[test]
fn test_recv_iter_break() {
    let (tx, rx) = channel::<i32>();
    let (total_tx, total_rx) = channel();

    let _t = thread::spawn(move || {
        let mut total = 0;
        for x in rx.iter() {
            if total >= 3 {
                break;
            }
            total += x;
        }
        total_tx.send(total).unwrap();
    });

    for _ in 0..3 {
        tx.send(2).unwrap();
    }
    // The receiver may already be gone by now, so this result is ignored.
    let _ = tx.send(2);
    drop(tx);
    assert_eq!(total_rx.recv().unwrap(), 4);
}
2405 | ||
5bcae85e SL |
// Exercises `try_iter`: the worker drains whatever replies are pending and
// asks for another one until its running sum is exactly 6.
#[test]
fn test_recv_try_iter() {
    let (ask_tx, ask_rx) = channel();
    let (reply_tx, reply_rx) = channel();

    let worker = thread::spawn(move || {
        let mut sum = 0;
        loop {
            for x in reply_rx.try_iter() {
                sum += x;
                if sum == 6 {
                    return sum;
                }
            }
            ask_tx.send(()).unwrap();
        }
    });

    // Answer each request with a 2 until either side hangs up.
    while let Ok(()) = ask_rx.recv() {
        if reply_tx.send(2).is_err() {
            break;
        }
    }

    assert_eq!(worker.join().unwrap(), 6);
}
2433 | ||
d9579d0f AL |
// The owning iterator from `Receiver::into_iter` yields the queued values in
// order and then ends, because the sender was dropped with the inner scope.
#[test]
fn test_recv_into_iter_owned() {
    let mut iter = {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        tx.send(2).unwrap();

        rx.into_iter()
    };
    // Compare whole `Option`s so a failure prints the value actually received.
    assert_eq!(iter.next(), Some(1));
    assert_eq!(iter.next(), Some(2));
    assert_eq!(iter.next(), None);
}
2447 | ||
// Iterating over `&Receiver` yields the queued values in order and ends once
// the sender has been dropped.
#[test]
fn test_recv_into_iter_borrowed() {
    let (tx, rx) = channel::<i32>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx);
    let mut iter = (&rx).into_iter();
    // Compare whole `Option`s so a failure prints the value actually received.
    assert_eq!(iter.next(), Some(1));
    assert_eq!(iter.next(), Some(2));
    assert_eq!(iter.next(), None);
}
2459 | ||
1a4d82fc JJ |
// Walks `try_recv` through Empty -> Ok -> Empty -> Disconnected, using two
// auxiliary channels to step the helper thread in lock-step.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = channel::<i32>();
    let (go_tx, go_rx) = channel::<()>();
    let (ack_tx, ack_rx) = channel::<()>();
    let _helper = thread::spawn(move || {
        // Phase 1: on request, publish one value.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();
        // Phase 2: on request, hang up the data channel.
        go_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
}
2483 | ||
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (tx, rx) = channel();
    let (done_tx, done_rx) = channel();
    let _child = thread::spawn(move || {
        // Block until the parent's message arrives, then drop the receiver
        // while the parent's cloned sender is still alive.
        rx.recv().unwrap();
        drop(rx);
        done_tx.send(()).unwrap();
    });
    // Yield repeatedly to give the child a chance to block in `recv`.
    for _ in 0..5000 {
        thread::yield_now();
    }

    // Clone the sender (upgrading the channel to shared) and send through the clone.
    let cloned = tx.clone();
    drop(tx);
    cloned.send(()).unwrap();

    // Do not return before the child has finished.
    done_rx.recv().unwrap();
}
c30ab7b3 SL |
2506 | |
// With the receiver already gone, a second failed send must still hand the
// value back inside `SendError`.
#[test]
fn issue_32114() {
    let (tx, rx) = channel();
    drop(rx);
    let _ = tx.send(123);
    let second = tx.send(123);
    assert_eq!(second, Err(SendError(123)));
}
1a4d82fc JJ |
2513 | } |
2514 | ||
c30ab7b3 | 2515 | #[cfg(all(test, not(target_os = "emscripten")))] |
1a4d82fc | 2516 | mod sync_tests { |
c1a9b12d | 2517 | use env; |
85aaf69f | 2518 | use thread; |
1a4d82fc | 2519 | use super::*; |
3157f602 | 2520 | use time::Duration; |
1a4d82fc | 2521 | |
// Iteration count for the stress tests: the value of the `RUST_TEST_STRESS`
// environment variable parsed as `usize`, or 1 when the variable cannot be read.
//
// Panics, naming the variable and the offending value, if it is set but is
// not a valid `usize`.
pub fn stress_factor() -> usize {
    match env::var("RUST_TEST_STRESS") {
        Ok(val) => val.parse().unwrap_or_else(|err| {
            panic!("RUST_TEST_STRESS must be an unsigned integer, got {:?}: {}", val, err)
        }),
        Err(..) => 1,
    }
}
2528 | ||
// One value sent through a capacity-1 sync channel is received unchanged.
#[test]
fn smoke() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
2535 | ||
// Dropping a channel whose buffer still holds a boxed value must not misbehave.
#[test]
fn drop_full() {
    let (tx, _rx) = sync_channel::<Box<isize>>(1);
    // `Box::new` instead of the unstable `box` expression syntax.
    tx.send(Box::new(1)).unwrap();
}
2541 | ||
// A value round-trips both through the original sender and through a clone.
#[test]
fn smoke_shared() {
    let (tx, rx) = sync_channel::<i32>(1);
    tx.send(1).unwrap();
    assert_eq!(rx.recv().unwrap(), 1);

    // The original sender stays alive while the clone is used.
    let cloned = tx.clone();
    cloned.send(1).unwrap();
    assert_eq!(rx.recv().unwrap(), 1);
}
2551 | ||
3157f602 XL |
// `recv_timeout` reports `Timeout` on an empty channel and returns the value
// once one has been sent.
#[test]
fn recv_timeout() {
    let (tx, rx) = sync_channel::<i32>(1);
    let short = Duration::from_millis(1);
    assert_eq!(rx.recv_timeout(short), Err(RecvTimeoutError::Timeout));
    tx.send(1).unwrap();
    assert_eq!(rx.recv_timeout(short), Ok(1));
}
2559 | ||
1a4d82fc JJ |
// A value sent from another thread over a zero-capacity channel is received here.
#[test]
fn smoke_threads() {
    let (tx, rx) = sync_channel::<i32>(0);
    let _sender = thread::spawn(move || tx.send(1).unwrap());
    let got = rx.recv().unwrap();
    assert_eq!(got, 1);
}
2568 | ||
// Sending after the receiver has been dropped returns an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let result = sender.send(1);
    assert!(result.is_err());
}
2575 | ||
// A sender cloned after the receiver was dropped also fails to send.
#[test]
fn smoke_shared_port_gone2() {
    let (original, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let cloned = original.clone();
    drop(original);
    let result = cloned.send(1);
    assert!(result.is_err());
}
2584 | ||
// The receiving thread takes one value and exits; the sender keeps sending
// until it observes the resulting error.
#[test]
fn port_gone_concurrent() {
    let (tx, rx) = sync_channel::<i32>(0);
    let _receiver = thread::spawn(move || {
        rx.recv().unwrap();
    });
    loop {
        if tx.send(1).is_err() {
            break;
        }
    }
}
2593 | ||
// Same as `port_gone_concurrent`, but alternating between two senders; the
// loop ends when either of them fails.
#[test]
fn port_gone_concurrent_shared() {
    let (first, rx) = sync_channel::<i32>(0);
    let second = first.clone();
    let _receiver = thread::spawn(move || {
        rx.recv().unwrap();
    });
    loop {
        if first.send(1).is_err() {
            break;
        }
        if second.send(1).is_err() {
            break;
        }
    }
}
2603 | ||
// Receiving after the only sender has been dropped returns an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let result = receiver.recv();
    assert!(result.is_err());
}
2610 | ||
// Receiving fails once both the original sender and its clone are dropped.
#[test]
fn smoke_chan_gone_shared() {
    let (original, receiver) = sync_channel::<()>(0);
    let cloned = original.clone();
    drop(original);
    drop(cloned);
    let result = receiver.recv();
    assert!(result.is_err());
}
2619 | ||
// The sending thread delivers two values and exits; the receiver loops until
// `recv` reports an error.
#[test]
fn chan_gone_concurrent() {
    let (tx, rx) = sync_channel::<i32>(0);
    thread::spawn(move || {
        for _ in 0..2 {
            tx.send(1).unwrap();
        }
    });
    loop {
        if rx.recv().is_err() {
            break;
        }
    }
}
2629 | ||
// 10000 values are handed from one thread to another over a zero-capacity channel.
#[test]
fn stress() {
    const COUNT: usize = 10000;
    let (tx, rx) = sync_channel::<i32>(0);
    thread::spawn(move || {
        (0..COUNT).for_each(|_| tx.send(1).unwrap());
    });
    (0..COUNT).for_each(|_| assert_eq!(rx.recv().unwrap(), 1));
}
2640 | ||
// One thread sends 10000 values; the receiver polls with a 1 ms
// `recv_timeout`, retrying on timeouts, and must see every value before the
// channel disconnects.
#[test]
fn stress_recv_timeout_two_threads() {
    let (tx, rx) = sync_channel::<i32>(0);

    thread::spawn(move || {
        for _ in 0..10000 {
            tx.send(1).unwrap();
        }
    });

    let tick = Duration::from_millis(1);
    let mut received = 0;
    loop {
        match rx.recv_timeout(tick) {
            Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => {}
            Ok(value) => {
                assert_eq!(value, 1);
                received += 1;
            }
        }
    }

    assert_eq!(received, 10000);
}
2663 | ||
// NTHREADS senders each push AMT values; a receiver thread polls with a
// 10 ms `recv_timeout` until the channel disconnects, checks the total, and
// then signals completion on a second channel.
#[test]
fn stress_recv_timeout_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (tx, rx) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    thread::spawn(move || {
        let tick = Duration::from_millis(10);
        let mut received = 0;
        loop {
            match rx.recv_timeout(tick) {
                Err(RecvTimeoutError::Disconnected) => break,
                Err(RecvTimeoutError::Timeout) => {}
                Ok(value) => {
                    assert_eq!(value, 1);
                    received += 1;
                }
            }
        }

        assert_eq!(received, AMT * NTHREADS);
        assert!(rx.try_recv().is_err());

        done_tx.send(()).unwrap();
    });

    for _ in 0..NTHREADS {
        let worker_tx = tx.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                worker_tx.send(1).unwrap();
            }
        });
    }

    // Drop the original so only the workers' clones keep the channel open.
    drop(tx);

    done_rx.recv().unwrap();
}
2701 | ||
// NTHREADS senders each push AMT values over a zero-capacity channel; the
// receiver thread must get exactly AMT * NTHREADS of them and nothing more,
// then signals completion on a second channel.
#[test]
fn stress_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (tx, rx) = sync_channel::<i32>(0);
    let (dtx, drx) = sync_channel::<()>(0);

    thread::spawn(move || {
        for _ in 0..AMT * NTHREADS {
            assert_eq!(rx.recv().unwrap(), 1);
        }
        // No extra message may be pending; same check as in
        // `stress_recv_timeout_shared`, and it fails with a readable message.
        assert!(rx.try_recv().is_err());
        dtx.send(()).unwrap();
    });

    for _ in 0..NTHREADS {
        let tx = tx.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                tx.send(1).unwrap();
            }
        });
    }
    drop(tx);
    drx.recv().unwrap();
}
2729 | ||
// Closing without sending: the receiver is dropped explicitly, the sender at
// the end of the function.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
}
2736 | ||
// Closing without sending: the sender is dropped explicitly, the receiver at
// the end of the function.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    drop(sender);
}
2743 | ||
// Sending a boxed payload after the receiver is closed must fail; the sender
// is then responsible for cleaning up the payload.
#[test]
fn oneshot_single_thread_send_port_close() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    drop(rx);
    // `Box::new` instead of the unstable `box` expression syntax.
    assert!(tx.send(Box::new(0)).is_err());
}
2751 | ||
// Unwrapping a `recv` on a channel whose sender is gone panics; the panic is
// observed here as an `Err` from `join`.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let worker = thread::spawn(move || {
        let (tx, rx) = sync_channel::<i32>(0);
        drop(tx);
        rx.recv().unwrap();
    });
    let outcome = worker.join();
    assert!(outcome.is_err());
}
2763 | ||
// A boxed value sent into a capacity-1 channel is received intact on the same thread.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (tx, rx) = sync_channel::<Box<i32>>(1);
    // `Box::new` instead of the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
    // `assert_eq!` prints the received value on failure.
    assert_eq!(*rx.recv().unwrap(), 10);
}
2770 | ||
// `try_send` succeeds while the capacity-1 buffer has room, and the value is
// then received.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (tx, rx) = sync_channel::<i32>(1);
    assert_eq!(tx.try_send(10), Ok(()));
    // `assert_eq!` prints the received value on failure.
    assert_eq!(rx.recv().unwrap(), 10);
}
2777 | ||
// `try_send` on a channel whose receiver is gone returns the value inside
// `Disconnected`.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let result = sender.try_send(10);
    assert_eq!(result, Err(TrySendError::Disconnected(10)));
}
2784 | ||
// `try_send` on a zero-capacity channel with a live but idle receiver returns
// the value inside `Full`.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let result = sender.try_send(10);
    assert_eq!(result, Err(TrySendError::Full(10)));
}
2790 | ||
// A value buffered in a capacity-1 channel is returned by the next receive.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (tx, rx) = sync_channel::<i32>(1);
    tx.send(10).unwrap();
    // `assert_eq!` prints the actual result on failure.
    assert_eq!(rx.recv(), Ok(10));
}
2797 | ||
// Receiving on an empty channel whose sender was dropped returns an error.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let result = receiver.recv();
    assert!(result.is_err());
}
2804 | ||
5bcae85e SL |
// A value buffered before the sender was dropped is still delivered; only the
// following `try_recv` reports `Disconnected`.
#[test]
fn oneshot_single_thread_try_recv_closed_with_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    drop(sender);
    let first = receiver.try_recv();
    assert_eq!(first, Ok(10));
    let second = receiver.try_recv();
    assert_eq!(second, Err(TryRecvError::Disconnected));
}
2813 | ||
1a4d82fc JJ |
// `try_recv` reports `Empty` before anything is sent and returns the value afterwards.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
2821 | ||
// Once the sender is dropped, `try_recv` reports `Disconnected` every time it is called.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
2829 | ||
// With a live sender and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    let result = receiver.try_recv();
    assert_eq!(result, Err(TryRecvError::Empty));
}
2835 | ||
// A receiver thread is started first, then a boxed value is handed to it
// over a zero-capacity channel.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    let receiver = thread::spawn(move || {
        assert_eq!(*rx.recv().unwrap(), 10);
    });

    // `Box::new` instead of the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
    // Join so that a failed assertion in the receiver thread fails this test
    // instead of being lost in a detached thread.
    receiver.join().unwrap();
}
2845 | ||
// One thread drops the sender while another unwraps a `recv`; the receiving
// thread must therefore panic, which shows up as an `Err` from `join`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    let _closer = thread::spawn(move || drop(tx));
    let receiver = thread::spawn(move || {
        let payload = rx.recv().unwrap();
        assert!(*payload == 10);
    });
    let joined = receiver.join();
    assert!(joined.is_err());
}
2857 | ||
// Repeatedly drops the receiver on another thread while the sender is
// dropped on this one.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _dropper = thread::spawn(move || drop(receiver));
        drop(sender);
    }
}
2868 | ||
// Repeatedly races a thread that drops the receiver against a thread that
// sends; the sender thread is joined, but its result is deliberately ignored.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (tx, rx) = sync_channel::<i32>(0);
        let _dropper = thread::spawn(move || drop(rx));
        let sender = thread::spawn(move || {
            tx.send(1).unwrap();
        });
        let _ = sender.join();
    }
}
2881 | ||
// Repeatedly races a nested thread that unwraps `recv` against a nested
// thread that drops the sender; the receiving thread is expected to panic.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (tx, rx) = sync_channel::<i32>(0);
        let _checker = thread::spawn(move || {
            let inner = thread::spawn(move || {
                rx.recv().unwrap();
            });
            let outcome = inner.join();
            assert!(outcome.is_err());
        });
        let _closer = thread::spawn(move || {
            thread::spawn(move || drop(tx));
        });
    }
}
2899 | ||
// Repeatedly hands one boxed value from a spawned thread to this thread over
// a zero-capacity channel.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        let _t = thread::spawn(move || {
            // `Box::new` instead of the unstable `box` expression syntax.
            tx.send(Box::new(10)).unwrap();
        });
        // `assert_eq!` prints the received value on failure.
        assert_eq!(*rx.recv().unwrap(), 10);
    }
}
2910 | ||
// Sends the values 0..10 one at a time, each from a freshly spawned thread
// that then spawns the next sender; a mirrored chain of receiver threads
// checks that they arrive in order.
#[test]
fn stream_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        send(tx, 0);
        recv(rx, 0);

        // Spawns a thread that sends `i` and then continues with `i + 1`;
        // stops at 10.
        fn send(tx: SyncSender<Box<i32>>, i: i32) {
            if i == 10 { return }

            thread::spawn(move || {
                // `Box::new` instead of the unstable `box` expression syntax.
                tx.send(Box::new(i)).unwrap();
                send(tx, i + 1);
            });
        }

        // Spawns a thread that expects `i` and then continues with `i + 1`;
        // stops at 10.
        fn recv(rx: Receiver<Box<i32>>, i: i32) {
            if i == 10 { return }

            thread::spawn(move || {
                // `assert_eq!` prints both values on failure.
                assert_eq!(*rx.recv().unwrap(), i);
                recv(rx, i + 1);
            });
        }
    }
}
2938 | ||
// Regression test: fill a 10000-slot buffer and drain it again without
// running out of stack.
#[test]
fn recv_a_lot() {
    const COUNT: usize = 10000;
    let (sender, receiver) = sync_channel(COUNT);
    (0..COUNT).for_each(|_| sender.send(()).unwrap());
    (0..COUNT).for_each(|_| receiver.recv().unwrap());
}
2946 | ||
// Spawns `stress_factor() + 100` threads that each send one message through a
// cloned sender of a zero-capacity channel, then receives exactly that many.
#[test]
fn shared_chan_stress() {
    let (tx, rx) = sync_channel(0);
    let thread_count = stress_factor() + 100;
    for _ in 0..thread_count {
        let worker_tx = tx.clone();
        thread::spawn(move || worker_tx.send(()).unwrap());
    }

    (0..thread_count).for_each(|_| rx.recv().unwrap());
}
2962 | ||
// A helper thread sums everything yielded by `rx.iter()` and reports the
// total once the sender is dropped.
#[test]
fn test_nested_recv_iter() {
    let (tx, rx) = sync_channel::<i32>(0);
    let (sum_tx, sum_rx) = sync_channel::<i32>(0);

    let _summer = thread::spawn(move || {
        let sum: i32 = rx.iter().sum();
        sum_tx.send(sum).unwrap();
    });

    for value in &[3, 1, 2] {
        tx.send(*value).unwrap();
    }
    // Dropping the sender ends the helper's iteration.
    drop(tx);
    assert_eq!(sum_rx.recv().unwrap(), 6);
}
2982 | ||
// The helper thread stops iterating as soon as its running total reaches 3,
// so with inputs of 2 it reports a total of 4.
#[test]
fn test_recv_iter_break() {
    let (tx, rx) = sync_channel::<i32>(0);
    let (total_tx, total_rx) = sync_channel(0);

    let _t = thread::spawn(move || {
        let mut total = 0;
        for x in rx.iter() {
            if total >= 3 {
                break;
            }
            total += x;
        }
        total_tx.send(total).unwrap();
    });

    for _ in 0..3 {
        tx.send(2).unwrap();
    }
    // Non-blocking attempt; whether it succeeds does not matter.
    let _ = tx.try_send(2);
    drop(tx);
    assert_eq!(total_rx.recv().unwrap(), 4);
}
3007 | ||
// Walks `try_recv` through Empty -> Ok -> Empty -> Disconnected, using two
// auxiliary channels to step the helper thread in lock-step.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (ack_tx, ack_rx) = sync_channel::<()>(1);
    let _helper = thread::spawn(move || {
        // Phase 1: on request, publish one value.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();
        // Phase 2: on request, hang up the data channel.
        go_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
}
3031 | ||
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (tx, rx) = sync_channel::<()>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);
    let _child = thread::spawn(move || {
        // Block until the parent's message arrives, then drop the receiver
        // while the parent's cloned sender is still alive.
        rx.recv().unwrap();
        drop(rx);
        done_tx.send(()).unwrap();
    });
    // Yield repeatedly to give the child a chance to block in `recv`.
    for _ in 0..5000 {
        thread::yield_now();
    }

    // Replace the original sender with a clone and send through the clone.
    let cloned = tx.clone();
    drop(tx);
    cloned.send(()).unwrap();

    // Do not return before the child has finished.
    done_rx.recv().unwrap();
}
3054 | ||
// A rendezvous send succeeds when another thread receives the value.
#[test]
fn send1() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    let result = sender.send(1);
    assert_eq!(result, Ok(()));
}
3061 | ||
// A rendezvous send fails when another thread drops the receiver instead of receiving.
#[test]
fn send2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _dropper = thread::spawn(move || drop(receiver));
    let result = sender.send(1);
    assert!(result.is_err());
}
3068 | ||
// With the single buffer slot already occupied, a second send fails once
// another thread drops the receiver.
#[test]
fn send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.send(1);
    assert_eq!(first, Ok(()));
    let _dropper = thread::spawn(move || drop(receiver));
    let second = sender.send(1);
    assert!(second.is_err());
}
3076 | ||
// Two threads attempt rendezvous sends; after the receiver is dropped both
// sends must fail, and each thread reports completion on a side channel.
#[test]
fn send4() {
    let (first_tx, rx) = sync_channel::<i32>(0);
    let second_tx = first_tx.clone();
    let (first_done, all_done) = channel();
    let second_done = first_done.clone();
    let _a = thread::spawn(move || {
        assert!(first_tx.send(1).is_err());
        first_done.send(()).unwrap();
    });
    let _b = thread::spawn(move || {
        assert!(second_tx.send(2).is_err());
        second_done.send(()).unwrap();
    });
    drop(rx);
    // Wait for both sender threads to report.
    for _ in 0..2 {
        all_done.recv().unwrap();
    }
}
3095 | ||
// `try_send` on a zero-capacity channel with no waiting receiver reports `Full`.
#[test]
fn try_send1() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let result = sender.try_send(1);
    assert_eq!(result, Err(TrySendError::Full(1)));
}
3101 | ||
// The first `try_send` fills the single buffer slot; the second reports `Full`.
#[test]
fn try_send2() {
    let (sender, _receiver) = sync_channel::<i32>(1);
    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));
    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Full(1)));
}
3108 | ||
// After the receiver is dropped, `try_send` reports `Disconnected` even
// though an earlier value is still buffered.
#[test]
fn try_send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));
    drop(receiver);
    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Disconnected(1)));
}
3116 | ||
// Runs a two-channel `try_send`/`recv` ping-pong between two threads 100 times.
#[test]
fn issue_15761() {
    // One round: this thread pings via `try_send`, the helper answers the same way.
    fn repro() {
        let (ping_tx, ping_rx) = sync_channel::<()>(3);
        let (pong_tx, pong_rx) = sync_channel::<()>(3);

        let _helper = thread::spawn(move || {
            ping_rx.recv().unwrap();
            pong_tx.try_send(()).unwrap();
        });

        ping_tx.try_send(()).unwrap();
        pong_rx.recv().unwrap();
    }

    (0..100).for_each(|_| repro());
}
3136 | } |