]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/tools/wrapped_reader_stream.rs
src/bin/proxmox-backup-client.rs: use a std channel to write the catalog
[proxmox-backup.git] / src / tools / wrapped_reader_stream.rs
index 77611612e395dbf7311e29bfe39e52eca2f47d1f..267a0badf9aae4db90cc9128d1a9a1ad3cae0b45 100644 (file)
@@ -1,15 +1,20 @@
-use failure::*;
-use tokio_threadpool;
-use std::io::Read;
-use futures::Async;
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::sync::mpsc::Receiver;
+
+
 use futures::stream::Stream;
 
-pub struct WrappedReaderStream<R: Read> {
+use crate::tools::runtime::block_in_place;
+
+/// Wrapper struct to convert a Reader into a Stream
+pub struct WrappedReaderStream<R: Read + Unpin> {
     reader: R,
     buffer: Vec<u8>,
 }
 
-impl <R: Read> WrappedReaderStream<R> {
+impl <R: Read + Unpin> WrappedReaderStream<R> {
 
     pub fn new(reader: R) -> Self {
         let mut buffer = Vec::with_capacity(64*1024);
@@ -18,29 +23,77 @@ impl <R: Read> WrappedReaderStream<R> {
     }
 }
 
-fn blocking_err() -> std::io::Error {
-    std::io::Error::new(
-        std::io::ErrorKind::Other,
-        "`blocking` annotated I/O must be called from the context of the Tokio runtime.")
+impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
+    type Item = Result<Vec<u8>, io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        match block_in_place(|| this.reader.read(&mut this.buffer)) {
+            Ok(n) => {
+                if n == 0 {
+                    // EOF
+                    Poll::Ready(None)
+                } else {
+                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
+                }
+            }
+            Err(err) => Poll::Ready(Some(Err(err))),
+        }
+    }
 }
 
-impl <R: Read> Stream for WrappedReaderStream<R> {
 
-    type Item = Vec<u8>;
-    type Error = std::io::Error;
+/// Wrapper struct to convert a channel Receiver into a Stream
+pub struct StdChannelStream<T>(pub Receiver<T>);
 
-    fn poll(&mut self) -> Result<Async<Option<Vec<u8>>>, std::io::Error> {
-        match tokio_threadpool::blocking(|| self.reader.read(&mut self.buffer)) {
-            Ok(Async::Ready(Ok(n))) => {
-                 if n == 0 { // EOF
-                    Ok(Async::Ready(None))
-                } else {
-                    Ok(Async::Ready(Some(self.buffer[..n].to_vec())))
-                }
-            },
-            Ok(Async::Ready(Err(err))) => Err(err),
-            Ok(Async::NotReady) => Ok(Async::NotReady),
-            Err(err) => Err(blocking_err()),
+impl<T> Stream for StdChannelStream<T> {
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        match block_in_place(|| self.0.recv()) {
+            Ok(data) => Poll::Ready(Some(data)),
+            Err(_) => Poll::Ready(None),// channel closed
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::io;
+
+    use failure::Error;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_wrapped_stream_reader() -> Result<(), Error> {
+        crate::tools::runtime::main(async {
+            run_wrapped_stream_reader_test().await
+        })
+    }
+
+    struct DummyReader(usize);
+
+    impl io::Read for DummyReader {
+        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+            self.0 += 1;
+
+            if self.0 >= 10 {
+                return Ok(0);
+            }
+
+            unsafe {
+                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
+            }
+
+            Ok(buf.len())
+        }
+    }
+
+    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
+        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
+        while let Some(_data) = reader.try_next().await? {
+            // just waiting
         }
+        Ok(())
     }
 }