]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
85aaf69f | 11 | //! Multi-producer, single-consumer FIFO queue communication primitives. |
1a4d82fc JJ |
12 | //! |
13 | //! This module provides message-based communication over channels, concretely | |
14 | //! defined among three types: | |
15 | //! | |
16 | //! * `Sender` | |
17 | //! * `SyncSender` | |
18 | //! * `Receiver` | |
19 | //! | |
20 | //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both | |
21 | //! senders are clone-able (multi-producer) such that many threads can send | |
22 | //! simultaneously to one receiver (single-consumer). | |
23 | //! | |
24 | //! These channels come in two flavors: | |
25 | //! | |
26 | //! 1. An asynchronous, infinitely buffered channel. The `channel()` function | |
27 | //! will return a `(Sender, Receiver)` tuple where all sends will be | |
28 | //! **asynchronous** (they never block). The channel conceptually has an | |
29 | //! infinite buffer. | |
30 | //! | |
31 | //! 2. A synchronous, bounded channel. The `sync_channel()` function will return | |
32 | //! a `(SyncSender, Receiver)` tuple where the storage for pending messages | |
33 | //! is a pre-allocated buffer of a fixed size. All sends will be | |
34 | //! **synchronous** by blocking until there is buffer space available. Note | |
35 | //! that a bound of 0 is allowed, causing the channel to become a | |
36 | //! "rendezvous" channel where each sender atomically hands off a message to | |
37 | //! a receiver. | |
38 | //! | |
39 | //! ## Disconnection | |
40 | //! | |
41 | //! The send and receive operations on channels will all return a `Result` | |
42 | //! indicating whether the operation succeeded or not. An unsuccessful operation | |
43 | //! is normally indicative of the other half of a channel having "hung up" by | |
44 | //! being dropped in its corresponding thread. | |
45 | //! | |
46 | //! Once half of a channel has been deallocated, most operations can no longer | |
47 | //! continue to make progress, so `Err` will be returned. Many applications will | |
48 | //! continue to `unwrap()` the results returned from this module, instigating a | |
49 | //! propagation of failure among threads if one unexpectedly dies. | |
50 | //! | |
51 | //! # Examples | |
52 | //! | |
53 | //! Simple usage: | |
54 | //! | |
55 | //! ``` | |
85aaf69f | 56 | //! use std::thread; |
1a4d82fc JJ |
57 | //! use std::sync::mpsc::channel; |
58 | //! | |
59 | //! // Create a simple streaming channel | |
60 | //! let (tx, rx) = channel(); | |
85aaf69f SL |
61 | //! thread::spawn(move|| { |
62 | //! tx.send(10).unwrap(); | |
1a4d82fc | 63 | //! }); |
85aaf69f | 64 | //! assert_eq!(rx.recv().unwrap(), 10); |
1a4d82fc JJ |
65 | //! ``` |
66 | //! | |
67 | //! Shared usage: | |
68 | //! | |
69 | //! ``` | |
85aaf69f | 70 | //! use std::thread; |
1a4d82fc JJ |
71 | //! use std::sync::mpsc::channel; |
72 | //! | |
73 | //! // Create a shared channel that can be sent along from many threads | |
74 | //! // where tx is the sending half (tx for transmission), and rx is the receiving | |
75 | //! // half (rx for receiving). | |
76 | //! let (tx, rx) = channel(); | |
85aaf69f | 77 | //! for i in 0..10 { |
1a4d82fc | 78 | //! let tx = tx.clone(); |
85aaf69f | 79 | //! thread::spawn(move|| { |
1a4d82fc JJ |
80 | //! tx.send(i).unwrap(); |
81 | //! }); | |
82 | //! } | |
83 | //! | |
85aaf69f | 84 | //! for _ in 0..10 { |
1a4d82fc JJ |
85 | //! let j = rx.recv().unwrap(); |
86 | //! assert!(0 <= j && j < 10); | |
87 | //! } | |
88 | //! ``` | |
89 | //! | |
90 | //! Propagating panics: | |
91 | //! | |
92 | //! ``` | |
93 | //! use std::sync::mpsc::channel; | |
94 | //! | |
95 | //! // The call to recv() will return an error because the channel has already | |
96 | //! // hung up (or been deallocated) | |
c34b1796 | 97 | //! let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
98 | //! drop(tx); |
99 | //! assert!(rx.recv().is_err()); | |
100 | //! ``` | |
101 | //! | |
102 | //! Synchronous channels: | |
103 | //! | |
104 | //! ``` | |
85aaf69f | 105 | //! use std::thread; |
1a4d82fc JJ |
106 | //! use std::sync::mpsc::sync_channel; |
107 | //! | |
c34b1796 | 108 | //! let (tx, rx) = sync_channel::<i32>(0); |
85aaf69f | 109 | //! thread::spawn(move|| { |
bd371182 | 110 | //! // This will wait for the parent thread to start receiving |
1a4d82fc JJ |
111 | //! tx.send(53).unwrap(); |
112 | //! }); | |
113 | //! rx.recv().unwrap(); | |
114 | //! ``` | |
1a4d82fc | 115 | |
85aaf69f | 116 | #![stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
117 | |
118 | // A description of how Rust's channel implementation works | |
119 | // | |
120 | // Channels are supposed to be the basic building block for all other | |
121 | // concurrent primitives that are used in Rust. As a result, the channel type | |
122 | // needs to be highly optimized, flexible, and broad enough for use everywhere. | |
123 | // | |
124 | // The choice of implementation of all channels is to be built on lock-free data | |
125 | // structures. The channels themselves are then consequently also lock-free data | |
126 | // structures. As always with lock-free code, this is a very "here be dragons" | |
127 | // territory, especially because I'm unaware of any academic papers that have | |
128 | // gone into great length about channels of these flavors. | |
129 | // | |
130 | // ## Flavors of channels | |
131 | // | |
132 | // From the perspective of a consumer of this library, there is only one flavor | |
133 | // of channel. This channel can be used as a stream and cloned to allow multiple | |
134 | // senders. Under the hood, however, there are actually three flavors of | |
135 | // channels in play. | |
136 | // | |
137 | // * Oneshots - these channels are highly optimized for the one-send use case. | |
138 | // They contain as few atomics as possible and involve one and | |
139 | // exactly one allocation. | |
140 | // * Streams - these channels are optimized for the non-shared use case. They | |
141 | // use a different concurrent queue that is more tailored for this | |
142 | // use case. The initial allocation of this flavor of channel is not | |
143 | // optimized. | |
144 | // * Shared - this is the most general form of channel that this module offers, | |
145 | // a channel with multiple senders. This type is as optimized as it | |
146 | // can be, but the previous two types mentioned are much faster for | |
147 | // their use-cases. | |
148 | // | |
149 | // ## Concurrent queues | |
150 | // | |
151 | // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but | |
152 | // recv() obviously blocks. This means that under the hood there must be some | |
153 | // shared and concurrent queue holding all of the actual data. | |
154 | // | |
155 | // With two flavors of channels, two flavors of queues are also used. We have | |
156 | // chosen to use queues from a well-known author that are abbreviated as SPSC | |
157 | // and MPSC (single producer, single consumer and multiple producer, single | |
158 | // consumer). SPSC queues are used for streams while MPSC queues are used for | |
159 | // shared channels. | |
160 | // | |
161 | // ### SPSC optimizations | |
162 | // | |
163 | // The SPSC queue found online is essentially a linked list of nodes where one | |
164 | // half of the nodes are the "queue of data" and the other half of nodes are a | |
165 | // cache of unused nodes. The unused nodes are used such that an allocation is | |
166 | // not required on every push() and a free doesn't need to happen on every | |
167 | // pop(). | |
168 | // | |
169 | // As found online, however, the cache of nodes is of an infinite size. This | |
170 | // means that if a channel at one point in its life had 50k items in the queue, | |
171 | // then the queue will always have the capacity for 50k items. I believed that | |
172 | // this was an unnecessary limitation of the implementation, so I have altered | |
173 | // the queue to optionally have a bound on the cache size. | |
174 | // | |
175 | // By default, streams will have an unbounded SPSC queue with a small-ish cache | |
176 | // size. The hope is that the cache is still large enough to have very fast | |
177 | // send() operations while not too large such that millions of channels can | |
178 | // coexist at once. | |
179 | // | |
180 | // ### MPSC optimizations | |
181 | // | |
182 | // Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses | |
183 | // a linked list under the hood to earn its unboundedness, but I have not put | |
184 | // forth much effort into having a cache of nodes similar to the SPSC queue. | |
185 | // | |
186 | // For now, I believe that this is "ok" because shared channels are not the most | |
187 | // common type, but soon we may wish to revisit this queue choice and determine | |
188 | // another candidate for backend storage of shared channels. | |
189 | // | |
190 | // ## Overview of the Implementation | |
191 | // | |
192 | // Now that there's a little background on the concurrent queues used, it's | |
193 | // worth going into much more detail about the channels themselves. The basic | |
194 | // pseudocode for a send/recv are: | |
195 | // | |
196 | // | |
197 | // send(t) recv() | |
198 | // queue.push(t) return if queue.pop() | |
199 | // if increment() == -1 deschedule { | |
200 | // wakeup() if decrement() > 0 | |
201 | // cancel_deschedule() | |
202 | // } | |
203 | // queue.pop() | |
204 | // | |
205 | // As mentioned before, there are no locks in this implementation, only atomic | |
206 | // instructions are used. | |
207 | // | |
208 | // ### The internal atomic counter | |
209 | // | |
210 | // Every channel has a shared counter with each half to keep track of the size | |
211 | // of the queue. This counter is used to abort descheduling by the receiver and | |
212 | // to know when to wake up on the sending side. | |
213 | // | |
214 | // As seen in the pseudocode, senders will increment this count and receivers | |
215 | // will decrement the count. The theory behind this is that if a sender sees a | |
216 | // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, | |
217 | // then it doesn't need to block. | |
218 | // | |
219 | // The recv() method has a beginning call to pop(), and if successful, it needs | |
220 | // to decrement the count. It is a crucial implementation detail that this | |
221 | // decrement does *not* happen to the shared counter. If this were the case, | |
222 | // then it would be possible for the counter to be very negative when there were | |
223 | // no receivers waiting, in which case the senders would have to determine when | |
224 | // it was actually appropriate to wake up a receiver. | |
225 | // | |
226 | // Instead, the "steal count" is kept track of separately (not atomically | |
227 | // because it's only used by receivers), and then the decrement() call when | |
228 | // descheduling will lump in all of the recent steals into one large decrement. | |
229 | // | |
230 | // The implication of this is that if a sender sees a -1 count, then there's | |
231 | // guaranteed to be a waiter waiting! | |
232 | // | |
233 | // ## Native Implementation | |
234 | // | |
235 | // A major goal of these channels is to work seamlessly on and off the runtime. | |
236 | // All of the previous race conditions have been worded in terms of | |
237 | // scheduler-isms (which is obviously not available without the runtime). | |
238 | // | |
239 | // For now, native usage of channels (off the runtime) will fall back onto | |
240 | // mutexes/cond vars for descheduling/atomic decisions. The no-contention path | |
241 | // is still entirely lock-free, the "deschedule" blocks above are surrounded by | |
242 | // a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a | |
243 | // condition variable. | |
244 | // | |
245 | // ## Select | |
246 | // | |
247 | // Being able to support selection over channels has greatly influenced this | |
248 | // design, and not only does selection need to work inside the runtime, but also | |
249 | // outside the runtime. | |
250 | // | |
251 | // The implementation is fairly straightforward. The goal of select() is not to | |
252 | // return some data, but only to return which channel can receive data without | |
253 | // blocking. The implementation is essentially the entire blocking procedure | |
254 | // followed by an increment as soon as it's woken up. The cancellation procedure | |
255 | // involves an increment and swapping out of to_wake to acquire ownership of the | |
bd371182 | 256 | // thread to unblock. |
1a4d82fc JJ |
257 | // |
258 | // Sadly this current implementation requires multiple allocations, so I have | |
259 | // seen the throughput of select() be much worse than it should be. I do not | |
260 | // believe that there is anything fundamental that needs to change about these | |
261 | // channels, however, in order to support a more efficient select(). | |
262 | // | |
263 | // # Conclusion | |
264 | // | |
265 | // And now that you've seen all the races that I found and attempted to fix, | |
266 | // here's the code for you to find some more! | |
267 | ||
1a4d82fc | 268 | use sync::Arc; |
c34b1796 | 269 | use error; |
1a4d82fc | 270 | use fmt; |
1a4d82fc JJ |
271 | use mem; |
272 | use cell::UnsafeCell; | |
bd371182 | 273 | use marker::Reflect; |
1a4d82fc | 274 | |
92a42be0 | 275 | #[unstable(feature = "mpsc_select", issue = "27800")] |
1a4d82fc JJ |
276 | pub use self::select::{Select, Handle}; |
277 | use self::select::StartResult; | |
278 | use self::select::StartResult::*; | |
279 | use self::blocking::SignalToken; | |
280 | ||
281 | mod blocking; | |
282 | mod oneshot; | |
283 | mod select; | |
284 | mod shared; | |
285 | mod stream; | |
286 | mod sync; | |
287 | mod mpsc_queue; | |
288 | mod spsc_queue; | |
289 | ||
/// The receiving-half of Rust's channel type. This half can only be owned by
/// one thread.
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Receiver<T> {
    // The channel flavor backing this receiver. It lives in an `UnsafeCell`
    // and is reached through the `UnsafeFlavor` helper trait, which hands out
    // references to the flavor from a shared `&self`.
    inner: UnsafeCell<Flavor<T>>,
}

