]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/client/pxar_backup_stream.rs
split out pbs-runtime module
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
index 8258229f85a9bf91329e5fd34438a2c7b1021902..1d3fc228293163df1dd74a7455d2e73d05544cc8 100644 (file)
@@ -1,18 +1,22 @@
-use failure::*;
-
-use std::thread;
-use std::os::unix::io::FromRawFd;
-use std::path::{Path, PathBuf};
-
-use futures::Poll;
+use std::io::Write;
+//use std::os::unix::io::FromRawFd;
+use std::path::Path;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{format_err, Error};
 use futures::stream::Stream;
-
+use futures::future::{Abortable, AbortHandle};
+use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
-use nix::dir::Dir;
 
-use crate::pxar;
-use crate::tools::wrapped_reader_stream::WrappedReaderStream;
+use crate::backup::CatalogWriter;
+use crate::tools::{
+    StdChannelWriter,
+    TokioWriterAdapter,
+};
 
 /// Stream implementation to encode and upload .pxar archives.
 ///
@@ -20,55 +24,104 @@ use crate::tools::wrapped_reader_stream::WrappedReaderStream;
 /// spawn an extra thread to encode the .pxar data and pipe it to the
 /// consumer.
 pub struct PxarBackupStream {
-    stream: Option<WrappedReaderStream<std::fs::File>>,
-    child: Option<thread::JoinHandle<()>>,
+    rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
+    handle: Option<AbortHandle>,
+    error: Arc<Mutex<Option<String>>>,
 }
 
 impl Drop for PxarBackupStream {
-
     fn drop(&mut self) {
-        self.stream = None;
-        self.child.take().unwrap().join().unwrap();
+        self.rx = None;
+        self.handle.take().unwrap().abort();
     }
 }
 
 impl PxarBackupStream {
-
-    pub fn new(mut dir: Dir, path: PathBuf, all_file_systems: bool, verbose: bool) -> Result<Self, Error> {
-
-        let (rx, tx) = nix::unistd::pipe()?;
-
-        let buffer_size = 1024*1024;
-        nix::fcntl::fcntl(rx, nix::fcntl::FcntlArg::F_SETPIPE_SZ(buffer_size as i32))?;
-
-        let child = thread::spawn(move|| {
-            let mut writer = unsafe { std::fs::File::from_raw_fd(tx) };
-            if let Err(err) = pxar::Encoder::encode(path, &mut dir, &mut writer, all_file_systems, verbose) {
-                eprintln!("pxar encode failed - {}", err);
+    pub fn new<W: Write + Send + 'static>(
+        dir: Dir,
+        catalog: Arc<Mutex<CatalogWriter<W>>>,
+        options: crate::pxar::PxarCreateOptions,
+    ) -> Result<Self, Error> {
+        let (tx, rx) = std::sync::mpsc::sync_channel(10);
+
+        let buffer_size = 256 * 1024;
+
+        let error = Arc::new(Mutex::new(None));
+        let error2 = Arc::clone(&error);
+        let handler = async move {
+            let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+                buffer_size,
+                StdChannelWriter::new(tx),
+            ));
+
+            let verbose = options.verbose;
+
+            let writer = pxar::encoder::sync::StandardWriter::new(writer);
+            if let Err(err) = crate::pxar::create_archive(
+                dir,
+                writer,
+                crate::pxar::Flags::DEFAULT,
+                move |path| {
+                    if verbose {
+                        println!("{:?}", path);
+                    }
+                    Ok(())
+                },
+                Some(catalog),
+                options,
+            ).await {
+                let mut error = error2.lock().unwrap();
+                *error = Some(err.to_string());
             }
-        });
+        };
 
-        let pipe = unsafe { std::fs::File::from_raw_fd(rx) };
-        let stream = crate::tools::wrapped_reader_stream::WrappedReaderStream::new(pipe);
+        let (handle, registration) = AbortHandle::new_pair();
+        let future = Abortable::new(handler, registration);
+        tokio::spawn(future);
 
-        Ok(Self { stream: Some(stream), child: Some(child) })
+        Ok(Self {
+            rx: Some(rx),
+            handle: Some(handle),
+            error,
+        })
     }
 
-    pub fn open(dirname: &Path,  all_file_systems: bool, verbose: bool) -> Result<Self, Error> {
-
+    pub fn open<W: Write + Send + 'static>(
+        dirname: &Path,
+        catalog: Arc<Mutex<CatalogWriter<W>>>,
+        options: crate::pxar::PxarCreateOptions,
+    ) -> Result<Self, Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
-        let path = std::path::PathBuf::from(dirname);
 
-        Self::new(dir, path, all_file_systems, verbose)
+        Self::new(
+            dir,
+            catalog,
+            options,
+        )
     }
 }
 
 impl Stream for PxarBackupStream {
-
-    type Item = Vec<u8>;
-    type Error = Error;
-
-    fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> {
-        self.stream.as_mut().unwrap().poll().map_err(Error::from)
+    type Item = Result<Vec<u8>, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        {
+            // limit lock scope
+            let error = self.error.lock().unwrap();
+            if let Some(ref msg) = *error {
+                return Poll::Ready(Some(Err(format_err!("{}", msg))));
+            }
+        }
+
+        match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
+            Ok(data) => Poll::Ready(Some(data)),
+            Err(_) => {
+                let error = self.error.lock().unwrap();
+                if let Some(ref msg) = *error {
+                    return Poll::Ready(Some(Err(format_err!("{}", msg))));
+                }
+                Poll::Ready(None) // channel closed, no error
+            }
+        }
     }
 }