]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/client/pxar_backup_stream.rs: switch to async
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Fri, 23 Aug 2019 12:36:51 +0000 (14:36 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 2 Sep 2019 13:21:26 +0000 (15:21 +0200)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
src/client/pxar_backup_stream.rs

index 0e837991f6b2d35fecafb3d4257f57206f19d127..884407b5ab417e03b2e7a5f2806133a1b6df3a54 100644 (file)
@@ -1,12 +1,13 @@
-use failure::*;
-use std::io::{Write, Seek};
-use std::thread;
-use std::sync::{Arc, Mutex};
+use std::collections::HashSet;
+use std::io::{Seek, Write};
 use std::os::unix::io::FromRawFd;
 use std::path::{Path, PathBuf};
-use std::collections::HashSet;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+use std::thread;
 
-use futures::Poll;
+use failure::*;
 use futures::stream::Stream;
 
 use nix::fcntl::OFlag;
@@ -38,6 +39,7 @@ impl Drop for PxarBackupStream {
 }
 
 impl PxarBackupStream {
+    pin_utils::unsafe_pinned!(stream: Option<WrappedReaderStream<std::fs::File>>);
 
     pub fn new<W: Write + Seek + Send + 'static>(
         mut dir: Dir,
@@ -93,16 +95,22 @@ impl PxarBackupStream {
 
 impl Stream for PxarBackupStream {
 
-    type Item = Vec<u8>;
-    type Error = Error;
+    type Item = Result<Vec<u8>, Error>;
 
-    fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         { // limit lock scope
             let error = self.error.lock().unwrap();
             if let Some(ref msg) = *error {
-                return Err(format_err!("{}", msg));
+                return Poll::Ready(Some(Err(format_err!("{}", msg))));
             }
         }
-        self.stream.as_mut().unwrap().poll().map_err(Error::from)
+        let res = self.as_mut()
+            .stream()
+            .as_pin_mut()
+            .unwrap()
+            .poll_next(cx);
+        Poll::Ready(futures::ready!(res)
+            .map(|v| v.map_err(Error::from))
+        )
     }
 }