// The receiver port can be sent from place to place, so long as it
// is not used to receive non-sendable things.
//
// The impl has to be written by hand (and is `unsafe`) because the
// `UnsafeCell` field prevents the auto trait from being derived.
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send> Send for Receiver<T> { }
1a4d82fc JJ |
301 | |
/// An iterator over messages on a receiver, this iterator will block
/// whenever `next` is called, waiting for a new message, and `None` will be
/// returned when the corresponding channel has hung up.
///
/// This iterator only borrows the receiver for the lifetime `'a`.
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Iter<'a, T: 'a> {
    // The borrowed receiver that messages are pulled from.
    rx: &'a Receiver<T>
}
309 | ||
d9579d0f AL |
/// An owning iterator over messages on a receiver, this iterator will block
/// whenever `next` is called, waiting for a new message, and `None` will be
/// returned when the corresponding channel has hung up.
///
/// Unlike `Iter`, this iterator takes the receiver by value.
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
pub struct IntoIter<T> {
    // The owned receiver that messages are pulled from.
    rx: Receiver<T>
}
317 | ||
/// The sending-half of Rust's asynchronous channel type. This half can only be
/// owned by one thread, but it can be cloned to send to other threads.
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Sender<T> {
    // The channel flavor backing this sender. It lives in an `UnsafeCell`
    // because `send` and `clone` replace the flavor in place (oneshot ->
    // stream, oneshot/stream -> shared) while only holding `&self`.
    inner: UnsafeCell<Flavor<T>>,
}

// The send port can be sent from place to place, so long as it
// is not used to send non-sendable things.
//
// The impl has to be written by hand (and is `unsafe`) because the
// `UnsafeCell` field prevents the auto trait from being derived.
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send> Send for Sender<T> { }
1a4d82fc JJ |
329 | |
/// The sending-half of Rust's synchronous channel type. This half can only be
/// owned by one thread, but it can be cloned to send to other threads.
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
    // The synchronous packet shared with the `Receiver` and with every clone
    // of this sender. Unlike `Sender`, there is no `Flavor` indirection here:
    // a `SyncSender` always talks to a `sync::Packet`.
    inner: Arc<UnsafeCell<sync::Packet<T>>>,
}

// A `SyncSender` may be moved to another thread as long as the messages it
// carries are themselves sendable.
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send> Send for SyncSender<T> {}

// Explicitly opt out of `Sync`: a `&SyncSender` must not be shared between
// threads. Each thread is expected to own its own clone instead.
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> !Sync for SyncSender<T> {}
342 | ||
1a4d82fc JJ |
/// An error returned from the `send` function on channels.
///
/// A `send` operation can only fail if the receiving end of a channel is
/// disconnected, implying that the data could never be received. The error
/// contains the data being sent as a payload so it can be recovered.
///
/// The payload is the public tuple field, so it can be taken back out with
/// `err.0`.
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
1a4d82fc JJ |
351 | |
/// An error returned from the `recv` function on a `Receiver`.
///
/// The `recv` operation can only fail if the sending half of a channel is
/// disconnected, implying that no further messages will ever be received.
///
/// This is a unit struct: it carries no payload.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "rust1", since = "1.0.0")]
pub struct RecvError;
359 | ||
9346a6ac AL |
/// This enumeration is the list of the possible reasons that `try_recv` could
/// not return data when called.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "rust1", since = "1.0.0")]
pub enum TryRecvError {
    /// This channel is currently empty, but the sender(s) have not yet
    /// disconnected, so data may yet become available.
    #[stable(feature = "rust1", since = "1.0.0")]
    Empty,

    /// This channel's sending half has become disconnected, and there will
    /// never be any more data received on this channel.
    #[stable(feature = "rust1", since = "1.0.0")]
    Disconnected,
}
375 | ||
/// This enumeration is the list of the possible error outcomes for the
/// `SyncSender::try_send` method.
///
/// Both variants carry the value that could not be sent, so it is never lost.
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The data could not be sent on the channel because it would require that
    /// the caller block to send the data.
    ///
    /// If this is a buffered channel, then the buffer is full at this time. If
    /// this is not a buffered channel, then there is no receiver available to
    /// acquire the data.
    #[stable(feature = "rust1", since = "1.0.0")]
    Full(T),

    /// This channel's receiving half has disconnected, so the data could not be
    /// sent. The data is returned back to the caller in this case.
    #[stable(feature = "rust1", since = "1.0.0")]
    Disconnected(T),
}
395 | ||
// The concrete implementation currently backing a `Sender` or `Receiver`.
//
// Each variant holds a reference-counted packet of the matching submodule;
// the packet is what the two halves of a channel actually share.
enum Flavor<T> {
    // The flavor every channel created by `channel()` starts out with.
    Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
    // What `Sender::send` upgrades a oneshot channel to once the oneshot
    // packet has already been used for a send.
    Stream(Arc<UnsafeCell<stream::Packet<T>>>),
    // What `Sender::clone` upgrades a oneshot or stream channel to.
    Shared(Arc<UnsafeCell<shared::Packet<T>>>),
    // The flavor created by `sync_channel()`. It is only ever placed in a
    // `Receiver`; the `Sender` code paths treat it as unreachable.
    Sync(Arc<UnsafeCell<sync::Packet<T>>>),
}
402 | ||
403 | #[doc(hidden)] | |
404 | trait UnsafeFlavor<T> { | |
e9174d1e | 405 | fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>; |
1a4d82fc JJ |
406 | unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> { |
407 | &mut *self.inner_unsafe().get() | |
408 | } | |
409 | unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> { | |
410 | &*self.inner_unsafe().get() | |
411 | } | |
412 | } | |
// Exposes the sender's flavor cell so the default `inner`/`inner_mut`
// accessors of `UnsafeFlavor` can be used on a `Sender`.
impl<T> UnsafeFlavor<T> for Sender<T> {
    fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
        &self.inner
    }
}
// Exposes the receiver's flavor cell so the default `inner`/`inner_mut`
// accessors of `UnsafeFlavor` can be used on a `Receiver`.
impl<T> UnsafeFlavor<T> for Receiver<T> {
    fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
        &self.inner
    }
}
423 | ||
424 | /// Creates a new asynchronous channel, returning the sender/receiver halves. | |
425 | /// | |
426 | /// All data sent on the sender will become available on the receiver, and no | |
bd371182 | 427 | /// send will block the calling thread (this channel has an "infinite buffer"). |
1a4d82fc | 428 | /// |
c34b1796 | 429 | /// # Examples |
1a4d82fc JJ |
430 | /// |
431 | /// ``` | |
432 | /// use std::sync::mpsc::channel; | |
85aaf69f | 433 | /// use std::thread; |
1a4d82fc | 434 | /// |
b039eaaf | 435 | /// // tx is the sending half (tx for transmission), and rx is the receiving |
1a4d82fc JJ |
436 | /// // half (rx for receiving). |
437 | /// let (tx, rx) = channel(); | |
438 | /// | |
439 | /// // Spawn off an expensive computation | |
85aaf69f | 440 | /// thread::spawn(move|| { |
1a4d82fc JJ |
441 | /// # fn expensive_computation() {} |
442 | /// tx.send(expensive_computation()).unwrap(); | |
443 | /// }); | |
444 | /// | |
445 | /// // Do some useful work for awhile | |
446 | /// | |
447 | /// // Let's see what that answer was | |
448 | /// println!("{:?}", rx.recv().unwrap()); | |
449 | /// ``` | |
85aaf69f | 450 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 451 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
85aaf69f | 452 | let a = Arc::new(UnsafeCell::new(oneshot::Packet::new())); |
1a4d82fc JJ |
453 | (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) |
454 | } | |
455 | ||
456 | /// Creates a new synchronous, bounded channel. | |
457 | /// | |
458 | /// Like asynchronous channels, the `Receiver` will block until a message | |
459 | /// becomes available. These channels differ greatly in the semantics of the | |
460 | /// sender from asynchronous channels, however. | |
461 | /// | |
462 | /// This channel has an internal buffer on which messages will be queued. When | |
463 | /// the internal buffer becomes full, future sends will *block* waiting for the | |
464 | /// buffer to open up. Note that a buffer size of 0 is valid, in which case this | |
465 | /// becomes "rendezvous channel" where each send will not return until a recv | |
466 | /// is paired with it. | |
467 | /// | |
468 | /// As with asynchronous channels, all senders will panic in `send` if the | |
469 | /// `Receiver` has been destroyed. | |
470 | /// | |
c34b1796 | 471 | /// # Examples |
1a4d82fc JJ |
472 | /// |
473 | /// ``` | |
474 | /// use std::sync::mpsc::sync_channel; | |
85aaf69f | 475 | /// use std::thread; |
1a4d82fc JJ |
476 | /// |
477 | /// let (tx, rx) = sync_channel(1); | |
478 | /// | |
479 | /// // this returns immediately | |
85aaf69f | 480 | /// tx.send(1).unwrap(); |
1a4d82fc | 481 | /// |
85aaf69f | 482 | /// thread::spawn(move|| { |
1a4d82fc | 483 | /// // this will block until the previous message has been received |
85aaf69f | 484 | /// tx.send(2).unwrap(); |
1a4d82fc JJ |
485 | /// }); |
486 | /// | |
85aaf69f SL |
487 | /// assert_eq!(rx.recv().unwrap(), 1); |
488 | /// assert_eq!(rx.recv().unwrap(), 2); | |
1a4d82fc | 489 | /// ``` |
85aaf69f | 490 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 491 | pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { |
85aaf69f | 492 | let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound))); |
1a4d82fc JJ |
493 | (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) |
494 | } | |
495 | ||
496 | //////////////////////////////////////////////////////////////////////////////// | |
497 | // Sender | |
498 | //////////////////////////////////////////////////////////////////////////////// | |
499 | ||
impl<T> Sender<T> {
    /// Wraps a channel flavor in a new `Sender`.
    fn new(inner: Flavor<T>) -> Sender<T> {
        Sender {
            inner: UnsafeCell::new(inner),
        }
    }

