]> git.proxmox.com Git - rustc.git/blobdiff - src/libstd/sync/mpsc/mod.rs
New upstream version 1.12.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / mod.rs
index 34bc210b3c8239eb3bfef6261653a5a35a610a82..d8b8c6a77a26629510b33d4f5b58f5e772bc3e3b 100644 (file)
@@ -311,6 +311,17 @@ pub struct Iter<'a, T: 'a> {
     rx: &'a Receiver<T>
 }
 
+/// An iterator that attempts to yield all pending values for a receiver.
+/// `None` will be returned when there are no pending values remaining or
+/// if the corresponding channel has hung up.
+///
+/// This Iterator will never block the caller in order to wait for data to
+/// become available. Instead, it will return `None`.
+#[unstable(feature = "receiver_try_iter", issue = "34931")]
+pub struct TryIter<'a, T: 'a> {
+    rx: &'a Receiver<T>
+}
+
 /// An owning iterator over messages on a receiver, this iterator will block
 /// whenever `next` is called, waiting for a new message, and `None` will be
 /// returned when the corresponding channel has hung up.
@@ -383,13 +394,15 @@ pub enum TryRecvError {
 /// This enumeration is the list of possible errors that `recv_timeout` could
 /// not return data when called.
 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
-#[unstable(feature = "mpsc_recv_timeout", issue = "34029")]
+#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
 pub enum RecvTimeoutError {
     /// This channel is currently empty, but the sender(s) have not yet
     /// disconnected, so data may yet become available.
+    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
     Timeout,
     /// This channel's sending half has become disconnected, and there will
     /// never be any more data received on this channel
+    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
     Disconnected,
 }
 
@@ -901,8 +914,6 @@ impl<T> Receiver<T> {
     /// # Examples
     ///
     /// ```no_run
-    /// #![feature(mpsc_recv_timeout)]
-    ///
     /// use std::sync::mpsc::{self, RecvTimeoutError};
     /// use std::time::Duration;
     ///
@@ -911,7 +922,7 @@ impl<T> Receiver<T> {
     /// let timeout = Duration::from_millis(100);
     /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
     /// ```
-    #[unstable(feature = "mpsc_recv_timeout", issue = "34029")]
+    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
         // Do an optimistic try_recv to avoid the performance impact of
         // Instant::now() in the full-channel case.
@@ -982,6 +993,16 @@ impl<T> Receiver<T> {
     pub fn iter(&self) -> Iter<T> {
         Iter { rx: self }
     }
+
+    /// Returns an iterator that will attempt to yield all pending values.
+    /// It will return `None` if there are no more pending values or if the
+    /// channel has hung up. The iterator will never `panic!` or block the
+    /// user by waiting for values.
+    #[unstable(feature = "receiver_try_iter", issue = "34931")]
+    pub fn try_iter(&self) -> TryIter<T> {
+        TryIter { rx: self }
+    }
+
 }
 
 impl<T> select::Packet for Receiver<T> {
@@ -1077,6 +1098,13 @@ impl<'a, T> Iterator for Iter<'a, T> {
     fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
 }
 
+#[unstable(feature = "receiver_try_iter", issue = "34931")]
+impl<'a, T> Iterator for TryIter<'a, T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
+}
+
 #[stable(feature = "receiver_into_iter", since = "1.1.0")]
 impl<'a, T> IntoIterator for &'a Receiver<T> {
     type Item = T;
@@ -1814,6 +1842,34 @@ mod tests {
         assert_eq!(count_rx.recv().unwrap(), 4);
     }
 
+    #[test]
+    fn test_recv_try_iter() {
+        let (request_tx, request_rx) = channel();
+        let (response_tx, response_rx) = channel();
+
+        // Request `x`s until we have `6`.
+        let t = thread::spawn(move|| {
+            let mut count = 0;
+            loop {
+                for x in response_rx.try_iter() {
+                    count += x;
+                    if count == 6 {
+                        return count;
+                    }
+                }
+                request_tx.send(()).unwrap();
+            }
+        });
+
+        for _ in request_rx.iter() {
+            if response_tx.send(2).is_err() {
+                break;
+            }
+        }
+
+        assert_eq!(t.join().unwrap(), 6);
+    }
+
     #[test]
     fn test_recv_into_iter_owned() {
         let mut iter = {
@@ -2180,6 +2236,15 @@ mod sync_tests {
         assert!(rx.recv().is_err());
     }
 
+    #[test]
+    fn oneshot_single_thread_try_recv_closed_with_data() {
+        let (tx, rx) = sync_channel::<i32>(1);
+        tx.send(10).unwrap();
+        drop(tx);
+        assert_eq!(rx.try_recv(), Ok(10));
+        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+    }
+
     #[test]
     fn oneshot_single_thread_peek_data() {
         let (tx, rx) = sync_channel::<i32>(1);