]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/tools/wrapped_reader_stream.rs
src/bin/proxmox-backup-client.rs: use a std channel to write the catalog
[proxmox-backup.git] / src / tools / wrapped_reader_stream.rs
index 100169a6d110f77474d91cb37a6fc4279e20e8a3..267a0badf9aae4db90cc9128d1a9a1ad3cae0b45 100644 (file)
@@ -1,10 +1,14 @@
 use std::io::{self, Read};
 use std::pin::Pin;
 use std::task::{Context, Poll};
+use std::sync::mpsc::Receiver;
+
 
-use tokio_executor::threadpool::blocking;
 use futures::stream::Stream;
 
+use crate::tools::runtime::block_in_place;
+
+/// Wrapper struct to convert a Reader into a Stream
 pub struct WrappedReaderStream<R: Read + Unpin> {
     reader: R,
     buffer: Vec<u8>,
@@ -24,8 +28,8 @@ impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
 
     fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
-        match blocking(|| this.reader.read(&mut this.buffer)) {
-            Poll::Ready(Ok(Ok(n))) => {
+        match block_in_place(|| this.reader.read(&mut this.buffer)) {
+            Ok(n) => {
                 if n == 0 {
                     // EOF
                     Poll::Ready(None)
@@ -33,12 +37,22 @@ impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
                     Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
                 }
             }
-            Poll::Ready(Ok(Err(err))) => Poll::Ready(Some(Err(err))),
-            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(io::Error::new(
-                io::ErrorKind::Other,
-                err.to_string(),
-            )))),
-            Poll::Pending => Poll::Pending,
+            Err(err) => Poll::Ready(Some(Err(err))),
+        }
+    }
+}
+
+
+/// Wrapper struct to convert a channel Receiver into a Stream
+pub struct StdChannelStream<T>(pub Receiver<T>);
+
+impl<T> Stream for StdChannelStream<T> {
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        match block_in_place(|| self.0.recv()) {
+            Ok(data) => Poll::Ready(Some(data)),
+            Err(_) => Poll::Ready(None),// channel closed
         }
     }
 }
@@ -52,7 +66,6 @@ mod test {
 
     #[test]
     fn test_wrapped_stream_reader() -> Result<(), Error> {
-        // This cannot be used currently, because it doesn't permit blocking() annotations:
         crate::tools::runtime::main(async {
             run_wrapped_stream_reader_test().await
         })