    /// Attempts to send a value on this channel, returning it back if it could
    /// not be sent.
    ///
    /// A successful send occurs when it is determined that the other end of
    /// the channel has not hung up already. An unsuccessful send would be one
    /// where the corresponding receiver has already been deallocated. Note
    /// that a return value of `Err` means that the data will never be
    /// received, but a return value of `Ok` does *not* mean that the data
    /// will be received. It is possible for the corresponding receiver to
    /// hang up immediately after this function returns `Ok`.
    ///
    /// This method will never block the current thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    ///
    /// // This send is always successful
    /// tx.send(1).unwrap();
    ///
    /// // This send will fail because the receiver is gone
    /// drop(rx);
    /// assert_eq!(tx.send(1).err().unwrap().0, 1);
    /// ```
    #[stable(feature = "rust1", since = "1.0.0")]
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        // Stream and shared flavors send directly and return. Only a oneshot
        // that has already been used falls through to the code below, carrying
        // the freshly created stream packet and the outcome of the send.
        let (new_inner, ret) = match *unsafe { self.inner() } {
            Flavor::Oneshot(ref p) => {
                unsafe {
                    let p = p.get();
                    if !(*p).sent() {
                        // First send on this channel: the oneshot packet can
                        // still take it.
                        return (*p).send(t).map_err(SendError);
                    } else {
                        // The oneshot has already been sent on, so upgrade
                        // the channel: build a stream packet and hand a
                        // receiver for it to the oneshot packet.
                        let a =
                            Arc::new(UnsafeCell::new(stream::Packet::new()));
                        let rx = Receiver::new(Flavor::Stream(a.clone()));
                        match (*p).upgrade(rx) {
                            oneshot::UpSuccess => {
                                let ret = (*a.get()).send(t);
                                (a, ret)
                            }
                            // Nobody is left to receive; give the value back.
                            oneshot::UpDisconnected => (a, Err(t)),
                            oneshot::UpWoke(token) => {
                                // This send cannot panic because the thread is
                                // asleep (we're looking at it), so the receiver
                                // can't go away.
                                (*a.get()).send(t).ok().unwrap();
                                // Signal the token returned by the upgrade
                                // only after the value has been queued.
                                token.signal();
                                (a, Ok(()))
                            }
                        }
                    }
                }
            }
            Flavor::Stream(ref p) => return unsafe {
                (*p.get()).send(t).map_err(SendError)
            },
            Flavor::Shared(ref p) => return unsafe {
                (*p.get()).send(t).map_err(SendError)
            },
            // `sync_channel` hands out a `SyncSender`, never a `Sender`.
            Flavor::Sync(..) => unreachable!(),
        };

