1 // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Multi-producer, single-consumer FIFO queue communication primitives.
12 //!
13 //! This module provides message-based communication over channels, concretely
14 //! defined among three types:
15 //!
16 //! * `Sender`
17 //! * `SyncSender`
18 //! * `Receiver`
19 //!
//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
//! senders are cloneable (multi-producer), so many threads can send
//! simultaneously to one receiver (single-consumer).
23 //!
24 //! These channels come in two flavors:
25 //!
26 //! 1. An asynchronous, infinitely buffered channel. The `channel()` function
27 //! will return a `(Sender, Receiver)` tuple where all sends will be
28 //! **asynchronous** (they never block). The channel conceptually has an
29 //! infinite buffer.
30 //!
31 //! 2. A synchronous, bounded channel. The `sync_channel()` function will return
32 //! a `(SyncSender, Receiver)` tuple where the storage for pending messages
33 //! is a pre-allocated buffer of a fixed size. All sends will be
34 //! **synchronous** by blocking until there is buffer space available. Note
35 //! that a bound of 0 is allowed, causing the channel to become a
36 //! "rendezvous" channel where each sender atomically hands off a message to
37 //! a receiver.
38 //!
39 //! ## Disconnection
40 //!
41 //! The send and receive operations on channels will all return a `Result`
42 //! indicating whether the operation succeeded or not. An unsuccessful operation
43 //! is normally indicative of the other half of a channel having "hung up" by
44 //! being dropped in its corresponding thread.
45 //!
//! Once half of a channel has been deallocated, most operations can no longer
//! continue to make progress, so `Err` will be returned. Many applications
//! will simply `unwrap()` the results returned from this module, propagating
//! failure among threads if one unexpectedly dies.
50 //!
51 //! # Examples
52 //!
53 //! Simple usage:
54 //!
55 //! ```
56 //! use std::thread;
57 //! use std::sync::mpsc::channel;
58 //!
59 //! // Create a simple streaming channel
60 //! let (tx, rx) = channel();
61 //! thread::spawn(move|| {
62 //! tx.send(10).unwrap();
63 //! });
64 //! assert_eq!(rx.recv().unwrap(), 10);
65 //! ```
66 //!
67 //! Shared usage:
68 //!
69 //! ```
70 //! use std::thread;
71 //! use std::sync::mpsc::channel;
72 //!
73 //! // Create a shared channel that can be sent along from many threads
74 //! // where tx is the sending half (tx for transmission), and rx is the receiving
75 //! // half (rx for receiving).
76 //! let (tx, rx) = channel();
77 //! for i in 0..10 {
78 //! let tx = tx.clone();
79 //! thread::spawn(move|| {
80 //! tx.send(i).unwrap();
81 //! });
82 //! }
83 //!
84 //! for _ in 0..10 {
85 //! let j = rx.recv().unwrap();
86 //! assert!(0 <= j && j < 10);
87 //! }
88 //! ```
89 //!
90 //! Propagating panics:
91 //!
92 //! ```
93 //! use std::sync::mpsc::channel;
94 //!
95 //! // The call to recv() will return an error because the channel has already
96 //! // hung up (or been deallocated)
97 //! let (tx, rx) = channel::<int>();
98 //! drop(tx);
99 //! assert!(rx.recv().is_err());
100 //! ```
101 //!
102 //! Synchronous channels:
103 //!
104 //! ```
105 //! use std::thread;
106 //! use std::sync::mpsc::sync_channel;
107 //!
108 //! let (tx, rx) = sync_channel::<int>(0);
109 //! thread::spawn(move|| {
110 //! // This will wait for the parent task to start receiving
111 //! tx.send(53).unwrap();
112 //! });
113 //! rx.recv().unwrap();
114 //! ```
115 //!
//! Reading from a channel with a timeout requires using a `Timer` together
//! with the channel. You can use the `select!` macro to wait on either and
//! handle the timeout case. This first example will break out of the loop
//! after 10 seconds no matter what:
120 //!
121 //! ```no_run
122 //! use std::sync::mpsc::channel;
123 //! use std::old_io::timer::Timer;
124 //! use std::time::Duration;
125 //!
126 //! let (tx, rx) = channel::<int>();
127 //! let mut timer = Timer::new().unwrap();
128 //! let timeout = timer.oneshot(Duration::seconds(10));
129 //!
130 //! loop {
131 //! select! {
132 //! val = rx.recv() => println!("Received {}", val.unwrap()),
133 //! _ = timeout.recv() => {
134 //! println!("timed out, total time was more than 10 seconds");
135 //! break;
136 //! }
137 //! }
138 //! }
139 //! ```
140 //!
141 //! This second example is more costly since it allocates a new timer every
142 //! time a message is received, but it allows you to timeout after the channel
143 //! has been inactive for 5 seconds:
144 //!
145 //! ```no_run
146 //! use std::sync::mpsc::channel;
147 //! use std::old_io::timer::Timer;
148 //! use std::time::Duration;
149 //!
150 //! let (tx, rx) = channel::<int>();
151 //! let mut timer = Timer::new().unwrap();
152 //!
153 //! loop {
154 //! let timeout = timer.oneshot(Duration::seconds(5));
155 //!
156 //! select! {
157 //! val = rx.recv() => println!("Received {}", val.unwrap()),
158 //! _ = timeout.recv() => {
159 //! println!("timed out, no message received in 5 seconds");
160 //! break;
161 //! }
162 //! }
163 //! }
164 //! ```
165
166 #![stable(feature = "rust1", since = "1.0.0")]
167
168 // A description of how Rust's channel implementation works
169 //
170 // Channels are supposed to be the basic building block for all other
171 // concurrent primitives that are used in Rust. As a result, the channel type
172 // needs to be highly optimized, flexible, and broad enough for use everywhere.
173 //
// All channels are built on top of lock-free data structures, and consequently
// the channels themselves are lock-free data structures. As always with
// lock-free code, this is very much "here be dragons" territory, especially
// because I'm unaware of any academic papers that have gone into great depth
// about channels of these flavors.
179 //
180 // ## Flavors of channels
181 //
182 // From the perspective of a consumer of this library, there is only one flavor
183 // of channel. This channel can be used as a stream and cloned to allow multiple
184 // senders. Under the hood, however, there are actually three flavors of
185 // channels in play.
186 //
// * Oneshots - these channels are highly optimized for the one-send use case.
//              They contain as few atomics as possible and involve exactly one
//              allocation.
// * Streams - these channels are optimized for the non-shared use case. They
//             use a different concurrent queue that is more tailored for this
//             use case. The initial allocation of this flavor of channel is not
//             optimized.
// * Shared - this is the most general form of channel that this module offers,
//            a channel with multiple senders. This type is as optimized as it
//            can be, but the previous two types mentioned are much faster for
//            their use-cases.
198 //
199 // ## Concurrent queues
200 //
201 // The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
202 // recv() obviously blocks. This means that under the hood there must be some
203 // shared and concurrent queue holding all of the actual data.
204 //
// Two flavors of queues back these channels. We have chosen to use queues from
// a well-known author (Dmitry Vyukov) that are abbreviated as SPSC and MPSC
// (single producer, single consumer and multiple producer, single consumer).
// SPSC queues are used for streams while MPSC queues are used for shared
// channels.
210 //
211 // ### SPSC optimizations
212 //
213 // The SPSC queue found online is essentially a linked list of nodes where one
214 // half of the nodes are the "queue of data" and the other half of nodes are a
215 // cache of unused nodes. The unused nodes are used such that an allocation is
216 // not required on every push() and a free doesn't need to happen on every
217 // pop().
218 //
// As found online, however, the cache of nodes is unbounded. This means that
// if a channel at one point in its life had 50k items in the queue, then the
// queue will always retain the capacity for 50k items. I believed that this
// was an unnecessary cost of the implementation, so I have altered the queue
// to optionally have a bound on the cache size.
224 //
// By default, streams will have an unbounded SPSC queue with a small-ish cache
// size. The hope is that the cache is still large enough to keep send()
// operations very fast while remaining small enough that millions of channels
// can coexist at once.
229 //
230 // ### MPSC optimizations
231 //
// Right now the MPSC queue has not been optimized. Like the SPSC queue, it
// uses a linked list under the hood to achieve its unboundedness, but I have
// not put much effort into adding a cache of nodes similar to the SPSC queue.
235 //
236 // For now, I believe that this is "ok" because shared channels are not the most
237 // common type, but soon we may wish to revisit this queue choice and determine
238 // another candidate for backend storage of shared channels.
239 //
240 // ## Overview of the Implementation
241 //
242 // Now that there's a little background on the concurrent queues used, it's
243 // worth going into much more detail about the channels themselves. The basic
244 // pseudocode for a send/recv are:
245 //
246 //
//     send(t)                       recv()
//         queue.push(t)                 return if queue.pop()
//         if increment() == -1          deschedule {
//             wakeup()                      if decrement() > 0
//                                               cancel_deschedule()
//                                       }
//                                       queue.pop()
254 //
// As mentioned before, there are no locks in this implementation; only atomic
// instructions are used.
257 //
258 // ### The internal atomic counter
259 //
// Every channel has a counter shared by both halves to keep track of the size
// of the queue. This counter is used to abort descheduling by the receiver and
// to know when to wake up on the sending side.
263 //
264 // As seen in the pseudocode, senders will increment this count and receivers
265 // will decrement the count. The theory behind this is that if a sender sees a
266 // -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
267 // then it doesn't need to block.
268 //
269 // The recv() method has a beginning call to pop(), and if successful, it needs
270 // to decrement the count. It is a crucial implementation detail that this
271 // decrement does *not* happen to the shared counter. If this were the case,
272 // then it would be possible for the counter to be very negative when there were
273 // no receivers waiting, in which case the senders would have to determine when
274 // it was actually appropriate to wake up a receiver.
275 //
// Instead, the "steal count" is tracked separately (not atomically, because
// it's only used by receivers), and then the decrement() call when
// descheduling will lump all of the recent steals into one large decrement.
279 //
280 // The implication of this is that if a sender sees a -1 count, then there's
281 // guaranteed to be a waiter waiting!
282 //
283 // ## Native Implementation
284 //
// A major goal of these channels is to work seamlessly on and off the runtime.
// All of the previous race conditions have been worded in terms of
// scheduler-isms (which are obviously not available without the runtime).
//
// For now, native usage of channels (off the runtime) will fall back onto
// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
// is still entirely lock-free; the "deschedule" blocks above are surrounded by
// a mutex, and the "wakeup" blocks involve grabbing a mutex and signaling on a
// condition variable.
294 //
295 // ## Select
296 //
// Being able to support selection over channels has greatly influenced this
// design: selection needs to work not only inside the runtime, but also
// outside of it.
//
// The implementation is fairly straightforward. The goal of select() is not to
// return some data, but only to return which channel can receive data without
// blocking. The implementation is essentially the entire blocking procedure
// followed by an increment as soon as it's woken up. The cancellation procedure
// involves an increment and swapping out `to_wake` to acquire ownership of the
// task to unblock.
307 //
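// As an illustrative sketch (not part of this module's implementation),
// waiting on two hypothetical receivers `rx1` and `rx2` with the `select!`
// macro shown in the module documentation looks roughly like:
//
//     select! {
//         a = rx1.recv() => println!("got {} from the first channel", a.unwrap()),
//         b = rx2.recv() => println!("got {} from the second channel", b.unwrap())
//     }
//
// Each receiver is registered for selection (ultimately via `start_selection`
// below), and whichever channel can produce data first wakes the blocked
// thread.
//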
308 // Sadly this current implementation requires multiple allocations, so I have
309 // seen the throughput of select() be much worse than it should be. I do not
310 // believe that there is anything fundamental that needs to change about these
311 // channels, however, in order to support a more efficient select().
312 //
313 // # Conclusion
314 //
315 // And now that you've seen all the races that I found and attempted to fix,
316 // here's the code for you to find some more!
317
318 use prelude::v1::*;
319
320 use sync::Arc;
321 use fmt;
322 use mem;
323 use cell::UnsafeCell;
324
325 pub use self::select::{Select, Handle};
326 use self::select::StartResult;
327 use self::select::StartResult::*;
328 use self::blocking::SignalToken;
329
330 mod blocking;
331 mod oneshot;
332 mod select;
333 mod shared;
334 mod stream;
335 mod sync;
336 mod mpsc_queue;
337 mod spsc_queue;
338
/// The receiving half of Rust's channel type. This half can only be owned by
/// one task.
341 #[stable(feature = "rust1", since = "1.0.0")]
342 pub struct Receiver<T> {
343 inner: UnsafeCell<Flavor<T>>,
344 }
345
346 // The receiver port can be sent from place to place, so long as it
347 // is not used to receive non-sendable things.
348 unsafe impl<T: Send + 'static> Send for Receiver<T> { }
349
/// An iterator over messages on a receiver. This iterator will block
/// whenever `next` is called, waiting for a new message, and `None` will be
/// returned when the corresponding channel has hung up.
353 #[stable(feature = "rust1", since = "1.0.0")]
354 pub struct Iter<'a, T:'a> {
355 rx: &'a Receiver<T>
356 }
357
358 /// The sending-half of Rust's asynchronous channel type. This half can only be
359 /// owned by one task, but it can be cloned to send to other tasks.
360 #[stable(feature = "rust1", since = "1.0.0")]
361 pub struct Sender<T> {
362 inner: UnsafeCell<Flavor<T>>,
363 }
364
365 // The send port can be sent from place to place, so long as it
366 // is not used to send non-sendable things.
367 unsafe impl<T: Send + 'static> Send for Sender<T> { }
368
369 /// The sending-half of Rust's synchronous channel type. This half can only be
370 /// owned by one task, but it can be cloned to send to other tasks.
371 #[stable(feature = "rust1", since = "1.0.0")]
372 pub struct SyncSender<T> {
373 inner: Arc<UnsafeCell<sync::Packet<T>>>,
374 }
375
376 unsafe impl<T: Send + 'static> Send for SyncSender<T> {}
377
378 impl<T> !Sync for SyncSender<T> {}
379
380 /// An error returned from the `send` function on channels.
381 ///
382 /// A `send` operation can only fail if the receiving end of a channel is
383 /// disconnected, implying that the data could never be received. The error
384 /// contains the data being sent as a payload so it can be recovered.
385 #[stable(feature = "rust1", since = "1.0.0")]
386 #[derive(PartialEq, Eq, Clone, Copy)]
387 pub struct SendError<T>(pub T);
388
389 /// An error returned from the `recv` function on a `Receiver`.
390 ///
391 /// The `recv` operation can only fail if the sending half of a channel is
392 /// disconnected, implying that no further messages will ever be received.
393 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
394 #[stable(feature = "rust1", since = "1.0.0")]
395 pub struct RecvError;
396
397 /// This enumeration is the list of the possible reasons that try_recv could not
398 /// return data when called.
399 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
400 #[stable(feature = "rust1", since = "1.0.0")]
401 pub enum TryRecvError {
402 /// This channel is currently empty, but the sender(s) have not yet
403 /// disconnected, so data may yet become available.
404 #[stable(feature = "rust1", since = "1.0.0")]
405 Empty,
406
/// This channel's sending half has become disconnected, and there will
/// never be any more data received on this channel.
409 #[stable(feature = "rust1", since = "1.0.0")]
410 Disconnected,
411 }
412
413 /// This enumeration is the list of the possible error outcomes for the
414 /// `SyncSender::try_send` method.
415 #[stable(feature = "rust1", since = "1.0.0")]
416 #[derive(PartialEq, Eq, Clone, Copy)]
417 pub enum TrySendError<T> {
/// The data could not be sent on the channel because it would require that
/// the caller block to send the data.
420 ///
421 /// If this is a buffered channel, then the buffer is full at this time. If
422 /// this is not a buffered channel, then there is no receiver available to
423 /// acquire the data.
424 #[stable(feature = "rust1", since = "1.0.0")]
425 Full(T),
426
/// This channel's receiving half has disconnected, so the data could not be
/// sent. The data is returned back to the caller in this case.
429 #[stable(feature = "rust1", since = "1.0.0")]
430 Disconnected(T),
431 }
432
433 enum Flavor<T> {
434 Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
435 Stream(Arc<UnsafeCell<stream::Packet<T>>>),
436 Shared(Arc<UnsafeCell<shared::Packet<T>>>),
437 Sync(Arc<UnsafeCell<sync::Packet<T>>>),
438 }
439
440 #[doc(hidden)]
441 trait UnsafeFlavor<T> {
442 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
443 unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
444 &mut *self.inner_unsafe().get()
445 }
446 unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
447 &*self.inner_unsafe().get()
448 }
449 }
450 impl<T> UnsafeFlavor<T> for Sender<T> {
451 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
452 &self.inner
453 }
454 }
455 impl<T> UnsafeFlavor<T> for Receiver<T> {
456 fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
457 &self.inner
458 }
459 }
460
461 /// Creates a new asynchronous channel, returning the sender/receiver halves.
462 ///
463 /// All data sent on the sender will become available on the receiver, and no
464 /// send will block the calling task (this channel has an "infinite buffer").
465 ///
466 /// # Example
467 ///
468 /// ```
469 /// use std::sync::mpsc::channel;
470 /// use std::thread;
471 ///
/// // tx is the sending half (tx for transmission), and rx is the receiving
/// // half (rx for receiving).
474 /// let (tx, rx) = channel();
475 ///
476 /// // Spawn off an expensive computation
477 /// thread::spawn(move|| {
478 /// # fn expensive_computation() {}
479 /// tx.send(expensive_computation()).unwrap();
480 /// });
481 ///
482 /// // Do some useful work for awhile
483 ///
484 /// // Let's see what that answer was
485 /// println!("{:?}", rx.recv().unwrap());
486 /// ```
487 #[stable(feature = "rust1", since = "1.0.0")]
488 pub fn channel<T: Send + 'static>() -> (Sender<T>, Receiver<T>) {
489 let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
490 (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
491 }
492
493 /// Creates a new synchronous, bounded channel.
494 ///
/// Like asynchronous channels, the `Receiver` will block until a message
/// becomes available. These channels differ greatly from asynchronous channels
/// in the semantics of the sender, however.
498 ///
/// This channel has an internal buffer on which messages will be queued. When
/// the internal buffer becomes full, future sends will *block* waiting for the
/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
/// becomes a "rendezvous" channel where each send will not return until a recv
/// is paired with it.
///
/// As with asynchronous channels, `send` will return an error rather than
/// panic if the `Receiver` has been destroyed.
507 ///
508 /// # Example
509 ///
510 /// ```
511 /// use std::sync::mpsc::sync_channel;
512 /// use std::thread;
513 ///
514 /// let (tx, rx) = sync_channel(1);
515 ///
516 /// // this returns immediately
517 /// tx.send(1).unwrap();
518 ///
519 /// thread::spawn(move|| {
520 /// // this will block until the previous message has been received
521 /// tx.send(2).unwrap();
522 /// });
523 ///
524 /// assert_eq!(rx.recv().unwrap(), 1);
525 /// assert_eq!(rx.recv().unwrap(), 2);
526 /// ```
527 #[stable(feature = "rust1", since = "1.0.0")]
528 pub fn sync_channel<T: Send + 'static>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
529 let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
530 (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
531 }
532
533 ////////////////////////////////////////////////////////////////////////////////
534 // Sender
535 ////////////////////////////////////////////////////////////////////////////////
536
537 impl<T: Send + 'static> Sender<T> {
538 fn new(inner: Flavor<T>) -> Sender<T> {
539 Sender {
540 inner: UnsafeCell::new(inner),
541 }
542 }
543
544 /// Attempts to send a value on this channel, returning it back if it could
545 /// not be sent.
546 ///
547 /// A successful send occurs when it is determined that the other end of
548 /// the channel has not hung up already. An unsuccessful send would be one
549 /// where the corresponding receiver has already been deallocated. Note
550 /// that a return value of `Err` means that the data will never be
551 /// received, but a return value of `Ok` does *not* mean that the data
552 /// will be received. It is possible for the corresponding receiver to
553 /// hang up immediately after this function returns `Ok`.
554 ///
555 /// This method will never block the current thread.
556 ///
557 /// # Example
558 ///
559 /// ```
560 /// use std::sync::mpsc::channel;
561 ///
562 /// let (tx, rx) = channel();
563 ///
564 /// // This send is always successful
565 /// tx.send(1).unwrap();
566 ///
567 /// // This send will fail because the receiver is gone
568 /// drop(rx);
569 /// assert_eq!(tx.send(1).err().unwrap().0, 1);
570 /// ```
571 #[stable(feature = "rust1", since = "1.0.0")]
572 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
573 let (new_inner, ret) = match *unsafe { self.inner() } {
574 Flavor::Oneshot(ref p) => {
575 unsafe {
576 let p = p.get();
577 if !(*p).sent() {
578 return (*p).send(t).map_err(SendError);
579 } else {
580 let a =
581 Arc::new(UnsafeCell::new(stream::Packet::new()));
582 let rx = Receiver::new(Flavor::Stream(a.clone()));
583 match (*p).upgrade(rx) {
584 oneshot::UpSuccess => {
585 let ret = (*a.get()).send(t);
586 (a, ret)
587 }
588 oneshot::UpDisconnected => (a, Err(t)),
589 oneshot::UpWoke(token) => {
590 // This send cannot panic because the thread is
591 // asleep (we're looking at it), so the receiver
592 // can't go away.
593 (*a.get()).send(t).ok().unwrap();
594 token.signal();
595 (a, Ok(()))
596 }
597 }
598 }
599 }
600 }
601 Flavor::Stream(ref p) => return unsafe {
602 (*p.get()).send(t).map_err(SendError)
603 },
604 Flavor::Shared(ref p) => return unsafe {
605 (*p.get()).send(t).map_err(SendError)
606 },
607 Flavor::Sync(..) => unreachable!(),
608 };
609
610 unsafe {
611 let tmp = Sender::new(Flavor::Stream(new_inner));
612 mem::swap(self.inner_mut(), tmp.inner_mut());
613 }
614 ret.map_err(SendError)
615 }
616 }
617
618 #[stable(feature = "rust1", since = "1.0.0")]
619 impl<T: Send + 'static> Clone for Sender<T> {
620 fn clone(&self) -> Sender<T> {
621 let (packet, sleeper, guard) = match *unsafe { self.inner() } {
622 Flavor::Oneshot(ref p) => {
623 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
624 unsafe {
625 let guard = (*a.get()).postinit_lock();
626 let rx = Receiver::new(Flavor::Shared(a.clone()));
627 match (*p.get()).upgrade(rx) {
628 oneshot::UpSuccess |
629 oneshot::UpDisconnected => (a, None, guard),
630 oneshot::UpWoke(task) => (a, Some(task), guard)
631 }
632 }
633 }
634 Flavor::Stream(ref p) => {
635 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
636 unsafe {
637 let guard = (*a.get()).postinit_lock();
638 let rx = Receiver::new(Flavor::Shared(a.clone()));
639 match (*p.get()).upgrade(rx) {
640 stream::UpSuccess |
641 stream::UpDisconnected => (a, None, guard),
642 stream::UpWoke(task) => (a, Some(task), guard),
643 }
644 }
645 }
646 Flavor::Shared(ref p) => {
647 unsafe { (*p.get()).clone_chan(); }
648 return Sender::new(Flavor::Shared(p.clone()));
649 }
650 Flavor::Sync(..) => unreachable!(),
651 };
652
653 unsafe {
654 (*packet.get()).inherit_blocker(sleeper, guard);
655
656 let tmp = Sender::new(Flavor::Shared(packet.clone()));
657 mem::swap(self.inner_mut(), tmp.inner_mut());
658 }
659 Sender::new(Flavor::Shared(packet))
660 }
661 }
662
663 #[unsafe_destructor]
664 #[stable(feature = "rust1", since = "1.0.0")]
665 impl<T: Send + 'static> Drop for Sender<T> {
666 fn drop(&mut self) {
667 match *unsafe { self.inner_mut() } {
668 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
669 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
670 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
671 Flavor::Sync(..) => unreachable!(),
672 }
673 }
674 }
675
676 ////////////////////////////////////////////////////////////////////////////////
677 // SyncSender
678 ////////////////////////////////////////////////////////////////////////////////
679
680 impl<T: Send + 'static> SyncSender<T> {
681 fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
682 SyncSender { inner: inner }
683 }
684
685 /// Sends a value on this synchronous channel.
686 ///
687 /// This function will *block* until space in the internal buffer becomes
688 /// available or a receiver is available to hand off the message to.
689 ///
690 /// Note that a successful send does *not* guarantee that the receiver will
691 /// ever see the data if there is a buffer on this channel. Items may be
692 /// enqueued in the internal buffer for the receiver to receive at a later
693 /// time. If the buffer size is 0, however, it can be guaranteed that the
694 /// receiver has indeed received the data if this function returns success.
695 ///
696 /// This function will never panic, but it may return `Err` if the
697 /// `Receiver` has disconnected and is no longer able to receive
698 /// information.
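///
/// # Example
///
/// A minimal sketch of a blocking send on a rendezvous (zero-capacity) channel:
///
/// ```
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
/// // With a bound of 0, every send blocks until a receiver takes the value.
/// let (tx, rx) = sync_channel(0);
/// thread::spawn(move|| {
///     tx.send(42).unwrap();
/// });
/// assert_eq!(rx.recv().unwrap(), 42);
/// ```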
699 #[stable(feature = "rust1", since = "1.0.0")]
700 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
701 unsafe { (*self.inner.get()).send(t).map_err(SendError) }
702 }
703
704 /// Attempts to send a value on this channel without blocking.
705 ///
706 /// This method differs from `send` by returning immediately if the
707 /// channel's buffer is full or no receiver is waiting to acquire some
708 /// data. Compared with `send`, this function has two failure cases
709 /// instead of one (one for disconnection, one for a full buffer).
710 ///
711 /// See `SyncSender::send` for notes about guarantees of whether the
712 /// receiver has received the data or not if this function is successful.
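///
/// # Example
///
/// A minimal sketch showing both failure cases on a buffered channel:
///
/// ```
/// use std::sync::mpsc::{sync_channel, TrySendError};
///
/// let (tx, rx) = sync_channel(1);
///
/// // The buffer has room for one message, so this succeeds immediately.
/// assert_eq!(tx.try_send(1), Ok(()));
///
/// // The buffer is now full, so the value is handed back in `Full`.
/// assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
///
/// // Once the receiver is gone, sends report `Disconnected` instead.
/// drop(rx);
/// assert_eq!(tx.try_send(3), Err(TrySendError::Disconnected(3)));
/// ```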
713 #[stable(feature = "rust1", since = "1.0.0")]
714 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
715 unsafe { (*self.inner.get()).try_send(t) }
716 }
717 }
718
719 #[stable(feature = "rust1", since = "1.0.0")]
720 impl<T: Send + 'static> Clone for SyncSender<T> {
721 fn clone(&self) -> SyncSender<T> {
722 unsafe { (*self.inner.get()).clone_chan(); }
723 return SyncSender::new(self.inner.clone());
724 }
725 }
726
727 #[unsafe_destructor]
728 #[stable(feature = "rust1", since = "1.0.0")]
729 impl<T: Send + 'static> Drop for SyncSender<T> {
730 fn drop(&mut self) {
731 unsafe { (*self.inner.get()).drop_chan(); }
732 }
733 }
734
735 ////////////////////////////////////////////////////////////////////////////////
736 // Receiver
737 ////////////////////////////////////////////////////////////////////////////////
738
739 impl<T: Send + 'static> Receiver<T> {
740 fn new(inner: Flavor<T>) -> Receiver<T> {
741 Receiver { inner: UnsafeCell::new(inner) }
742 }
743
/// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
/// become available. Instead, it always returns immediately, reporting
/// whether any pending data is available on the channel.
749 ///
750 /// This is useful for a flavor of "optimistic check" before deciding to
751 /// block on a receiver.
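///
/// # Example
///
/// A minimal sketch of polling for data without blocking:
///
/// ```
/// use std::sync::mpsc::{channel, TryRecvError};
///
/// let (tx, rx) = channel();
///
/// // Nothing has been sent yet, so the channel reports `Empty`.
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
///
/// // Once a value has been sent it can be received without blocking.
/// tx.send(1).unwrap();
/// assert_eq!(rx.try_recv(), Ok(1));
///
/// // After all senders are gone the channel reports `Disconnected`.
/// drop(tx);
/// assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
/// ```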
752 #[stable(feature = "rust1", since = "1.0.0")]
753 pub fn try_recv(&self) -> Result<T, TryRecvError> {
754 loop {
755 let new_port = match *unsafe { self.inner() } {
756 Flavor::Oneshot(ref p) => {
757 match unsafe { (*p.get()).try_recv() } {
758 Ok(t) => return Ok(t),
759 Err(oneshot::Empty) => return Err(TryRecvError::Empty),
760 Err(oneshot::Disconnected) => {
761 return Err(TryRecvError::Disconnected)
762 }
763 Err(oneshot::Upgraded(rx)) => rx,
764 }
765 }
766 Flavor::Stream(ref p) => {
767 match unsafe { (*p.get()).try_recv() } {
768 Ok(t) => return Ok(t),
769 Err(stream::Empty) => return Err(TryRecvError::Empty),
770 Err(stream::Disconnected) => {
771 return Err(TryRecvError::Disconnected)
772 }
773 Err(stream::Upgraded(rx)) => rx,
774 }
775 }
776 Flavor::Shared(ref p) => {
777 match unsafe { (*p.get()).try_recv() } {
778 Ok(t) => return Ok(t),
779 Err(shared::Empty) => return Err(TryRecvError::Empty),
780 Err(shared::Disconnected) => {
781 return Err(TryRecvError::Disconnected)
782 }
783 }
784 }
785 Flavor::Sync(ref p) => {
786 match unsafe { (*p.get()).try_recv() } {
787 Ok(t) => return Ok(t),
788 Err(sync::Empty) => return Err(TryRecvError::Empty),
789 Err(sync::Disconnected) => {
790 return Err(TryRecvError::Disconnected)
791 }
792 }
793 }
794 };
795 unsafe {
796 mem::swap(self.inner_mut(),
797 new_port.inner_mut());
798 }
799 }
800 }
801
/// Attempts to wait for a value on this receiver, returning an error if the
/// corresponding channel has hung up.
804 ///
805 /// This function will always block the current thread if there is no data
806 /// available and it's possible for more data to be sent. Once a message is
807 /// sent to the corresponding `Sender`, then this receiver will wake up and
808 /// return that message.
809 ///
810 /// If the corresponding `Sender` has disconnected, or it disconnects while
811 /// this call is blocking, this call will wake up and return `Err` to
812 /// indicate that no more messages can ever be received on this channel.
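///
/// # Example
///
/// A minimal sketch of blocking until another thread sends a message:
///
/// ```
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (tx, rx) = channel();
/// thread::spawn(move|| {
///     tx.send("hello").unwrap();
/// });
///
/// // Blocks until the spawned thread sends its message.
/// assert_eq!(rx.recv().unwrap(), "hello");
///
/// // Every sender is eventually dropped, so further receives report an error.
/// assert!(rx.recv().is_err());
/// ```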
813 #[stable(feature = "rust1", since = "1.0.0")]
814 pub fn recv(&self) -> Result<T, RecvError> {
815 loop {
816 let new_port = match *unsafe { self.inner() } {
817 Flavor::Oneshot(ref p) => {
818 match unsafe { (*p.get()).recv() } {
819 Ok(t) => return Ok(t),
820 Err(oneshot::Empty) => return unreachable!(),
821 Err(oneshot::Disconnected) => return Err(RecvError),
822 Err(oneshot::Upgraded(rx)) => rx,
823 }
824 }
825 Flavor::Stream(ref p) => {
826 match unsafe { (*p.get()).recv() } {
827 Ok(t) => return Ok(t),
828 Err(stream::Empty) => return unreachable!(),
829 Err(stream::Disconnected) => return Err(RecvError),
830 Err(stream::Upgraded(rx)) => rx,
831 }
832 }
833 Flavor::Shared(ref p) => {
834 match unsafe { (*p.get()).recv() } {
835 Ok(t) => return Ok(t),
836 Err(shared::Empty) => return unreachable!(),
837 Err(shared::Disconnected) => return Err(RecvError),
838 }
839 }
840 Flavor::Sync(ref p) => return unsafe {
841 (*p.get()).recv().map_err(|()| RecvError)
842 }
843 };
844 unsafe {
845 mem::swap(self.inner_mut(), new_port.inner_mut());
846 }
847 }
848 }
849
850 /// Returns an iterator that will block waiting for messages, but never
851 /// `panic!`. It will return `None` when the channel has hung up.
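///
/// # Example
///
/// A minimal sketch of draining a channel with the blocking iterator:
///
/// ```
/// use std::sync::mpsc::channel;
/// use std::thread;
///
/// let (tx, rx) = channel();
/// thread::spawn(move|| {
///     for i in 0..3 {
///         tx.send(i).unwrap();
///     }
///     // `tx` is dropped when this closure ends, which ends the iteration
///     // below with `None`.
/// });
///
/// // The iterator blocks for each message and stops once the sender is gone.
/// let received: Vec<_> = rx.iter().collect();
/// assert_eq!(received, vec![0, 1, 2]);
/// ```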
852 #[stable(feature = "rust1", since = "1.0.0")]
853 pub fn iter(&self) -> Iter<T> {
854 Iter { rx: self }
855 }
856 }
857
858 impl<T: Send + 'static> select::Packet for Receiver<T> {
859 fn can_recv(&self) -> bool {
860 loop {
861 let new_port = match *unsafe { self.inner() } {
862 Flavor::Oneshot(ref p) => {
863 match unsafe { (*p.get()).can_recv() } {
864 Ok(ret) => return ret,
865 Err(upgrade) => upgrade,
866 }
867 }
868 Flavor::Stream(ref p) => {
869 match unsafe { (*p.get()).can_recv() } {
870 Ok(ret) => return ret,
871 Err(upgrade) => upgrade,
872 }
873 }
874 Flavor::Shared(ref p) => {
875 return unsafe { (*p.get()).can_recv() };
876 }
877 Flavor::Sync(ref p) => {
878 return unsafe { (*p.get()).can_recv() };
879 }
880 };
881 unsafe {
882 mem::swap(self.inner_mut(),
883 new_port.inner_mut());
884 }
885 }
886 }
887
888 fn start_selection(&self, mut token: SignalToken) -> StartResult {
889 loop {
890 let (t, new_port) = match *unsafe { self.inner() } {
891 Flavor::Oneshot(ref p) => {
892 match unsafe { (*p.get()).start_selection(token) } {
893 oneshot::SelSuccess => return Installed,
894 oneshot::SelCanceled => return Abort,
895 oneshot::SelUpgraded(t, rx) => (t, rx),
896 }
897 }
898 Flavor::Stream(ref p) => {
899 match unsafe { (*p.get()).start_selection(token) } {
900 stream::SelSuccess => return Installed,
901 stream::SelCanceled => return Abort,
902 stream::SelUpgraded(t, rx) => (t, rx),
903 }
904 }
905 Flavor::Shared(ref p) => {
906 return unsafe { (*p.get()).start_selection(token) };
907 }
908 Flavor::Sync(ref p) => {
909 return unsafe { (*p.get()).start_selection(token) };
910 }
911 };
912 token = t;
913 unsafe {
914 mem::swap(self.inner_mut(), new_port.inner_mut());
915 }
916 }
917 }
918
919 fn abort_selection(&self) -> bool {
920 let mut was_upgrade = false;
921 loop {
922 let result = match *unsafe { self.inner() } {
923 Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
924 Flavor::Stream(ref p) => unsafe {
925 (*p.get()).abort_selection(was_upgrade)
926 },
927 Flavor::Shared(ref p) => return unsafe {
928 (*p.get()).abort_selection(was_upgrade)
929 },
930 Flavor::Sync(ref p) => return unsafe {
931 (*p.get()).abort_selection()
932 },
933 };
934 let new_port = match result { Ok(b) => return b, Err(p) => p };
935 was_upgrade = true;
936 unsafe {
937 mem::swap(self.inner_mut(),
938 new_port.inner_mut());
939 }
940 }
941 }
942 }
943
944 #[stable(feature = "rust1", since = "1.0.0")]
945 impl<'a, T: Send + 'static> Iterator for Iter<'a, T> {
946 type Item = T;
947
948 fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
949 }
950
951 #[unsafe_destructor]
952 #[stable(feature = "rust1", since = "1.0.0")]
953 impl<T: Send + 'static> Drop for Receiver<T> {
954 fn drop(&mut self) {
955 match *unsafe { self.inner_mut() } {
956 Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
957 Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
958 Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
959 Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
960 }
961 }
962 }
963
964 #[stable(feature = "rust1", since = "1.0.0")]
965 impl<T> fmt::Debug for SendError<T> {
966 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
967 "SendError(..)".fmt(f)
968 }
969 }
970
971 #[stable(feature = "rust1", since = "1.0.0")]
972 impl<T> fmt::Display for SendError<T> {
973 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
974 "sending on a closed channel".fmt(f)
975 }
976 }
977
978 #[stable(feature = "rust1", since = "1.0.0")]
979 impl<T> fmt::Debug for TrySendError<T> {
980 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
981 match *self {
982 TrySendError::Full(..) => "Full(..)".fmt(f),
983 TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
984 }
985 }
986 }
987
988 #[stable(feature = "rust1", since = "1.0.0")]
989 impl<T> fmt::Display for TrySendError<T> {
990 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
991 match *self {
992 TrySendError::Full(..) => {
993 "sending on a full channel".fmt(f)
994 }
995 TrySendError::Disconnected(..) => {
996 "sending on a closed channel".fmt(f)
997 }
998 }
999 }
1000 }
1001
1002 #[stable(feature = "rust1", since = "1.0.0")]
1003 impl fmt::Display for RecvError {
1004 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1005 "receiving on a closed channel".fmt(f)
1006 }
1007 }
1008
1009 #[stable(feature = "rust1", since = "1.0.0")]
1010 impl fmt::Display for TryRecvError {
1011 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1012 match *self {
1013 TryRecvError::Empty => {
1014 "receiving on an empty channel".fmt(f)
1015 }
1016 TryRecvError::Disconnected => {
1017 "receiving on a closed channel".fmt(f)
1018 }
1019 }
1020 }
1021 }
1022
1023 #[cfg(test)]
1024 mod test {
1025 use prelude::v1::*;
1026
1027 use std::env;
1028 use super::*;
1029 use thread;
1030
1031 pub fn stress_factor() -> uint {
1032 match env::var("RUST_TEST_STRESS") {
1033 Ok(val) => val.parse().unwrap(),
1034 Err(..) => 1,
1035 }
1036 }
1037
1038 #[test]
1039 fn smoke() {
1040 let (tx, rx) = channel::<int>();
1041 tx.send(1).unwrap();
1042 assert_eq!(rx.recv().unwrap(), 1);
1043 }
1044
1045 #[test]
1046 fn drop_full() {
1047 let (tx, _rx) = channel();
1048 tx.send(box 1).unwrap();
1049 }
1050
1051 #[test]
1052 fn drop_full_shared() {
1053 let (tx, _rx) = channel();
1054 drop(tx.clone());
1055 drop(tx.clone());
1056 tx.send(box 1).unwrap();
1057 }
1058
1059 #[test]
1060 fn smoke_shared() {
1061 let (tx, rx) = channel::<int>();
1062 tx.send(1).unwrap();
1063 assert_eq!(rx.recv().unwrap(), 1);
1064 let tx = tx.clone();
1065 tx.send(1).unwrap();
1066 assert_eq!(rx.recv().unwrap(), 1);
1067 }
1068
1069 #[test]
1070 fn smoke_threads() {
1071 let (tx, rx) = channel::<int>();
1072 let _t = thread::spawn(move|| {
1073 tx.send(1).unwrap();
1074 });
1075 assert_eq!(rx.recv().unwrap(), 1);
1076 }
1077
1078 #[test]
1079 fn smoke_port_gone() {
1080 let (tx, rx) = channel::<int>();
1081 drop(rx);
1082 assert!(tx.send(1).is_err());
1083 }
1084
1085 #[test]
1086 fn smoke_shared_port_gone() {
1087 let (tx, rx) = channel::<int>();
1088 drop(rx);
1089 assert!(tx.send(1).is_err())
1090 }
1091
1092 #[test]
1093 fn smoke_shared_port_gone2() {
1094 let (tx, rx) = channel::<int>();
1095 drop(rx);
1096 let tx2 = tx.clone();
1097 drop(tx);
1098 assert!(tx2.send(1).is_err());
1099 }
1100
1101 #[test]
1102 fn port_gone_concurrent() {
1103 let (tx, rx) = channel::<int>();
1104 let _t = thread::spawn(move|| {
1105 rx.recv().unwrap();
1106 });
1107 while tx.send(1).is_ok() {}
1108 }
1109
1110 #[test]
1111 fn port_gone_concurrent_shared() {
1112 let (tx, rx) = channel::<int>();
1113 let tx2 = tx.clone();
1114 let _t = thread::spawn(move|| {
1115 rx.recv().unwrap();
1116 });
1117 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1118 }
1119
1120 #[test]
1121 fn smoke_chan_gone() {
1122 let (tx, rx) = channel::<int>();
1123 drop(tx);
1124 assert!(rx.recv().is_err());
1125 }
1126
1127 #[test]
1128 fn smoke_chan_gone_shared() {
1129 let (tx, rx) = channel::<()>();
1130 let tx2 = tx.clone();
1131 drop(tx);
1132 drop(tx2);
1133 assert!(rx.recv().is_err());
1134 }
1135
1136 #[test]
1137 fn chan_gone_concurrent() {
1138 let (tx, rx) = channel::<int>();
1139 let _t = thread::spawn(move|| {
1140 tx.send(1).unwrap();
1141 tx.send(1).unwrap();
1142 });
1143 while rx.recv().is_ok() {}
1144 }
1145
1146 #[test]
1147 fn stress() {
1148 let (tx, rx) = channel::<int>();
1149 let t = thread::spawn(move|| {
1150 for _ in 0..10000 { tx.send(1).unwrap(); }
1151 });
1152 for _ in 0..10000 {
1153 assert_eq!(rx.recv().unwrap(), 1);
1154 }
1155 t.join().ok().unwrap();
1156 }
1157
1158 #[test]
1159 fn stress_shared() {
1160 static AMT: uint = 10000;
1161 static NTHREADS: uint = 8;
1162 let (tx, rx) = channel::<int>();
1163
1164 let t = thread::spawn(move|| {
1165 for _ in 0..AMT * NTHREADS {
1166 assert_eq!(rx.recv().unwrap(), 1);
1167 }
1168 match rx.try_recv() {
1169 Ok(..) => panic!(),
1170 _ => {}
1171 }
1172 });
1173
1174 for _ in 0..NTHREADS {
1175 let tx = tx.clone();
1176 thread::spawn(move|| {
1177 for _ in 0..AMT { tx.send(1).unwrap(); }
1178 });
1179 }
1180 drop(tx);
1181 t.join().ok().unwrap();
1182 }
1183
1184 #[test]
1185 fn send_from_outside_runtime() {
1186 let (tx1, rx1) = channel::<()>();
1187 let (tx2, rx2) = channel::<int>();
1188 let t1 = thread::spawn(move|| {
1189 tx1.send(()).unwrap();
1190 for _ in 0..40 {
1191 assert_eq!(rx2.recv().unwrap(), 1);
1192 }
1193 });
1194 rx1.recv().unwrap();
1195 let t2 = thread::spawn(move|| {
1196 for _ in 0..40 {
1197 tx2.send(1).unwrap();
1198 }
1199 });
1200 t1.join().ok().unwrap();
1201 t2.join().ok().unwrap();
1202 }
1203
1204 #[test]
1205 fn recv_from_outside_runtime() {
1206 let (tx, rx) = channel::<int>();
1207 let t = thread::spawn(move|| {
1208 for _ in 0..40 {
1209 assert_eq!(rx.recv().unwrap(), 1);
1210 }
1211 });
1212 for _ in 0..40 {
1213 tx.send(1).unwrap();
1214 }
1215 t.join().ok().unwrap();
1216 }
1217
1218 #[test]
1219 fn no_runtime() {
1220 let (tx1, rx1) = channel::<int>();
1221 let (tx2, rx2) = channel::<int>();
1222 let t1 = thread::spawn(move|| {
1223 assert_eq!(rx1.recv().unwrap(), 1);
1224 tx2.send(2).unwrap();
1225 });
1226 let t2 = thread::spawn(move|| {
1227 tx1.send(1).unwrap();
1228 assert_eq!(rx2.recv().unwrap(), 2);
1229 });
1230 t1.join().ok().unwrap();
1231 t2.join().ok().unwrap();
1232 }
1233
1234 #[test]
1235 fn oneshot_single_thread_close_port_first() {
1236 // Simple test of closing without sending
1237 let (_tx, rx) = channel::<int>();
1238 drop(rx);
1239 }
1240
1241 #[test]
1242 fn oneshot_single_thread_close_chan_first() {
1243 // Simple test of closing without sending
1244 let (tx, _rx) = channel::<int>();
1245 drop(tx);
1246 }
1247
1248 #[test]
1249 fn oneshot_single_thread_send_port_close() {
1250 // Testing that the sender cleans up the payload if receiver is closed
1251 let (tx, rx) = channel::<Box<int>>();
1252 drop(rx);
1253 assert!(tx.send(box 0).is_err());
1254 }
1255
1256 #[test]
1257 fn oneshot_single_thread_recv_chan_close() {
1258 // Receiving on a closed chan will panic
1259 let res = thread::spawn(move|| {
1260 let (tx, rx) = channel::<int>();
1261 drop(tx);
1262 rx.recv().unwrap();
1263 }).join();
1264 // What is our res?
1265 assert!(res.is_err());
1266 }
1267
1268 #[test]
1269 fn oneshot_single_thread_send_then_recv() {
1270 let (tx, rx) = channel::<Box<int>>();
1271 tx.send(box 10).unwrap();
1272 assert!(rx.recv().unwrap() == box 10);
1273 }
1274
1275 #[test]
1276 fn oneshot_single_thread_try_send_open() {
1277 let (tx, rx) = channel::<int>();
1278 assert!(tx.send(10).is_ok());
1279 assert!(rx.recv().unwrap() == 10);
1280 }
1281
1282 #[test]
1283 fn oneshot_single_thread_try_send_closed() {
1284 let (tx, rx) = channel::<int>();
1285 drop(rx);
1286 assert!(tx.send(10).is_err());
1287 }
1288
1289 #[test]
1290 fn oneshot_single_thread_try_recv_open() {
1291 let (tx, rx) = channel::<int>();
1292 tx.send(10).unwrap();
1293 assert!(rx.recv() == Ok(10));
1294 }
1295
1296 #[test]
1297 fn oneshot_single_thread_try_recv_closed() {
1298 let (tx, rx) = channel::<int>();
1299 drop(tx);
1300 assert!(rx.recv().is_err());
1301 }
1302
1303 #[test]
1304 fn oneshot_single_thread_peek_data() {
1305 let (tx, rx) = channel::<int>();
1306 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1307 tx.send(10).unwrap();
1308 assert_eq!(rx.try_recv(), Ok(10));
1309 }
1310
1311 #[test]
1312 fn oneshot_single_thread_peek_close() {
1313 let (tx, rx) = channel::<int>();
1314 drop(tx);
1315 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1316 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1317 }
1318
1319 #[test]
1320 fn oneshot_single_thread_peek_open() {
1321 let (_tx, rx) = channel::<int>();
1322 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1323 }
1324
1325 #[test]
1326 fn oneshot_multi_task_recv_then_send() {
1327 let (tx, rx) = channel::<Box<int>>();
1328 let _t = thread::spawn(move|| {
1329 assert!(rx.recv().unwrap() == box 10);
1330 });
1331
1332 tx.send(box 10).unwrap();
1333 }
1334
1335 #[test]
1336 fn oneshot_multi_task_recv_then_close() {
1337 let (tx, rx) = channel::<Box<int>>();
1338 let _t = thread::spawn(move|| {
1339 drop(tx);
1340 });
1341 let res = thread::spawn(move|| {
1342 assert!(rx.recv().unwrap() == box 10);
1343 }).join();
1344 assert!(res.is_err());
1345 }
1346
1347 #[test]
1348 fn oneshot_multi_thread_close_stress() {
1349 for _ in 0..stress_factor() {
1350 let (tx, rx) = channel::<int>();
1351 let _t = thread::spawn(move|| {
1352 drop(rx);
1353 });
1354 drop(tx);
1355 }
1356 }
1357
1358 #[test]
1359 fn oneshot_multi_thread_send_close_stress() {
1360 for _ in 0..stress_factor() {
1361 let (tx, rx) = channel::<int>();
1362 let _t = thread::spawn(move|| {
1363 drop(rx);
1364 });
1365 let _ = thread::spawn(move|| {
1366 tx.send(1).unwrap();
1367 }).join();
1368 }
1369 }
1370
1371 #[test]
1372 fn oneshot_multi_thread_recv_close_stress() {
1373 for _ in 0..stress_factor() {
1374 let (tx, rx) = channel::<int>();
1375 thread::spawn(move|| {
1376 let res = thread::spawn(move|| {
1377 rx.recv().unwrap();
1378 }).join();
1379 assert!(res.is_err());
1380 });
1381 let _t = thread::spawn(move|| {
1382 thread::spawn(move|| {
1383 drop(tx);
1384 });
1385 });
1386 }
1387 }
1388
1389 #[test]
1390 fn oneshot_multi_thread_send_recv_stress() {
1391 for _ in 0..stress_factor() {
1392 let (tx, rx) = channel();
1393 let _t = thread::spawn(move|| {
1394 tx.send(box 10).unwrap();
1395 });
1396 assert!(rx.recv().unwrap() == box 10);
1397 }
1398 }
1399
1400 #[test]
1401 fn stream_send_recv_stress() {
1402 for _ in 0..stress_factor() {
1403 let (tx, rx) = channel();
1404
1405 send(tx, 0);
1406 recv(rx, 0);
1407
1408 fn send(tx: Sender<Box<int>>, i: int) {
1409 if i == 10 { return }
1410
1411 thread::spawn(move|| {
1412 tx.send(box i).unwrap();
1413 send(tx, i + 1);
1414 });
1415 }
1416
1417 fn recv(rx: Receiver<Box<int>>, i: int) {
1418 if i == 10 { return }
1419
1420 thread::spawn(move|| {
1421 assert!(rx.recv().unwrap() == box i);
1422 recv(rx, i + 1);
1423 });
1424 }
1425 }
1426 }
1427
1428 #[test]
1429 fn recv_a_lot() {
1430 // Regression test that we don't run out of stack in scheduler context
1431 let (tx, rx) = channel();
1432 for _ in 0..10000 { tx.send(()).unwrap(); }
1433 for _ in 0..10000 { rx.recv().unwrap(); }
1434 }
1435
1436 #[test]
1437 fn shared_chan_stress() {
1438 let (tx, rx) = channel();
1439 let total = stress_factor() + 100;
1440 for _ in 0..total {
1441 let tx = tx.clone();
1442 thread::spawn(move|| {
1443 tx.send(()).unwrap();
1444 });
1445 }
1446
1447 for _ in 0..total {
1448 rx.recv().unwrap();
1449 }
1450 }
1451
1452 #[test]
1453 fn test_nested_recv_iter() {
1454 let (tx, rx) = channel::<int>();
1455 let (total_tx, total_rx) = channel::<int>();
1456
1457 let _t = thread::spawn(move|| {
1458 let mut acc = 0;
1459 for x in rx.iter() {
1460 acc += x;
1461 }
1462 total_tx.send(acc).unwrap();
1463 });
1464
1465 tx.send(3).unwrap();
1466 tx.send(1).unwrap();
1467 tx.send(2).unwrap();
1468 drop(tx);
1469 assert_eq!(total_rx.recv().unwrap(), 6);
1470 }
1471
1472 #[test]
1473 fn test_recv_iter_break() {
1474 let (tx, rx) = channel::<int>();
1475 let (count_tx, count_rx) = channel();
1476
1477 let _t = thread::spawn(move|| {
1478 let mut count = 0;
1479 for x in rx.iter() {
1480 if count >= 3 {
1481 break;
1482 } else {
1483 count += x;
1484 }
1485 }
1486 count_tx.send(count).unwrap();
1487 });
1488
1489 tx.send(2).unwrap();
1490 tx.send(2).unwrap();
1491 tx.send(2).unwrap();
1492 let _ = tx.send(2);
1493 drop(tx);
1494 assert_eq!(count_rx.recv().unwrap(), 4);
1495 }
1496
1497 #[test]
1498 fn try_recv_states() {
1499 let (tx1, rx1) = channel::<int>();
1500 let (tx2, rx2) = channel::<()>();
1501 let (tx3, rx3) = channel::<()>();
1502 let _t = thread::spawn(move|| {
1503 rx2.recv().unwrap();
1504 tx1.send(1).unwrap();
1505 tx3.send(()).unwrap();
1506 rx2.recv().unwrap();
1507 drop(tx1);
1508 tx3.send(()).unwrap();
1509 });
1510
1511 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1512 tx2.send(()).unwrap();
1513 rx3.recv().unwrap();
1514 assert_eq!(rx1.try_recv(), Ok(1));
1515 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1516 tx2.send(()).unwrap();
1517 rx3.recv().unwrap();
1518 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1519 }
1520
1521 // This bug used to end up in a livelock inside of the Receiver destructor
1522 // because the internal state of the Shared packet was corrupted
1523 #[test]
1524 fn destroy_upgraded_shared_port_when_sender_still_active() {
1525 let (tx, rx) = channel();
1526 let (tx2, rx2) = channel();
1527 let _t = thread::spawn(move|| {
1528 rx.recv().unwrap(); // wait on a oneshot
1529 drop(rx); // destroy a shared
1530 tx2.send(()).unwrap();
1531 });
1532 // make sure the other task has gone to sleep
1533 for _ in 0..5000 { thread::yield_now(); }
1534
1535 // upgrade to a shared chan and send a message
1536 let t = tx.clone();
1537 drop(tx);
1538 t.send(()).unwrap();
1539
1540 // wait for the child task to exit before we exit
1541 rx2.recv().unwrap();
1542 }
1543 }
1544
1545 #[cfg(test)]
1546 mod sync_tests {
1547 use prelude::v1::*;
1548
1549 use std::env;
1550 use thread;
1551 use super::*;
1552
1553 pub fn stress_factor() -> uint {
1554 match env::var("RUST_TEST_STRESS") {
1555 Ok(val) => val.parse().unwrap(),
1556 Err(..) => 1,
1557 }
1558 }
1559
1560 #[test]
1561 fn smoke() {
1562 let (tx, rx) = sync_channel::<int>(1);
1563 tx.send(1).unwrap();
1564 assert_eq!(rx.recv().unwrap(), 1);
1565 }
1566
1567 #[test]
1568 fn drop_full() {
1569 let (tx, _rx) = sync_channel(1);
1570 tx.send(box 1).unwrap();
1571 }
1572
1573 #[test]
1574 fn smoke_shared() {
1575 let (tx, rx) = sync_channel::<int>(1);
1576 tx.send(1).unwrap();
1577 assert_eq!(rx.recv().unwrap(), 1);
1578 let tx = tx.clone();
1579 tx.send(1).unwrap();
1580 assert_eq!(rx.recv().unwrap(), 1);
1581 }
1582
1583 #[test]
1584 fn smoke_threads() {
1585 let (tx, rx) = sync_channel::<int>(0);
1586 let _t = thread::spawn(move|| {
1587 tx.send(1).unwrap();
1588 });
1589 assert_eq!(rx.recv().unwrap(), 1);
1590 }
1591
1592 #[test]
1593 fn smoke_port_gone() {
1594 let (tx, rx) = sync_channel::<int>(0);
1595 drop(rx);
1596 assert!(tx.send(1).is_err());
1597 }
1598
1599 #[test]
1600 fn smoke_shared_port_gone2() {
1601 let (tx, rx) = sync_channel::<int>(0);
1602 drop(rx);
1603 let tx2 = tx.clone();
1604 drop(tx);
1605 assert!(tx2.send(1).is_err());
1606 }
1607
1608 #[test]
1609 fn port_gone_concurrent() {
1610 let (tx, rx) = sync_channel::<int>(0);
1611 let _t = thread::spawn(move|| {
1612 rx.recv().unwrap();
1613 });
1614 while tx.send(1).is_ok() {}
1615 }
1616
1617 #[test]
1618 fn port_gone_concurrent_shared() {
1619 let (tx, rx) = sync_channel::<int>(0);
1620 let tx2 = tx.clone();
1621 let _t = thread::spawn(move|| {
1622 rx.recv().unwrap();
1623 });
1624 while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1625 }
1626
1627 #[test]
1628 fn smoke_chan_gone() {
1629 let (tx, rx) = sync_channel::<int>(0);
1630 drop(tx);
1631 assert!(rx.recv().is_err());
1632 }
1633
1634 #[test]
1635 fn smoke_chan_gone_shared() {
1636 let (tx, rx) = sync_channel::<()>(0);
1637 let tx2 = tx.clone();
1638 drop(tx);
1639 drop(tx2);
1640 assert!(rx.recv().is_err());
1641 }
1642
1643 #[test]
1644 fn chan_gone_concurrent() {
1645 let (tx, rx) = sync_channel::<int>(0);
1646 thread::spawn(move|| {
1647 tx.send(1).unwrap();
1648 tx.send(1).unwrap();
1649 });
1650 while rx.recv().is_ok() {}
1651 }
1652
1653 #[test]
1654 fn stress() {
1655 let (tx, rx) = sync_channel::<int>(0);
1656 thread::spawn(move|| {
1657 for _ in 0..10000 { tx.send(1).unwrap(); }
1658 });
1659 for _ in 0..10000 {
1660 assert_eq!(rx.recv().unwrap(), 1);
1661 }
1662 }
1663
1664 #[test]
1665 fn stress_shared() {
1666 static AMT: uint = 1000;
1667 static NTHREADS: uint = 8;
1668 let (tx, rx) = sync_channel::<int>(0);
1669 let (dtx, drx) = sync_channel::<()>(0);
1670
1671 thread::spawn(move|| {
1672 for _ in 0..AMT * NTHREADS {
1673 assert_eq!(rx.recv().unwrap(), 1);
1674 }
1675 match rx.try_recv() {
1676 Ok(..) => panic!(),
1677 _ => {}
1678 }
1679 dtx.send(()).unwrap();
1680 });
1681
1682 for _ in 0..NTHREADS {
1683 let tx = tx.clone();
1684 thread::spawn(move|| {
1685 for _ in 0..AMT { tx.send(1).unwrap(); }
1686 });
1687 }
1688 drop(tx);
1689 drx.recv().unwrap();
1690 }
1691
1692 #[test]
1693 fn oneshot_single_thread_close_port_first() {
1694 // Simple test of closing without sending
1695 let (_tx, rx) = sync_channel::<int>(0);
1696 drop(rx);
1697 }
1698
1699 #[test]
1700 fn oneshot_single_thread_close_chan_first() {
1701 // Simple test of closing without sending
1702 let (tx, _rx) = sync_channel::<int>(0);
1703 drop(tx);
1704 }
1705
1706 #[test]
1707 fn oneshot_single_thread_send_port_close() {
1708 // Testing that the sender cleans up the payload if receiver is closed
1709 let (tx, rx) = sync_channel::<Box<int>>(0);
1710 drop(rx);
1711 assert!(tx.send(box 0).is_err());
1712 }
1713
1714 #[test]
1715 fn oneshot_single_thread_recv_chan_close() {
1716 // Receiving on a closed chan will panic
1717 let res = thread::spawn(move|| {
1718 let (tx, rx) = sync_channel::<int>(0);
1719 drop(tx);
1720 rx.recv().unwrap();
1721 }).join();
1722 // What is our res?
1723 assert!(res.is_err());
1724 }
1725
1726 #[test]
1727 fn oneshot_single_thread_send_then_recv() {
1728 let (tx, rx) = sync_channel::<Box<int>>(1);
1729 tx.send(box 10).unwrap();
1730 assert!(rx.recv().unwrap() == box 10);
1731 }
1732
1733 #[test]
1734 fn oneshot_single_thread_try_send_open() {
1735 let (tx, rx) = sync_channel::<int>(1);
1736 assert_eq!(tx.try_send(10), Ok(()));
1737 assert!(rx.recv().unwrap() == 10);
1738 }
1739
1740 #[test]
1741 fn oneshot_single_thread_try_send_closed() {
1742 let (tx, rx) = sync_channel::<int>(0);
1743 drop(rx);
1744 assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1745 }
1746
1747 #[test]
1748 fn oneshot_single_thread_try_send_closed2() {
1749 let (tx, _rx) = sync_channel::<int>(0);
1750 assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1751 }
1752
1753 #[test]
1754 fn oneshot_single_thread_try_recv_open() {
1755 let (tx, rx) = sync_channel::<int>(1);
1756 tx.send(10).unwrap();
1757 assert!(rx.recv() == Ok(10));
1758 }
1759
1760 #[test]
1761 fn oneshot_single_thread_try_recv_closed() {
1762 let (tx, rx) = sync_channel::<int>(0);
1763 drop(tx);
1764 assert!(rx.recv().is_err());
1765 }
1766
1767 #[test]
1768 fn oneshot_single_thread_peek_data() {
1769 let (tx, rx) = sync_channel::<int>(1);
1770 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1771 tx.send(10).unwrap();
1772 assert_eq!(rx.try_recv(), Ok(10));
1773 }
1774
1775 #[test]
1776 fn oneshot_single_thread_peek_close() {
1777 let (tx, rx) = sync_channel::<int>(0);
1778 drop(tx);
1779 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1780 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1781 }
1782
1783 #[test]
1784 fn oneshot_single_thread_peek_open() {
1785 let (_tx, rx) = sync_channel::<int>(0);
1786 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1787 }
1788
1789 #[test]
1790 fn oneshot_multi_task_recv_then_send() {
1791 let (tx, rx) = sync_channel::<Box<int>>(0);
1792 let _t = thread::spawn(move|| {
1793 assert!(rx.recv().unwrap() == box 10);
1794 });
1795
1796 tx.send(box 10).unwrap();
1797 }
1798
1799 #[test]
1800 fn oneshot_multi_task_recv_then_close() {
1801 let (tx, rx) = sync_channel::<Box<int>>(0);
1802 let _t = thread::spawn(move|| {
1803 drop(tx);
1804 });
1805 let res = thread::spawn(move|| {
1806 assert!(rx.recv().unwrap() == box 10);
1807 }).join();
1808 assert!(res.is_err());
1809 }
1810
1811 #[test]
1812 fn oneshot_multi_thread_close_stress() {
1813 for _ in 0..stress_factor() {
1814 let (tx, rx) = sync_channel::<int>(0);
1815 let _t = thread::spawn(move|| {
1816 drop(rx);
1817 });
1818 drop(tx);
1819 }
1820 }
1821
1822 #[test]
1823 fn oneshot_multi_thread_send_close_stress() {
1824 for _ in 0..stress_factor() {
1825 let (tx, rx) = sync_channel::<int>(0);
1826 let _t = thread::spawn(move|| {
1827 drop(rx);
1828 });
1829 let _ = thread::spawn(move || {
1830 tx.send(1).unwrap();
1831 }).join();
1832 }
1833 }
1834
1835 #[test]
1836 fn oneshot_multi_thread_recv_close_stress() {
1837 for _ in 0..stress_factor() {
1838 let (tx, rx) = sync_channel::<int>(0);
1839 let _t = thread::spawn(move|| {
1840 let res = thread::spawn(move|| {
1841 rx.recv().unwrap();
1842 }).join();
1843 assert!(res.is_err());
1844 });
1845 let _t = thread::spawn(move|| {
1846 thread::spawn(move|| {
1847 drop(tx);
1848 });
1849 });
1850 }
1851 }
1852
1853 #[test]
1854 fn oneshot_multi_thread_send_recv_stress() {
1855 for _ in 0..stress_factor() {
1856 let (tx, rx) = sync_channel::<Box<int>>(0);
1857 let _t = thread::spawn(move|| {
1858 tx.send(box 10).unwrap();
1859 });
1860 assert!(rx.recv().unwrap() == box 10);
1861 }
1862 }
1863
1864 #[test]
1865 fn stream_send_recv_stress() {
1866 for _ in 0..stress_factor() {
1867 let (tx, rx) = sync_channel::<Box<int>>(0);
1868
1869 send(tx, 0);
1870 recv(rx, 0);
1871
1872 fn send(tx: SyncSender<Box<int>>, i: int) {
1873 if i == 10 { return }
1874
1875 thread::spawn(move|| {
1876 tx.send(box i).unwrap();
1877 send(tx, i + 1);
1878 });
1879 }
1880
1881 fn recv(rx: Receiver<Box<int>>, i: int) {
1882 if i == 10 { return }
1883
1884 thread::spawn(move|| {
1885 assert!(rx.recv().unwrap() == box i);
1886 recv(rx, i + 1);
1887 });
1888 }
1889 }
1890 }
1891
1892 #[test]
1893 fn recv_a_lot() {
1894 // Regression test that we don't run out of stack in scheduler context
1895 let (tx, rx) = sync_channel(10000);
1896 for _ in 0..10000 { tx.send(()).unwrap(); }
1897 for _ in 0..10000 { rx.recv().unwrap(); }
1898 }
1899
1900 #[test]
1901 fn shared_chan_stress() {
1902 let (tx, rx) = sync_channel(0);
1903 let total = stress_factor() + 100;
1904 for _ in 0..total {
1905 let tx = tx.clone();
1906 thread::spawn(move|| {
1907 tx.send(()).unwrap();
1908 });
1909 }
1910
1911 for _ in 0..total {
1912 rx.recv().unwrap();
1913 }
1914 }
1915
1916 #[test]
1917 fn test_nested_recv_iter() {
1918 let (tx, rx) = sync_channel::<int>(0);
1919 let (total_tx, total_rx) = sync_channel::<int>(0);
1920
1921 let _t = thread::spawn(move|| {
1922 let mut acc = 0;
1923 for x in rx.iter() {
1924 acc += x;
1925 }
1926 total_tx.send(acc).unwrap();
1927 });
1928
1929 tx.send(3).unwrap();
1930 tx.send(1).unwrap();
1931 tx.send(2).unwrap();
1932 drop(tx);
1933 assert_eq!(total_rx.recv().unwrap(), 6);
1934 }
1935
1936 #[test]
1937 fn test_recv_iter_break() {
1938 let (tx, rx) = sync_channel::<int>(0);
1939 let (count_tx, count_rx) = sync_channel(0);
1940
1941 let _t = thread::spawn(move|| {
1942 let mut count = 0;
1943 for x in rx.iter() {
1944 if count >= 3 {
1945 break;
1946 } else {
1947 count += x;
1948 }
1949 }
1950 count_tx.send(count).unwrap();
1951 });
1952
1953 tx.send(2).unwrap();
1954 tx.send(2).unwrap();
1955 tx.send(2).unwrap();
1956 let _ = tx.try_send(2);
1957 drop(tx);
1958 assert_eq!(count_rx.recv().unwrap(), 4);
1959 }
1960
1961 #[test]
1962 fn try_recv_states() {
1963 let (tx1, rx1) = sync_channel::<int>(1);
1964 let (tx2, rx2) = sync_channel::<()>(1);
1965 let (tx3, rx3) = sync_channel::<()>(1);
1966 let _t = thread::spawn(move|| {
1967 rx2.recv().unwrap();
1968 tx1.send(1).unwrap();
1969 tx3.send(()).unwrap();
1970 rx2.recv().unwrap();
1971 drop(tx1);
1972 tx3.send(()).unwrap();
1973 });
1974
1975 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1976 tx2.send(()).unwrap();
1977 rx3.recv().unwrap();
1978 assert_eq!(rx1.try_recv(), Ok(1));
1979 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1980 tx2.send(()).unwrap();
1981 rx3.recv().unwrap();
1982 assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1983 }
1984
1985 // This bug used to end up in a livelock inside of the Receiver destructor
1986 // because the internal state of the Shared packet was corrupted
1987 #[test]
1988 fn destroy_upgraded_shared_port_when_sender_still_active() {
1989 let (tx, rx) = sync_channel::<()>(0);
1990 let (tx2, rx2) = sync_channel::<()>(0);
1991 let _t = thread::spawn(move|| {
1992 rx.recv().unwrap(); // wait on a oneshot
1993 drop(rx); // destroy a shared
1994 tx2.send(()).unwrap();
1995 });
1996 // make sure the other task has gone to sleep
1997 for _ in 0..5000 { thread::yield_now(); }
1998
1999 // upgrade to a shared chan and send a message
2000 let t = tx.clone();
2001 drop(tx);
2002 t.send(()).unwrap();
2003
2004 // wait for the child task to exit before we exit
2005 rx2.recv().unwrap();
2006 }
2007
2008 #[test]
2009 fn send1() {
2010 let (tx, rx) = sync_channel::<int>(0);
2011 let _t = thread::spawn(move|| { rx.recv().unwrap(); });
2012 assert_eq!(tx.send(1), Ok(()));
2013 }
2014
2015 #[test]
2016 fn send2() {
2017 let (tx, rx) = sync_channel::<int>(0);
2018 let _t = thread::spawn(move|| { drop(rx); });
2019 assert!(tx.send(1).is_err());
2020 }
2021
2022 #[test]
2023 fn send3() {
2024 let (tx, rx) = sync_channel::<int>(1);
2025 assert_eq!(tx.send(1), Ok(()));
let _t = thread::spawn(move|| { drop(rx); });
2027 assert!(tx.send(1).is_err());
2028 }
2029
2030 #[test]
2031 fn send4() {
2032 let (tx, rx) = sync_channel::<int>(0);
2033 let tx2 = tx.clone();
2034 let (done, donerx) = channel();
2035 let done2 = done.clone();
2036 let _t = thread::spawn(move|| {
2037 assert!(tx.send(1).is_err());
2038 done.send(()).unwrap();
2039 });
2040 let _t = thread::spawn(move|| {
2041 assert!(tx2.send(2).is_err());
2042 done2.send(()).unwrap();
2043 });
2044 drop(rx);
2045 donerx.recv().unwrap();
2046 donerx.recv().unwrap();
2047 }
2048
2049 #[test]
2050 fn try_send1() {
2051 let (tx, _rx) = sync_channel::<int>(0);
2052 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2053 }
2054
2055 #[test]
2056 fn try_send2() {
2057 let (tx, _rx) = sync_channel::<int>(1);
2058 assert_eq!(tx.try_send(1), Ok(()));
2059 assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
2060 }
2061
2062 #[test]
2063 fn try_send3() {
2064 let (tx, rx) = sync_channel::<int>(1);
2065 assert_eq!(tx.try_send(1), Ok(()));
2066 drop(rx);
2067 assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
2068 }
2069
2070 #[test]
2071 fn issue_15761() {
2072 fn repro() {
2073 let (tx1, rx1) = sync_channel::<()>(3);
2074 let (tx2, rx2) = sync_channel::<()>(3);
2075
2076 let _t = thread::spawn(move|| {
2077 rx1.recv().unwrap();
2078 tx2.try_send(()).unwrap();
2079 });
2080
2081 tx1.try_send(()).unwrap();
2082 rx2.recv().unwrap();
2083 }
2084
2085 for _ in 0..100 {
2086 repro()
2087 }
2088 }
2089 }