        unsafe {
            // Replace this sender's flavor with the new stream flavor. The old
            // oneshot flavor ends up in `tmp`, which is dropped at the end of
            // this block and so runs `Sender`'s `Drop` on the old packet.
            let tmp = Sender::new(Flavor::Stream(new_inner));
            mem::swap(self.inner_mut(), tmp.inner_mut());
        }
        ret.map_err(SendError)
    }
}
580 | ||
// Cloning a `Sender` makes the channel multi-producer. Unless the channel is
// already shared, this upgrades it to the shared flavor, which also replaces
// the flavor stored in `self` (through the `UnsafeCell`, despite `&self`).
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
        // For oneshot and stream flavors: build a new shared packet, upgrade
        // the old packet to it, and remember any task the upgrade reports as
        // woken (`sleeper`) together with the guard taken on the new packet.
        let (packet, sleeper, guard) = match *unsafe { self.inner() } {
            Flavor::Oneshot(ref p) => {
                let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
                unsafe {
                    // The guard is taken before the upgrade and only given
                    // up in `inherit_blocker` below.
                    // NOTE(review): presumably this keeps other users of the
                    // new packet out until setup is finished - confirm in
                    // `shared::Packet`.
                    let guard = (*a.get()).postinit_lock();
                    let rx = Receiver::new(Flavor::Shared(a.clone()));
                    match (*p.get()).upgrade(rx) {
                        oneshot::UpSuccess |
                        oneshot::UpDisconnected => (a, None, guard),
                        oneshot::UpWoke(task) => (a, Some(task), guard)
                    }
                }
            }
            Flavor::Stream(ref p) => {
                // Same procedure as the oneshot case, against a stream packet.
                let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
                unsafe {
                    let guard = (*a.get()).postinit_lock();
                    let rx = Receiver::new(Flavor::Shared(a.clone()));
                    match (*p.get()).upgrade(rx) {
                        stream::UpSuccess |
                        stream::UpDisconnected => (a, None, guard),
                        stream::UpWoke(task) => (a, Some(task), guard),
                    }
                }
            }
            Flavor::Shared(ref p) => {
                // Already shared: register one more sender on the packet and
                // hand out another handle to it. No upgrade is needed.
                unsafe { (*p.get()).clone_chan(); }
                return Sender::new(Flavor::Shared(p.clone()));
            }
            // `sync_channel` hands out a `SyncSender`, never a `Sender`.
            Flavor::Sync(..) => unreachable!(),
        };

        unsafe {
            // Pass the woken task (if any) and the guard on to the new packet.
            (*packet.get()).inherit_blocker(sleeper, guard);

            // Point `self` at the shared packet. The old flavor ends up in
            // `tmp`, which is dropped at the end of this block and so runs
            // `Sender`'s `Drop` on the old packet.
            let tmp = Sender::new(Flavor::Shared(packet.clone()));
            mem::swap(self.inner_mut(), tmp.inner_mut());
        }
        Sender::new(Flavor::Shared(packet))
    }
}
625 | ||
85aaf69f | 626 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 627 | impl<T> Drop for Sender<T> { |
1a4d82fc JJ |
628 | fn drop(&mut self) { |
629 | match *unsafe { self.inner_mut() } { | |
630 | Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, | |
631 | Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, | |
632 | Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, | |
633 | Flavor::Sync(..) => unreachable!(), | |
634 | } | |
635 | } | |
636 | } | |
637 | ||
638 | //////////////////////////////////////////////////////////////////////////////// | |
639 | // SyncSender | |
640 | //////////////////////////////////////////////////////////////////////////////// | |
641 | ||
c34b1796 | 642 | impl<T> SyncSender<T> { |
85aaf69f SL |
643 | fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> { |
644 | SyncSender { inner: inner } | |
1a4d82fc JJ |
645 | } |
646 | ||
647 | /// Sends a value on this synchronous channel. | |
648 | /// | |
649 | /// This function will *block* until space in the internal buffer becomes | |
650 | /// available or a receiver is available to hand off the message to. | |
651 | /// | |
652 | /// Note that a successful send does *not* guarantee that the receiver will | |
653 | /// ever see the data if there is a buffer on this channel. Items may be | |
654 | /// enqueued in the internal buffer for the receiver to receive at a later | |
655 | /// time. If the buffer size is 0, however, it can be guaranteed that the | |
656 | /// receiver has indeed received the data if this function returns success. | |
657 | /// | |
658 | /// This function will never panic, but it may return `Err` if the | |
659 | /// `Receiver` has disconnected and is no longer able to receive | |
660 | /// information. | |
85aaf69f | 661 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
662 | pub fn send(&self, t: T) -> Result<(), SendError<T>> { |
663 | unsafe { (*self.inner.get()).send(t).map_err(SendError) } | |
664 | } | |
665 | ||
666 | /// Attempts to send a value on this channel without blocking. | |
667 | /// | |
668 | /// This method differs from `send` by returning immediately if the | |
669 | /// channel's buffer is full or no receiver is waiting to acquire some | |
670 | /// data. Compared with `send`, this function has two failure cases | |
671 | /// instead of one (one for disconnection, one for a full buffer). | |
672 | /// | |
673 | /// See `SyncSender::send` for notes about guarantees of whether the | |
674 | /// receiver has received the data or not if this function is successful. | |
85aaf69f | 675 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
676 | pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { |
677 | unsafe { (*self.inner.get()).try_send(t) } | |
678 | } | |
679 | } | |
680 | ||
85aaf69f | 681 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 682 | impl<T> Clone for SyncSender<T> { |
1a4d82fc JJ |
683 | fn clone(&self) -> SyncSender<T> { |
684 | unsafe { (*self.inner.get()).clone_chan(); } | |
e9174d1e | 685 | SyncSender::new(self.inner.clone()) |
1a4d82fc JJ |
686 | } |
687 | } | |
688 | ||
85aaf69f | 689 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 690 | impl<T> Drop for SyncSender<T> { |
1a4d82fc JJ |
691 | fn drop(&mut self) { |
692 | unsafe { (*self.inner.get()).drop_chan(); } | |
693 | } | |
694 | } | |
695 | ||
696 | //////////////////////////////////////////////////////////////////////////////// | |
697 | // Receiver | |
698 | //////////////////////////////////////////////////////////////////////////////// | |
699 | ||
c34b1796 | 700 | impl<T> Receiver<T> { |
1a4d82fc JJ |
701 | fn new(inner: Flavor<T>) -> Receiver<T> { |
702 | Receiver { inner: UnsafeCell::new(inner) } | |
703 | } | |
704 | ||
705 | /// Attempts to return a pending value on this receiver without blocking | |
706 | /// | |
707 | /// This method will never block the caller in order to wait for data to | |
708 | /// become available. Instead, this will always return immediately with a | |
709 | /// possible option of pending data on the channel. | |
710 | /// | |
711 | /// This is useful for a flavor of "optimistic check" before deciding to | |
712 | /// block on a receiver. | |
85aaf69f | 713 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
714 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
715 | loop { | |
716 | let new_port = match *unsafe { self.inner() } { | |
717 | Flavor::Oneshot(ref p) => { | |
718 | match unsafe { (*p.get()).try_recv() } { | |
719 | Ok(t) => return Ok(t), | |
720 | Err(oneshot::Empty) => return Err(TryRecvError::Empty), | |
721 | Err(oneshot::Disconnected) => { | |
722 | return Err(TryRecvError::Disconnected) | |
723 | } | |
724 | Err(oneshot::Upgraded(rx)) => rx, | |
725 | } | |
726 | } | |
727 | Flavor::Stream(ref p) => { | |
728 | match unsafe { (*p.get()).try_recv() } { | |
729 | Ok(t) => return Ok(t), | |
730 | Err(stream::Empty) => return Err(TryRecvError::Empty), | |
731 | Err(stream::Disconnected) => { | |
732 | return Err(TryRecvError::Disconnected) | |
733 | } | |
734 | Err(stream::Upgraded(rx)) => rx, | |
735 | } | |
736 | } | |
737 | Flavor::Shared(ref p) => { | |
738 | match unsafe { (*p.get()).try_recv() } { | |
739 | Ok(t) => return Ok(t), | |
740 | Err(shared::Empty) => return Err(TryRecvError::Empty), | |
741 | Err(shared::Disconnected) => { | |
742 | return Err(TryRecvError::Disconnected) | |
743 | } | |
744 | } | |
745 | } | |
746 | Flavor::Sync(ref p) => { | |
747 | match unsafe { (*p.get()).try_recv() } { | |
748 | Ok(t) => return Ok(t), | |
749 | Err(sync::Empty) => return Err(TryRecvError::Empty), | |
750 | Err(sync::Disconnected) => { | |
751 | return Err(TryRecvError::Disconnected) | |
752 | } | |
753 | } | |
754 | } | |
755 | }; | |
756 | unsafe { | |
757 | mem::swap(self.inner_mut(), | |
758 | new_port.inner_mut()); | |
759 | } | |
760 | } | |
761 | } | |
762 | ||
9346a6ac | 763 | /// Attempts to wait for a value on this receiver, returning an error if the |
1a4d82fc JJ |
764 | /// corresponding channel has hung up. |
765 | /// | |
766 | /// This function will always block the current thread if there is no data | |
767 | /// available and it's possible for more data to be sent. Once a message is | |
768 | /// sent to the corresponding `Sender`, then this receiver will wake up and | |
769 | /// return that message. | |
770 | /// | |
771 | /// If the corresponding `Sender` has disconnected, or it disconnects while | |
772 | /// this call is blocking, this call will wake up and return `Err` to | |
773 | /// indicate that no more messages can ever be received on this channel. | |
c1a9b12d SL |
774 | /// However, since channels are buffered, messages sent before the disconnect |
775 | /// will still be properly received. | |
776 | /// | |
777 | /// # Examples | |
778 | /// | |
779 | /// ``` | |
780 | /// use std::sync::mpsc; | |
781 | /// use std::thread; | |
782 | /// | |
783 | /// let (send, recv) = mpsc::channel(); | |
784 | /// let handle = thread::spawn(move || { | |
785 | /// send.send(1u8).unwrap(); | |
786 | /// }); | |
787 | /// | |
788 | /// handle.join().unwrap(); | |
789 | /// | |
790 | /// assert_eq!(Ok(1), recv.recv()); | |
791 | /// ``` | |
792 | /// | |
793 | /// Buffering behavior: | |
794 | /// | |
795 | /// ``` | |
796 | /// use std::sync::mpsc; | |
797 | /// use std::thread; | |
798 | /// use std::sync::mpsc::RecvError; | |
799 | /// | |
800 | /// let (send, recv) = mpsc::channel(); | |
801 | /// let handle = thread::spawn(move || { | |
802 | /// send.send(1u8).unwrap(); | |
803 | /// send.send(2).unwrap(); | |
804 | /// send.send(3).unwrap(); | |
805 | /// drop(send); | |
806 | /// }); | |
807 | /// | |
808 | /// // wait for the thread to join so we ensure the sender is dropped | |
809 | /// handle.join().unwrap(); | |
810 | /// | |
811 | /// assert_eq!(Ok(1), recv.recv()); | |
812 | /// assert_eq!(Ok(2), recv.recv()); | |
813 | /// assert_eq!(Ok(3), recv.recv()); | |
814 | /// assert_eq!(Err(RecvError), recv.recv()); | |
815 | /// ``` | |
85aaf69f | 816 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
817 | pub fn recv(&self) -> Result<T, RecvError> { |
818 | loop { | |
819 | let new_port = match *unsafe { self.inner() } { | |
820 | Flavor::Oneshot(ref p) => { | |
821 | match unsafe { (*p.get()).recv() } { | |
822 | Ok(t) => return Ok(t), | |
823 | Err(oneshot::Empty) => return unreachable!(), | |
824 | Err(oneshot::Disconnected) => return Err(RecvError), | |
825 | Err(oneshot::Upgraded(rx)) => rx, | |
826 | } | |
827 | } | |
828 | Flavor::Stream(ref p) => { | |
829 | match unsafe { (*p.get()).recv() } { | |
830 | Ok(t) => return Ok(t), | |
831 | Err(stream::Empty) => return unreachable!(), | |
832 | Err(stream::Disconnected) => return Err(RecvError), | |
833 | Err(stream::Upgraded(rx)) => rx, | |
834 | } | |
835 | } | |
836 | Flavor::Shared(ref p) => { | |
837 | match unsafe { (*p.get()).recv() } { | |
838 | Ok(t) => return Ok(t), | |
839 | Err(shared::Empty) => return unreachable!(), | |
840 | Err(shared::Disconnected) => return Err(RecvError), | |
841 | } | |
842 | } | |
843 | Flavor::Sync(ref p) => return unsafe { | |
844 | (*p.get()).recv().map_err(|()| RecvError) | |
845 | } | |
846 | }; | |
847 | unsafe { | |
848 | mem::swap(self.inner_mut(), new_port.inner_mut()); | |
849 | } | |
850 | } | |
851 | } | |
852 | ||
853 | /// Returns an iterator that will block waiting for messages, but never | |
854 | /// `panic!`. It will return `None` when the channel has hung up. | |
85aaf69f | 855 | #[stable(feature = "rust1", since = "1.0.0")] |
1a4d82fc JJ |
856 | pub fn iter(&self) -> Iter<T> { |
857 | Iter { rx: self } | |
858 | } | |
859 | } | |
860 | ||
c34b1796 | 861 | impl<T> select::Packet for Receiver<T> { |
1a4d82fc JJ |
862 | fn can_recv(&self) -> bool { |
863 | loop { | |
864 | let new_port = match *unsafe { self.inner() } { | |
865 | Flavor::Oneshot(ref p) => { | |
866 | match unsafe { (*p.get()).can_recv() } { | |
867 | Ok(ret) => return ret, | |
868 | Err(upgrade) => upgrade, | |
869 | } | |
870 | } | |
871 | Flavor::Stream(ref p) => { | |
872 | match unsafe { (*p.get()).can_recv() } { | |
873 | Ok(ret) => return ret, | |
874 | Err(upgrade) => upgrade, | |
875 | } | |
876 | } | |
877 | Flavor::Shared(ref p) => { | |
878 | return unsafe { (*p.get()).can_recv() }; | |
879 | } | |
880 | Flavor::Sync(ref p) => { | |
881 | return unsafe { (*p.get()).can_recv() }; | |
882 | } | |
883 | }; | |
884 | unsafe { | |
885 | mem::swap(self.inner_mut(), | |
886 | new_port.inner_mut()); | |
887 | } | |
888 | } | |
889 | } | |
890 | ||
891 | fn start_selection(&self, mut token: SignalToken) -> StartResult { | |
892 | loop { | |
893 | let (t, new_port) = match *unsafe { self.inner() } { | |
894 | Flavor::Oneshot(ref p) => { | |
895 | match unsafe { (*p.get()).start_selection(token) } { | |
896 | oneshot::SelSuccess => return Installed, | |
897 | oneshot::SelCanceled => return Abort, | |
898 | oneshot::SelUpgraded(t, rx) => (t, rx), | |
899 | } | |
900 | } | |
901 | Flavor::Stream(ref p) => { | |
902 | match unsafe { (*p.get()).start_selection(token) } { | |
903 | stream::SelSuccess => return Installed, | |
904 | stream::SelCanceled => return Abort, | |
905 | stream::SelUpgraded(t, rx) => (t, rx), | |
906 | } | |
907 | } | |
908 | Flavor::Shared(ref p) => { | |
909 | return unsafe { (*p.get()).start_selection(token) }; | |
910 | } | |
911 | Flavor::Sync(ref p) => { | |
912 | return unsafe { (*p.get()).start_selection(token) }; | |
913 | } | |
914 | }; | |
915 | token = t; | |
916 | unsafe { | |
917 | mem::swap(self.inner_mut(), new_port.inner_mut()); | |
918 | } | |
919 | } | |
920 | } | |
921 | ||
922 | fn abort_selection(&self) -> bool { | |
923 | let mut was_upgrade = false; | |
924 | loop { | |
925 | let result = match *unsafe { self.inner() } { | |
926 | Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, | |
927 | Flavor::Stream(ref p) => unsafe { | |
928 | (*p.get()).abort_selection(was_upgrade) | |
929 | }, | |
930 | Flavor::Shared(ref p) => return unsafe { | |
931 | (*p.get()).abort_selection(was_upgrade) | |
932 | }, | |
933 | Flavor::Sync(ref p) => return unsafe { | |
934 | (*p.get()).abort_selection() | |
935 | }, | |
936 | }; | |
937 | let new_port = match result { Ok(b) => return b, Err(p) => p }; | |
938 | was_upgrade = true; | |
939 | unsafe { | |
940 | mem::swap(self.inner_mut(), | |
941 | new_port.inner_mut()); | |
942 | } | |
943 | } | |
944 | } | |
945 | } | |
946 | ||
85aaf69f | 947 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 948 | impl<'a, T> Iterator for Iter<'a, T> { |
1a4d82fc JJ |
949 | type Item = T; |
950 | ||
951 | fn next(&mut self) -> Option<T> { self.rx.recv().ok() } | |
952 | } | |
953 | ||
d9579d0f AL |
954 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] |
955 | impl<'a, T> IntoIterator for &'a Receiver<T> { | |
956 | type Item = T; | |
957 | type IntoIter = Iter<'a, T>; | |
958 | ||
959 | fn into_iter(self) -> Iter<'a, T> { self.iter() } | |
960 | } | |
961 | ||
92a42be0 | 962 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] |
d9579d0f AL |
963 | impl<T> Iterator for IntoIter<T> { |
964 | type Item = T; | |
965 | fn next(&mut self) -> Option<T> { self.rx.recv().ok() } | |
966 | } | |
967 | ||
968 | #[stable(feature = "receiver_into_iter", since = "1.1.0")] | |
969 | impl <T> IntoIterator for Receiver<T> { | |
970 | type Item = T; | |
971 | type IntoIter = IntoIter<T>; | |
972 | ||
973 | fn into_iter(self) -> IntoIter<T> { | |
974 | IntoIter { rx: self } | |
975 | } | |
976 | } | |
977 | ||
85aaf69f | 978 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 979 | impl<T> Drop for Receiver<T> { |
1a4d82fc JJ |
980 | fn drop(&mut self) { |
981 | match *unsafe { self.inner_mut() } { | |
982 | Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, | |
983 | Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, | |
984 | Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, | |
985 | Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, | |
986 | } | |
987 | } | |
988 | } | |
989 | ||
85aaf69f SL |
990 | #[stable(feature = "rust1", since = "1.0.0")] |
991 | impl<T> fmt::Debug for SendError<T> { | |
992 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
993 | "SendError(..)".fmt(f) | |
1a4d82fc | 994 | } |
85aaf69f | 995 | } |
1a4d82fc | 996 | |
85aaf69f SL |
997 | #[stable(feature = "rust1", since = "1.0.0")] |
998 | impl<T> fmt::Display for SendError<T> { | |
999 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
1000 | "sending on a closed channel".fmt(f) | |
1a4d82fc | 1001 | } |
1a4d82fc JJ |
1002 | } |
1003 | ||
c34b1796 | 1004 | #[stable(feature = "rust1", since = "1.0.0")] |
bd371182 | 1005 | impl<T: Send + Reflect> error::Error for SendError<T> { |
c34b1796 AL |
1006 | fn description(&self) -> &str { |
1007 | "sending on a closed channel" | |
1008 | } | |
1009 | ||
1010 | fn cause(&self) -> Option<&error::Error> { | |
1011 | None | |
1012 | } | |
1013 | } | |
1014 | ||
85aaf69f SL |
1015 | #[stable(feature = "rust1", since = "1.0.0")] |
1016 | impl<T> fmt::Debug for TrySendError<T> { | |
1a4d82fc | 1017 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
85aaf69f SL |
1018 | match *self { |
1019 | TrySendError::Full(..) => "Full(..)".fmt(f), | |
1020 | TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), | |
1021 | } | |
1a4d82fc JJ |
1022 | } |
1023 | } | |
1024 | ||
85aaf69f SL |
1025 | #[stable(feature = "rust1", since = "1.0.0")] |
1026 | impl<T> fmt::Display for TrySendError<T> { | |
1a4d82fc JJ |
1027 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1028 | match *self { | |
1029 | TrySendError::Full(..) => { | |
1030 | "sending on a full channel".fmt(f) | |
1031 | } | |
1032 | TrySendError::Disconnected(..) => { | |
1033 | "sending on a closed channel".fmt(f) | |
1034 | } | |
1035 | } | |
1036 | } | |
1037 | } | |
1038 | ||
c34b1796 | 1039 | #[stable(feature = "rust1", since = "1.0.0")] |
bd371182 | 1040 | impl<T: Send + Reflect> error::Error for TrySendError<T> { |
c34b1796 AL |
1041 | |
1042 | fn description(&self) -> &str { | |
1043 | match *self { | |
1044 | TrySendError::Full(..) => { | |
1045 | "sending on a full channel" | |
1046 | } | |
1047 | TrySendError::Disconnected(..) => { | |
1048 | "sending on a closed channel" | |
1049 | } | |
1050 | } | |
1051 | } | |
1052 | ||
1053 | fn cause(&self) -> Option<&error::Error> { | |
1054 | None | |
1055 | } | |
1056 | } | |
1057 | ||
85aaf69f SL |
1058 | #[stable(feature = "rust1", since = "1.0.0")] |
1059 | impl fmt::Display for RecvError { | |
1a4d82fc JJ |
1060 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1061 | "receiving on a closed channel".fmt(f) | |
1062 | } | |
1063 | } | |
1064 | ||
c34b1796 AL |
1065 | #[stable(feature = "rust1", since = "1.0.0")] |
1066 | impl error::Error for RecvError { | |
1067 | ||
1068 | fn description(&self) -> &str { | |
1069 | "receiving on a closed channel" | |
1070 | } | |
1071 | ||
1072 | fn cause(&self) -> Option<&error::Error> { | |
1073 | None | |
1074 | } | |
1075 | } | |
1076 | ||
85aaf69f SL |
1077 | #[stable(feature = "rust1", since = "1.0.0")] |
1078 | impl fmt::Display for TryRecvError { | |
1a4d82fc JJ |
1079 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1080 | match *self { | |
1081 | TryRecvError::Empty => { | |
1082 | "receiving on an empty channel".fmt(f) | |
1083 | } | |
1084 | TryRecvError::Disconnected => { | |
1085 | "receiving on a closed channel".fmt(f) | |
1086 | } | |
1087 | } | |
1088 | } | |
1089 | } | |
1090 | ||
c34b1796 AL |
1091 | #[stable(feature = "rust1", since = "1.0.0")] |
1092 | impl error::Error for TryRecvError { | |
1093 | ||
1094 | fn description(&self) -> &str { | |
1095 | match *self { | |
1096 | TryRecvError::Empty => { | |
1097 | "receiving on an empty channel" | |
1098 | } | |
1099 | TryRecvError::Disconnected => { | |
1100 | "receiving on a closed channel" | |
1101 | } | |
1102 | } | |
1103 | } | |
1104 | ||
1105 | fn cause(&self) -> Option<&error::Error> { | |
1106 | None | |
1107 | } | |
1108 | } | |
1109 | ||
1a4d82fc | 1110 | #[cfg(test)] |
d9579d0f | 1111 | mod tests { |
1a4d82fc JJ |
1112 | use prelude::v1::*; |
1113 | ||
c1a9b12d | 1114 | use env; |
1a4d82fc | 1115 | use super::*; |
85aaf69f | 1116 | use thread; |
1a4d82fc | 1117 | |
c34b1796 | 1118 | pub fn stress_factor() -> usize { |
85aaf69f SL |
1119 | match env::var("RUST_TEST_STRESS") { |
1120 | Ok(val) => val.parse().unwrap(), | |
1121 | Err(..) => 1, | |
1a4d82fc JJ |
1122 | } |
1123 | } | |
1124 | ||
1125 | #[test] | |
1126 | fn smoke() { | |
c34b1796 | 1127 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1128 | tx.send(1).unwrap(); |
1129 | assert_eq!(rx.recv().unwrap(), 1); | |
1130 | } | |
1131 | ||
1132 | #[test] | |
1133 | fn drop_full() { | |
c34b1796 | 1134 | let (tx, _rx) = channel::<Box<isize>>(); |
85aaf69f | 1135 | tx.send(box 1).unwrap(); |
1a4d82fc JJ |
1136 | } |
1137 | ||
1138 | #[test] | |
1139 | fn drop_full_shared() { | |
c34b1796 | 1140 | let (tx, _rx) = channel::<Box<isize>>(); |
1a4d82fc JJ |
1141 | drop(tx.clone()); |
1142 | drop(tx.clone()); | |
85aaf69f | 1143 | tx.send(box 1).unwrap(); |
1a4d82fc JJ |
1144 | } |
1145 | ||
1146 | #[test] | |
1147 | fn smoke_shared() { | |
c34b1796 | 1148 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1149 | tx.send(1).unwrap(); |
1150 | assert_eq!(rx.recv().unwrap(), 1); | |
1151 | let tx = tx.clone(); | |
1152 | tx.send(1).unwrap(); | |
1153 | assert_eq!(rx.recv().unwrap(), 1); | |
1154 | } | |
1155 | ||
1156 | #[test] | |
1157 | fn smoke_threads() { | |
c34b1796 | 1158 | let (tx, rx) = channel::<i32>(); |
85aaf69f | 1159 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1160 | tx.send(1).unwrap(); |
1161 | }); | |
1162 | assert_eq!(rx.recv().unwrap(), 1); | |
1163 | } | |
1164 | ||
1165 | #[test] | |
1166 | fn smoke_port_gone() { | |
c34b1796 | 1167 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1168 | drop(rx); |
1169 | assert!(tx.send(1).is_err()); | |
1170 | } | |
1171 | ||
1172 | #[test] | |
1173 | fn smoke_shared_port_gone() { | |
c34b1796 | 1174 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1175 | drop(rx); |
1176 | assert!(tx.send(1).is_err()) | |
1177 | } | |
1178 | ||
1179 | #[test] | |
1180 | fn smoke_shared_port_gone2() { | |
c34b1796 | 1181 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1182 | drop(rx); |
1183 | let tx2 = tx.clone(); | |
1184 | drop(tx); | |
1185 | assert!(tx2.send(1).is_err()); | |
1186 | } | |
1187 | ||
1188 | #[test] | |
1189 | fn port_gone_concurrent() { | |
c34b1796 | 1190 | let (tx, rx) = channel::<i32>(); |
85aaf69f | 1191 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1192 | rx.recv().unwrap(); |
1193 | }); | |
1194 | while tx.send(1).is_ok() {} | |
1195 | } | |
1196 | ||
1197 | #[test] | |
1198 | fn port_gone_concurrent_shared() { | |
c34b1796 | 1199 | let (tx, rx) = channel::<i32>(); |
1a4d82fc | 1200 | let tx2 = tx.clone(); |
85aaf69f | 1201 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1202 | rx.recv().unwrap(); |
1203 | }); | |
1204 | while tx.send(1).is_ok() && tx2.send(1).is_ok() {} | |
1205 | } | |
1206 | ||
1207 | #[test] | |
1208 | fn smoke_chan_gone() { | |
c34b1796 | 1209 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1210 | drop(tx); |
1211 | assert!(rx.recv().is_err()); | |
1212 | } | |
1213 | ||
1214 | #[test] | |
1215 | fn smoke_chan_gone_shared() { | |
1216 | let (tx, rx) = channel::<()>(); | |
1217 | let tx2 = tx.clone(); | |
1218 | drop(tx); | |
1219 | drop(tx2); | |
1220 | assert!(rx.recv().is_err()); | |
1221 | } | |
1222 | ||
1223 | #[test] | |
1224 | fn chan_gone_concurrent() { | |
c34b1796 | 1225 | let (tx, rx) = channel::<i32>(); |
85aaf69f | 1226 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1227 | tx.send(1).unwrap(); |
1228 | tx.send(1).unwrap(); | |
1229 | }); | |
1230 | while rx.recv().is_ok() {} | |
1231 | } | |
1232 | ||
1233 | #[test] | |
1234 | fn stress() { | |
c34b1796 | 1235 | let (tx, rx) = channel::<i32>(); |
85aaf69f SL |
1236 | let t = thread::spawn(move|| { |
1237 | for _ in 0..10000 { tx.send(1).unwrap(); } | |
1a4d82fc | 1238 | }); |
85aaf69f | 1239 | for _ in 0..10000 { |
1a4d82fc JJ |
1240 | assert_eq!(rx.recv().unwrap(), 1); |
1241 | } | |
1242 | t.join().ok().unwrap(); | |
1243 | } | |
1244 | ||
1245 | #[test] | |
1246 | fn stress_shared() { | |
c34b1796 AL |
1247 | const AMT: u32 = 10000; |
1248 | const NTHREADS: u32 = 8; | |
1249 | let (tx, rx) = channel::<i32>(); | |
1a4d82fc | 1250 | |
85aaf69f SL |
1251 | let t = thread::spawn(move|| { |
1252 | for _ in 0..AMT * NTHREADS { | |
1a4d82fc JJ |
1253 | assert_eq!(rx.recv().unwrap(), 1); |
1254 | } | |
1255 | match rx.try_recv() { | |
1256 | Ok(..) => panic!(), | |
1257 | _ => {} | |
1258 | } | |
1259 | }); | |
1260 | ||
85aaf69f | 1261 | for _ in 0..NTHREADS { |
1a4d82fc | 1262 | let tx = tx.clone(); |
85aaf69f SL |
1263 | thread::spawn(move|| { |
1264 | for _ in 0..AMT { tx.send(1).unwrap(); } | |
1a4d82fc JJ |
1265 | }); |
1266 | } | |
1267 | drop(tx); | |
1268 | t.join().ok().unwrap(); | |
1269 | } | |
1270 | ||
1271 | #[test] | |
1272 | fn send_from_outside_runtime() { | |
1273 | let (tx1, rx1) = channel::<()>(); | |
c34b1796 | 1274 | let (tx2, rx2) = channel::<i32>(); |
85aaf69f | 1275 | let t1 = thread::spawn(move|| { |
1a4d82fc | 1276 | tx1.send(()).unwrap(); |
85aaf69f | 1277 | for _ in 0..40 { |
1a4d82fc JJ |
1278 | assert_eq!(rx2.recv().unwrap(), 1); |
1279 | } | |
1280 | }); | |
1281 | rx1.recv().unwrap(); | |
85aaf69f SL |
1282 | let t2 = thread::spawn(move|| { |
1283 | for _ in 0..40 { | |
1a4d82fc JJ |
1284 | tx2.send(1).unwrap(); |
1285 | } | |
1286 | }); | |
1287 | t1.join().ok().unwrap(); | |
1288 | t2.join().ok().unwrap(); | |
1289 | } | |
1290 | ||
1291 | #[test] | |
1292 | fn recv_from_outside_runtime() { | |
c34b1796 | 1293 | let (tx, rx) = channel::<i32>(); |
85aaf69f SL |
1294 | let t = thread::spawn(move|| { |
1295 | for _ in 0..40 { | |
1a4d82fc JJ |
1296 | assert_eq!(rx.recv().unwrap(), 1); |
1297 | } | |
1298 | }); | |
85aaf69f | 1299 | for _ in 0..40 { |
1a4d82fc JJ |
1300 | tx.send(1).unwrap(); |
1301 | } | |
1302 | t.join().ok().unwrap(); | |
1303 | } | |
1304 | ||
1305 | #[test] | |
1306 | fn no_runtime() { | |
c34b1796 AL |
1307 | let (tx1, rx1) = channel::<i32>(); |
1308 | let (tx2, rx2) = channel::<i32>(); | |
85aaf69f | 1309 | let t1 = thread::spawn(move|| { |
1a4d82fc JJ |
1310 | assert_eq!(rx1.recv().unwrap(), 1); |
1311 | tx2.send(2).unwrap(); | |
1312 | }); | |
85aaf69f | 1313 | let t2 = thread::spawn(move|| { |
1a4d82fc JJ |
1314 | tx1.send(1).unwrap(); |
1315 | assert_eq!(rx2.recv().unwrap(), 2); | |
1316 | }); | |
1317 | t1.join().ok().unwrap(); | |
1318 | t2.join().ok().unwrap(); | |
1319 | } | |
1320 | ||
1321 | #[test] | |
1322 | fn oneshot_single_thread_close_port_first() { | |
1323 | // Simple test of closing without sending | |
c34b1796 | 1324 | let (_tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1325 | drop(rx); |
1326 | } | |
1327 | ||
1328 | #[test] | |
1329 | fn oneshot_single_thread_close_chan_first() { | |
1330 | // Simple test of closing without sending | |
c34b1796 | 1331 | let (tx, _rx) = channel::<i32>(); |
1a4d82fc JJ |
1332 | drop(tx); |
1333 | } | |
1334 | ||
1335 | #[test] | |
1336 | fn oneshot_single_thread_send_port_close() { | |
1337 | // Testing that the sender cleans up the payload if receiver is closed | |
c34b1796 | 1338 | let (tx, rx) = channel::<Box<i32>>(); |
1a4d82fc JJ |
1339 | drop(rx); |
1340 | assert!(tx.send(box 0).is_err()); | |
1341 | } | |
1342 | ||
1343 | #[test] | |
1344 | fn oneshot_single_thread_recv_chan_close() { | |
1345 | // Receiving on a closed chan will panic | |
85aaf69f | 1346 | let res = thread::spawn(move|| { |
c34b1796 | 1347 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1348 | drop(tx); |
1349 | rx.recv().unwrap(); | |
1350 | }).join(); | |
1351 | // What is our res? | |
1352 | assert!(res.is_err()); | |
1353 | } | |
1354 | ||
1355 | #[test] | |
1356 | fn oneshot_single_thread_send_then_recv() { | |
c34b1796 | 1357 | let (tx, rx) = channel::<Box<i32>>(); |
1a4d82fc JJ |
1358 | tx.send(box 10).unwrap(); |
1359 | assert!(rx.recv().unwrap() == box 10); | |
1360 | } | |
1361 | ||
1362 | #[test] | |
1363 | fn oneshot_single_thread_try_send_open() { | |
c34b1796 | 1364 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1365 | assert!(tx.send(10).is_ok()); |
1366 | assert!(rx.recv().unwrap() == 10); | |
1367 | } | |
1368 | ||
1369 | #[test] | |
1370 | fn oneshot_single_thread_try_send_closed() { | |
c34b1796 | 1371 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1372 | drop(rx); |
1373 | assert!(tx.send(10).is_err()); | |
1374 | } | |
1375 | ||
1376 | #[test] | |
1377 | fn oneshot_single_thread_try_recv_open() { | |
c34b1796 | 1378 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1379 | tx.send(10).unwrap(); |
1380 | assert!(rx.recv() == Ok(10)); | |
1381 | } | |
1382 | ||
1383 | #[test] | |
1384 | fn oneshot_single_thread_try_recv_closed() { | |
c34b1796 | 1385 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1386 | drop(tx); |
1387 | assert!(rx.recv().is_err()); | |
1388 | } | |
1389 | ||
1390 | #[test] | |
1391 | fn oneshot_single_thread_peek_data() { | |
c34b1796 | 1392 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1393 | assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); |
1394 | tx.send(10).unwrap(); | |
1395 | assert_eq!(rx.try_recv(), Ok(10)); | |
1396 | } | |
1397 | ||
1398 | #[test] | |
1399 | fn oneshot_single_thread_peek_close() { | |
c34b1796 | 1400 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1401 | drop(tx); |
1402 | assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); | |
1403 | assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); | |
1404 | } | |
1405 | ||
1406 | #[test] | |
1407 | fn oneshot_single_thread_peek_open() { | |
c34b1796 | 1408 | let (_tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1409 | assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); |
1410 | } | |
1411 | ||
1412 | #[test] | |
1413 | fn oneshot_multi_task_recv_then_send() { | |
c34b1796 | 1414 | let (tx, rx) = channel::<Box<i32>>(); |
85aaf69f | 1415 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1416 | assert!(rx.recv().unwrap() == box 10); |
1417 | }); | |
1418 | ||
1419 | tx.send(box 10).unwrap(); | |
1420 | } | |
1421 | ||
1422 | #[test] | |
1423 | fn oneshot_multi_task_recv_then_close() { | |
c34b1796 | 1424 | let (tx, rx) = channel::<Box<i32>>(); |
85aaf69f | 1425 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1426 | drop(tx); |
1427 | }); | |
85aaf69f | 1428 | let res = thread::spawn(move|| { |
1a4d82fc JJ |
1429 | assert!(rx.recv().unwrap() == box 10); |
1430 | }).join(); | |
1431 | assert!(res.is_err()); | |
1432 | } | |
1433 | ||
1434 | #[test] | |
1435 | fn oneshot_multi_thread_close_stress() { | |
85aaf69f | 1436 | for _ in 0..stress_factor() { |
c34b1796 | 1437 | let (tx, rx) = channel::<i32>(); |
85aaf69f | 1438 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1439 | drop(rx); |
1440 | }); | |
1441 | drop(tx); | |
1442 | } | |
1443 | } | |
1444 | ||
1445 | #[test] | |
1446 | fn oneshot_multi_thread_send_close_stress() { | |
85aaf69f | 1447 | for _ in 0..stress_factor() { |
c34b1796 | 1448 | let (tx, rx) = channel::<i32>(); |
85aaf69f | 1449 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1450 | drop(rx); |
1451 | }); | |
85aaf69f | 1452 | let _ = thread::spawn(move|| { |
1a4d82fc JJ |
1453 | tx.send(1).unwrap(); |
1454 | }).join(); | |
1455 | } | |
1456 | } | |
1457 | ||
1458 | #[test] | |
1459 | fn oneshot_multi_thread_recv_close_stress() { | |
85aaf69f | 1460 | for _ in 0..stress_factor() { |
c34b1796 | 1461 | let (tx, rx) = channel::<i32>(); |
85aaf69f SL |
1462 | thread::spawn(move|| { |
1463 | let res = thread::spawn(move|| { | |
1a4d82fc JJ |
1464 | rx.recv().unwrap(); |
1465 | }).join(); | |
1466 | assert!(res.is_err()); | |
1467 | }); | |
85aaf69f SL |
1468 | let _t = thread::spawn(move|| { |
1469 | thread::spawn(move|| { | |
1a4d82fc JJ |
1470 | drop(tx); |
1471 | }); | |
1472 | }); | |
1473 | } | |
1474 | } | |
1475 | ||
1476 | #[test] | |
1477 | fn oneshot_multi_thread_send_recv_stress() { | |
85aaf69f | 1478 | for _ in 0..stress_factor() { |
c34b1796 | 1479 | let (tx, rx) = channel::<Box<isize>>(); |
85aaf69f SL |
1480 | let _t = thread::spawn(move|| { |
1481 | tx.send(box 10).unwrap(); | |
1a4d82fc | 1482 | }); |
85aaf69f | 1483 | assert!(rx.recv().unwrap() == box 10); |
1a4d82fc JJ |
1484 | } |
1485 | } | |
1486 | ||
1487 | #[test] | |
1488 | fn stream_send_recv_stress() { | |
85aaf69f | 1489 | for _ in 0..stress_factor() { |
1a4d82fc JJ |
1490 | let (tx, rx) = channel(); |
1491 | ||
1492 | send(tx, 0); | |
1493 | recv(rx, 0); | |
1494 | ||
c34b1796 | 1495 | fn send(tx: Sender<Box<i32>>, i: i32) { |
1a4d82fc JJ |
1496 | if i == 10 { return } |
1497 | ||
85aaf69f | 1498 | thread::spawn(move|| { |
1a4d82fc JJ |
1499 | tx.send(box i).unwrap(); |
1500 | send(tx, i + 1); | |
1501 | }); | |
1502 | } | |
1503 | ||
c34b1796 | 1504 | fn recv(rx: Receiver<Box<i32>>, i: i32) { |
1a4d82fc JJ |
1505 | if i == 10 { return } |
1506 | ||
85aaf69f | 1507 | thread::spawn(move|| { |
1a4d82fc JJ |
1508 | assert!(rx.recv().unwrap() == box i); |
1509 | recv(rx, i + 1); | |
1510 | }); | |
1511 | } | |
1512 | } | |
1513 | } | |
1514 | ||
1515 | #[test] | |
1516 | fn recv_a_lot() { | |
1517 | // Regression test that we don't run out of stack in scheduler context | |
1518 | let (tx, rx) = channel(); | |
85aaf69f SL |
1519 | for _ in 0..10000 { tx.send(()).unwrap(); } |
1520 | for _ in 0..10000 { rx.recv().unwrap(); } | |
1a4d82fc JJ |
1521 | } |
1522 | ||
1523 | #[test] | |
1524 | fn shared_chan_stress() { | |
1525 | let (tx, rx) = channel(); | |
1526 | let total = stress_factor() + 100; | |
85aaf69f | 1527 | for _ in 0..total { |
1a4d82fc | 1528 | let tx = tx.clone(); |
85aaf69f | 1529 | thread::spawn(move|| { |
1a4d82fc JJ |
1530 | tx.send(()).unwrap(); |
1531 | }); | |
1532 | } | |
1533 | ||
85aaf69f | 1534 | for _ in 0..total { |
1a4d82fc JJ |
1535 | rx.recv().unwrap(); |
1536 | } | |
1537 | } | |
1538 | ||
1539 | #[test] | |
1540 | fn test_nested_recv_iter() { | |
c34b1796 AL |
1541 | let (tx, rx) = channel::<i32>(); |
1542 | let (total_tx, total_rx) = channel::<i32>(); | |
1a4d82fc | 1543 | |
85aaf69f | 1544 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1545 | let mut acc = 0; |
1546 | for x in rx.iter() { | |
1547 | acc += x; | |
1548 | } | |
1549 | total_tx.send(acc).unwrap(); | |
1550 | }); | |
1551 | ||
1552 | tx.send(3).unwrap(); | |
1553 | tx.send(1).unwrap(); | |
1554 | tx.send(2).unwrap(); | |
1555 | drop(tx); | |
1556 | assert_eq!(total_rx.recv().unwrap(), 6); | |
1557 | } | |
1558 | ||
1559 | #[test] | |
1560 | fn test_recv_iter_break() { | |
c34b1796 | 1561 | let (tx, rx) = channel::<i32>(); |
1a4d82fc JJ |
1562 | let (count_tx, count_rx) = channel(); |
1563 | ||
85aaf69f | 1564 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
1565 | let mut count = 0; |
1566 | for x in rx.iter() { | |
1567 | if count >= 3 { | |
1568 | break; | |
1569 | } else { | |
1570 | count += x; | |
1571 | } | |
1572 | } | |
1573 | count_tx.send(count).unwrap(); | |
1574 | }); | |
1575 | ||
1576 | tx.send(2).unwrap(); | |
1577 | tx.send(2).unwrap(); | |
1578 | tx.send(2).unwrap(); | |
1579 | let _ = tx.send(2); | |
1580 | drop(tx); | |
1581 | assert_eq!(count_rx.recv().unwrap(), 4); | |
1582 | } | |
1583 | ||
d9579d0f AL |
1584 | #[test] |
1585 | fn test_recv_into_iter_owned() { | |
1586 | let mut iter = { | |
1587 | let (tx, rx) = channel::<i32>(); | |
1588 | tx.send(1).unwrap(); | |
1589 | tx.send(2).unwrap(); | |
1590 | ||
1591 | rx.into_iter() | |
1592 | }; | |
1593 | assert_eq!(iter.next().unwrap(), 1); | |
1594 | assert_eq!(iter.next().unwrap(), 2); | |
1595 | assert_eq!(iter.next().is_none(), true); | |
1596 | } | |
1597 | ||
1598 | #[test] | |
1599 | fn test_recv_into_iter_borrowed() { | |
1600 | let (tx, rx) = channel::<i32>(); | |
1601 | tx.send(1).unwrap(); | |
1602 | tx.send(2).unwrap(); | |
1603 | drop(tx); | |
1604 | let mut iter = (&rx).into_iter(); | |
1605 | assert_eq!(iter.next().unwrap(), 1); | |
1606 | assert_eq!(iter.next().unwrap(), 2); | |
1607 | assert_eq!(iter.next().is_none(), true); | |
1608 | } | |
1609 | ||
1a4d82fc JJ |
// Walks `try_recv` through its three observable results: `Empty` before
// anything is sent, `Ok` once a value is queued, and `Disconnected` after the
// sender is dropped. A helper thread performs each step on request and
// acknowledges it, so every assertion runs at a known point.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = channel::<i32>();
    let (step_tx, step_rx) = channel::<()>();
    let (ack_tx, ack_rx) = channel::<()>();
    let _helper = thread::spawn(move || {
        // First request: queue a single value.
        step_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();
        // Second request: hang up the data channel.
        step_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    // Ask the helper for its next step and wait until it reports completion.
    let run_next_step = || {
        step_tx.send(()).unwrap();
        ack_rx.recv().unwrap();
    };

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    run_next_step();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    run_next_step();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
}
1633 | ||
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (work_tx, work_rx) = channel();
    let (exit_tx, exit_rx) = channel();
    let _child = thread::spawn(move || {
        // Block on the channel, then destroy the receiving end explicitly.
        work_rx.recv().unwrap();
        drop(work_rx);
        exit_tx.send(()).unwrap();
    });
    // Yield repeatedly to give the child a chance to block in `recv`.
    let mut yields = 0;
    while yields < 5000 {
        thread::yield_now();
        yields += 1;
    }

    // Clone the sender, drop the original, and send through the clone.
    let cloned_tx = work_tx.clone();
    drop(work_tx);
    cloned_tx.send(()).unwrap();

    // Do not return until the child has signalled that it is finished.
    exit_rx.recv().unwrap();
}
1656 | } | |
1657 | ||
1658 | #[cfg(test)] | |
1659 | mod sync_tests { | |
1660 | use prelude::v1::*; | |
1661 | ||
c1a9b12d | 1662 | use env; |
85aaf69f | 1663 | use thread; |
1a4d82fc JJ |
1664 | use super::*; |
1665 | ||
/// Returns the iteration multiplier used by the stress tests.
///
/// The value is read from the `RUST_TEST_STRESS` environment variable and
/// defaults to `1` when the variable cannot be read.
///
/// # Panics
///
/// Panics if `RUST_TEST_STRESS` is set but is not a valid `usize`; the
/// message names the variable and shows the offending value.
pub fn stress_factor() -> usize {
    match env::var("RUST_TEST_STRESS") {
        Ok(val) => match val.parse::<usize>() {
            Ok(factor) => factor,
            // A bare `unwrap()` here would only print the parse error, with
            // no hint about which setting was malformed.
            Err(err) => panic!("RUST_TEST_STRESS must be an unsigned integer, got {:?}: {}",
                               val, err),
        },
        Err(..) => 1,
    }
}
1672 | ||
// Single-threaded round trip through a channel with one buffer slot.
#[test]
fn smoke() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);
}
1679 | ||
// Tears down a channel while its single buffer slot still holds a boxed
// value that was never received.
#[test]
fn drop_full() {
    let (tx, _rx) = sync_channel::<Box<isize>>(1);
    // `Box::new` replaces the unstable `box` expression syntax.
    tx.send(Box::new(1)).unwrap();
}
1685 | ||
// Round trip through the original sender, then again through a clone of it.
#[test]
fn smoke_shared() {
    let (original_tx, rx) = sync_channel::<i32>(1);
    original_tx.send(1).unwrap();
    let first = rx.recv().unwrap();
    assert_eq!(first, 1);

    let cloned_tx = original_tx.clone();
    cloned_tx.send(1).unwrap();
    let second = rx.recv().unwrap();
    assert_eq!(second, 1);
}
1695 | ||
// Hands one value from a spawned thread to the test thread over a
// zero-capacity channel.
#[test]
fn smoke_threads() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _producer = thread::spawn(move || {
        sender.send(1).unwrap();
    });
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);
}
1704 | ||
// Sending after the receiver has been dropped must report an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
1711 | ||
// A sender cloned after the receiver was dropped must also fail to send,
// even once the original sender is gone.
#[test]
fn smoke_shared_port_gone2() {
    let (original_tx, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let cloned_tx = original_tx.clone();
    drop(original_tx);
    let outcome = cloned_tx.send(1);
    assert!(outcome.is_err());
}
1720 | ||
// The receiving thread takes one message and exits; the test thread keeps
// sending until a send fails.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
}
1729 | ||
// Same as `port_gone_concurrent`, but alternating between two senders; the
// loop ends at the first failed send from either one.
#[test]
fn port_gone_concurrent_shared() {
    let (first_tx, receiver) = sync_channel::<i32>(0);
    let second_tx = first_tx.clone();
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if first_tx.send(1).is_err() {
            break;
        }
        if second_tx.send(1).is_err() {
            break;
        }
    }
}
1739 | ||
// Receiving after the only sender has been dropped must report an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1746 | ||
// Receiving must report an error once both the original sender and its
// clone have been dropped.
#[test]
fn smoke_chan_gone_shared() {
    let (original_tx, receiver) = sync_channel::<()>(0);
    let cloned_tx = original_tx.clone();
    drop(original_tx);
    drop(cloned_tx);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1755 | ||
// A spawned thread sends two values and exits; the test thread drains the
// channel until `recv` reports an error.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    thread::spawn(move || {
        for _ in 0..2 {
            sender.send(1).unwrap();
        }
    });
    while let Ok(_) = receiver.recv() {}
}
1765 | ||
// Pushes 10000 values, one at a time, from a spawned thread to the test
// thread over a zero-capacity channel.
#[test]
fn stress() {
    const ROUNDS: usize = 10000;
    let (sender, receiver) = sync_channel::<i32>(0);
    thread::spawn(move || {
        for _ in 0..ROUNDS {
            sender.send(1).unwrap();
        }
    });
    for _ in 0..ROUNDS {
        let received = receiver.recv().unwrap();
        assert_eq!(received, 1);
    }
}
1776 | ||
// Eight producer threads each send 1000 values through clones of one sender.
// A single consumer thread receives exactly that many, checks that no extra
// message is available, and then signals completion to the test thread.
#[test]
fn stress_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    thread::spawn(move || {
        let expected = AMT * NTHREADS;
        let mut seen = 0;
        while seen < expected {
            assert_eq!(value_rx.recv().unwrap(), 1);
            seen += 1;
        }
        // Nothing beyond the expected count may be received.
        if value_rx.try_recv().is_ok() {
            panic!();
        }
        done_tx.send(()).unwrap();
    });

    for _ in 0..NTHREADS {
        let producer_tx = value_tx.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                producer_tx.send(1).unwrap();
            }
        });
    }
    drop(value_tx);
    done_rx.recv().unwrap();
}
1804 | ||
// Closes the receiving end first without ever sending; the sender is dropped
// afterwards when the function returns.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
}
1811 | ||
// Closes the sending end first without ever sending; the receiver is dropped
// afterwards when the function returns.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    drop(sender);
}
1818 | ||
// Sending a heap-allocated payload after the receiver is gone must fail.
// (Originally described as checking that the sender cleans up the payload
// when the receiver is closed.)
#[test]
fn oneshot_single_thread_send_port_close() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    drop(rx);
    // `Box::new` replaces the unstable `box` expression syntax.
    assert!(tx.send(Box::new(0)).is_err());
}
1826 | ||
// `recv` on a channel whose sender was dropped returns an error, so the
// `unwrap` inside the spawned thread panics and `join` reports that as `Err`.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let worker = thread::spawn(move || {
        let (sender, receiver) = sync_channel::<i32>(0);
        drop(sender);
        receiver.recv().unwrap();
    });
    let outcome = worker.join();
    // The worker must have panicked.
    assert!(outcome.is_err());
}
1838 | ||
// Sends one boxed value into a one-slot channel and reads it back on the
// same thread.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (tx, rx) = sync_channel::<Box<i32>>(1);
    // `Box::new` replaces the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
    // `assert_eq!` prints both values if they ever differ.
    assert_eq!(rx.recv().unwrap(), Box::new(10));
}
1845 | ||
// `try_send` into a one-slot channel with a live receiver succeeds, and the
// value can then be received.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (tx, rx) = sync_channel::<i32>(1);
    assert_eq!(tx.try_send(10), Ok(()));
    // `assert_eq!` prints both values if they ever differ.
    assert_eq!(rx.recv().unwrap(), 10);
}
1852 | ||
// `try_send` after the receiver was dropped returns the value back inside
// `TrySendError::Disconnected`.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Err(TrySendError::Disconnected(10)));
}
1859 | ||
// `try_send` on a zero-capacity channel whose receiver is alive but not
// receiving returns the value back inside `TrySendError::Full`.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Err(TrySendError::Full(10)));
}
1865 | ||
// Receives a value that was buffered in a one-slot channel. Despite the
// name, this test uses the blocking `recv`, not `try_recv`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (tx, rx) = sync_channel::<i32>(1);
    tx.send(10).unwrap();
    // `assert_eq!` prints both values if they ever differ.
    assert_eq!(rx.recv(), Ok(10));
}
1872 | ||
// Receiving after the sender was dropped reports an error. Despite the name,
// this test uses the blocking `recv`, not `try_recv`.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1879 | ||
// `try_recv` reports `Empty` before anything is sent and yields the value
// once one has been buffered.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let before_send = receiver.try_recv();
    assert_eq!(before_send, Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    let after_send = receiver.try_recv();
    assert_eq!(after_send, Ok(10));
}
1887 | ||
// After the sender is dropped, `try_recv` reports `Disconnected`, and keeps
// doing so when asked a second time.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    for _ in 0..2 {
        let outcome = receiver.try_recv();
        assert_eq!(outcome, Err(TryRecvError::Disconnected));
    }
}
1895 | ||
// With a live sender and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    let outcome = receiver.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
1901 | ||
// A spawned thread receives one boxed value that the test thread sends over
// a zero-capacity channel.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    let _t = thread::spawn(move|| {
        // `assert_eq!` prints both values if they ever differ.
        assert_eq!(rx.recv().unwrap(), Box::new(10));
    });

    // `Box::new` replaces the unstable `box` expression syntax.
    tx.send(Box::new(10)).unwrap();
}
1911 | ||
// One thread drops the sender without sending; the receiving thread's
// `recv().unwrap()` must therefore panic, which `join` reports as `Err`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (tx, rx) = sync_channel::<Box<i32>>(0);
    let _t = thread::spawn(move|| {
        drop(tx);
    });
    let res = thread::spawn(move|| {
        // `Box::new` replaces the unstable `box` expression syntax, and
        // `assert_eq!` would print both values if a message ever arrived.
        assert_eq!(rx.recv().unwrap(), Box::new(10));
    }).join();
    assert!(res.is_err());
}
1923 | ||
// Repeatedly drops the receiver on a spawned thread while the test thread
// drops the sender. The repetition count comes from `stress_factor()`.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        drop(sender);
    }
}
1934 | ||
// Repeatedly drops the receiver on one thread while another thread sends.
// The sending thread's result is ignored, so a failed (panicking) send does
// not fail the test. The repetition count comes from `stress_factor()`.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _closer = thread::spawn(move || {
            drop(receiver);
        });
        let sending_thread = thread::spawn(move || {
            sender.send(1).unwrap();
        });
        let _ = sending_thread.join();
    }
}
1947 | ||
// Repeatedly pairs a nested thread that blocks in `recv().unwrap()` with a
// nested thread that drops the sender; the receiving side is expected to
// panic. The repetition count comes from `stress_factor()`.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let rounds = stress_factor();
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        let _recv_side = thread::spawn(move || {
            let inner = thread::spawn(move || {
                receiver.recv().unwrap();
            });
            let outcome = inner.join();
            assert!(outcome.is_err());
        });
        let _close_side = thread::spawn(move || {
            thread::spawn(move || {
                drop(sender);
            });
        });
    }
}
1965 | ||
// Repeatedly hands one boxed value from a spawned thread to the test thread
// over a fresh zero-capacity channel. The repetition count comes from
// `stress_factor()`.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);
        let _t = thread::spawn(move|| {
            // `Box::new` replaces the unstable `box` expression syntax.
            tx.send(Box::new(10)).unwrap();
        });
        // `assert_eq!` prints both values if they ever differ.
        assert_eq!(rx.recv().unwrap(), Box::new(10));
    }
}
1976 | ||
// Passes each end of a channel down a chain of ten freshly spawned threads:
// step `i` on the sending side sends `i`, step `i` on the receiving side
// expects `i`, and each step spawns the next one. The whole chain is started
// `stress_factor()` times.
#[test]
fn stream_send_recv_stress() {
    for _ in 0..stress_factor() {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        send(tx, 0);
        recv(rx, 0);

        // Sends `i` on a new thread, then recurses with `i + 1` until 10.
        fn send(tx: SyncSender<Box<i32>>, i: i32) {
            if i == 10 { return }

            thread::spawn(move|| {
                // `Box::new` replaces the unstable `box` expression syntax.
                tx.send(Box::new(i)).unwrap();
                send(tx, i + 1);
            });
        }

        // Receives on a new thread, checks for `i`, then recurses with
        // `i + 1` until 10.
        fn recv(rx: Receiver<Box<i32>>, i: i32) {
            if i == 10 { return }

            thread::spawn(move|| {
                // `assert_eq!` prints both values if they ever differ.
                assert_eq!(rx.recv().unwrap(), Box::new(i));
                recv(rx, i + 1);
            });
        }
    }
}
2004 | ||
// Regression test that we don't run out of stack in scheduler context:
// fill a 10000-slot channel completely, then drain it again.
#[test]
fn recv_a_lot() {
    const MESSAGES: usize = 10000;
    let (sender, receiver) = sync_channel(MESSAGES);
    for _ in 0..MESSAGES {
        sender.send(()).unwrap();
    }
    for _ in 0..MESSAGES {
        receiver.recv().unwrap();
    }
}
2012 | ||
// Spawns `stress_factor() + 100` threads that each send one unit value
// through a clone of the same sender, then receives that many messages.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = sync_channel(0);
    let thread_count = stress_factor() + 100;
    for _ in 0..thread_count {
        let worker_tx = sender.clone();
        thread::spawn(move || {
            worker_tx.send(()).unwrap();
        });
    }

    let mut received = 0;
    while received < thread_count {
        receiver.recv().unwrap();
        received += 1;
    }
}
2028 | ||
// A consumer thread adds up everything it gets from `Receiver::iter` and
// reports the sum once the sender has been dropped; 3 + 1 + 2 must give 6.
#[test]
fn test_nested_recv_iter() {
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (sum_tx, sum_rx) = sync_channel::<i32>(0);

    let _consumer = thread::spawn(move || {
        let mut sum = 0;
        for value in value_rx.iter() {
            sum += value;
        }
        sum_tx.send(sum).unwrap();
    });

    for &value in [3, 1, 2].iter() {
        value_tx.send(value).unwrap();
    }
    drop(value_tx);

    let reported = sum_rx.recv().unwrap();
    assert_eq!(reported, 6);
}
2048 | ||
// A consumer thread sums values pulled from `Receiver::iter` and leaves the
// loop once its running total is at least 3. With 2s being sent, it stops
// after adding two of them and must report 4.
#[test]
fn test_recv_iter_break() {
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (result_tx, result_rx) = sync_channel(0);

    let _consumer = thread::spawn(move || {
        let mut running_total = 0;
        for value in value_rx.iter() {
            if running_total >= 3 {
                break;
            }
            running_total += value;
        }
        result_tx.send(running_total).unwrap();
    });

    for _ in 0..3 {
        value_tx.send(2).unwrap();
    }
    // Non-blocking extra send whose outcome is deliberately ignored.
    let _ = value_tx.try_send(2);
    drop(value_tx);

    let reported = result_rx.recv().unwrap();
    assert_eq!(reported, 4);
}
2073 | ||
// Walks `try_recv` through its three observable results: `Empty` before
// anything is sent, `Ok` once a value is buffered, and `Disconnected` after
// the sender is dropped. A helper thread performs each step on request and
// acknowledges it, so every assertion runs at a known point.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (step_tx, step_rx) = sync_channel::<()>(1);
    let (ack_tx, ack_rx) = sync_channel::<()>(1);
    let _helper = thread::spawn(move || {
        // First request: buffer a single value.
        step_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();
        // Second request: hang up the data channel.
        step_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    // Ask the helper for its next step and wait until it reports completion.
    let run_next_step = || {
        step_tx.send(()).unwrap();
        ack_rx.recv().unwrap();
    };

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    run_next_step();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));
    run_next_step();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));
}
2097 | ||
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (work_tx, work_rx) = sync_channel::<()>(0);
    let (exit_tx, exit_rx) = sync_channel::<()>(0);
    let _child = thread::spawn(move || {
        // Block on the channel, then destroy the receiving end explicitly.
        work_rx.recv().unwrap();
        drop(work_rx);
        exit_tx.send(()).unwrap();
    });
    // Yield repeatedly to give the child a chance to block in `recv`.
    let mut yields = 0;
    while yields < 5000 {
        thread::yield_now();
        yields += 1;
    }

    // Clone the sender, drop the original, and send through the clone.
    let cloned_tx = work_tx.clone();
    drop(work_tx);
    cloned_tx.send(()).unwrap();

    // Do not return until the child has signalled that it is finished.
    exit_rx.recv().unwrap();
}
2120 | ||
// A send on a zero-capacity channel succeeds when another thread receives.
#[test]
fn send1() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    let outcome = sender.send(1);
    assert_eq!(outcome, Ok(()));
}
2127 | ||
// A send on a zero-capacity channel fails when another thread drops the
// receiver instead of receiving.
#[test]
fn send2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let _closer = thread::spawn(move || {
        drop(receiver);
    });
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
2134 | ||
// With one buffer slot, the first send succeeds immediately; the second
// fails because another thread drops the receiver without reading.
#[test]
fn send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.send(1);
    assert_eq!(first, Ok(()));
    let _closer = thread::spawn(move || {
        drop(receiver);
    });
    let second = sender.send(1);
    assert!(second.is_err());
}
2142 | ||
// Two threads send on clones of the same zero-capacity sender; once the
// receiver is dropped, both sends must fail. Each thread reports on a
// separate channel after checking its own result.
#[test]
fn send4() {
    let (first_tx, receiver) = sync_channel::<i32>(0);
    let second_tx = first_tx.clone();
    let (first_done, done_rx) = channel();
    let second_done = first_done.clone();
    let _first = thread::spawn(move || {
        let outcome = first_tx.send(1);
        assert!(outcome.is_err());
        first_done.send(()).unwrap();
    });
    let _second = thread::spawn(move || {
        let outcome = second_tx.send(2);
        assert!(outcome.is_err());
        second_done.send(()).unwrap();
    });
    drop(receiver);
    // Wait for both senders to report.
    for _ in 0..2 {
        done_rx.recv().unwrap();
    }
}
2161 | ||
// `try_send` on a zero-capacity channel with nobody receiving reports `Full`
// and hands the value back.
#[test]
fn try_send1() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let outcome = sender.try_send(1);
    assert_eq!(outcome, Err(TrySendError::Full(1)));
}
2167 | ||
// With one buffer slot, the first `try_send` succeeds and the second reports
// `Full` because nothing has been received.
#[test]
fn try_send2() {
    let (sender, _receiver) = sync_channel::<i32>(1);
    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));
    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Full(1)));
}
2174 | ||
// After one successful `try_send`, dropping the receiver makes the next
// `try_send` report `Disconnected` and hand the value back.
#[test]
fn try_send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));
    drop(receiver);
    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Disconnected(1)));
}
2182 | ||
// Named after issue 15761. Runs a ping-pong between two threads 100 times:
// the test thread `try_send`s a request, the helper thread receives it and
// `try_send`s a reply, and the test thread waits for that reply.
#[test]
fn issue_15761() {
    fn repro() {
        let (request_tx, request_rx) = sync_channel::<()>(3);
        let (reply_tx, reply_rx) = sync_channel::<()>(3);

        let _helper = thread::spawn(move || {
            request_rx.recv().unwrap();
            reply_tx.try_send(()).unwrap();
        });

        request_tx.try_send(()).unwrap();
        reply_rx.recv().unwrap();
    }

    let mut attempt = 0;
    while attempt < 100 {
        repro();
        attempt += 1;
    }
}
2202 | } |