]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/client/pxar_backup_stream.rs
split out pbs-runtime module
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
index 4ec3bc3d74ee9ae0a20516d9ca4bb6779fc0a418..1d3fc228293163df1dd74a7455d2e73d05544cc8 100644 (file)
@@ -1,23 +1,22 @@
-use std::collections::HashSet;
 use std::io::Write;
-use std::os::unix::io::FromRawFd;
-use std::path::{Path, PathBuf};
+//use std::os::unix::io::FromRawFd;
+use std::path::Path;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
-use std::thread;
 
-use failure::*;
+use anyhow::{format_err, Error};
 use futures::stream::Stream;
-
+use futures::future::{Abortable, AbortHandle};
+use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
-use nix::dir::Dir;
 
-use crate::pxar;
 use crate::backup::CatalogWriter;
-
-use crate::tools::wrapped_reader_stream::WrappedReaderStream;
+use crate::tools::{
+    StdChannelWriter,
+    TokioWriterAdapter,
+};
 
 /// Stream implementation to encode and upload .pxar archives.
 ///
@@ -25,106 +24,104 @@ use crate::tools::wrapped_reader_stream::WrappedReaderStream;
 /// spawn an extra thread to encode the .pxar data and pipe it to the
 /// consumer.
 pub struct PxarBackupStream {
-    stream: Option<WrappedReaderStream<std::fs::File>>,
-    child: Option<thread::JoinHandle<()>>,
+    rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
+    handle: Option<AbortHandle>,
     error: Arc<Mutex<Option<String>>>,
 }
 
 impl Drop for PxarBackupStream {
-
     fn drop(&mut self) {
-        self.stream = None;
-        self.child.take().unwrap().join().unwrap();
+        self.rx = None;
+        self.handle.take().unwrap().abort();
     }
 }
 
 impl PxarBackupStream {
-    pin_utils::unsafe_pinned!(stream: Option<WrappedReaderStream<std::fs::File>>);
-
     pub fn new<W: Write + Send + 'static>(
-        mut dir: Dir,
-        path: PathBuf,
-        device_set: Option<HashSet<u64>>,
-        verbose: bool,
-        skip_lost_and_found: bool,
+        dir: Dir,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
-        entries_max: usize,
+        options: crate::pxar::PxarCreateOptions,
     ) -> Result<Self, Error> {
+        let (tx, rx) = std::sync::mpsc::sync_channel(10);
 
-        let (rx, tx) = nix::unistd::pipe()?;
-
-        let buffer_size = 1024*1024;
-        nix::fcntl::fcntl(rx, nix::fcntl::FcntlArg::F_SETPIPE_SZ(buffer_size as i32))?;
+        let buffer_size = 256 * 1024;
 
         let error = Arc::new(Mutex::new(None));
-        let error2 = error.clone();
-
-        let catalog = catalog.clone();
-        let exclude_pattern = Vec::new();
-        let child = thread::spawn(move || {
-            let mut guard = catalog.lock().unwrap();
-            let mut writer = unsafe { std::fs::File::from_raw_fd(tx) };
-            if let Err(err) = pxar::Encoder::encode(
-                path,
-                &mut dir,
-                &mut writer,
-                Some(&mut *guard),
-                device_set,
-                verbose,
-                skip_lost_and_found,
-                pxar::flags::DEFAULT,
-                exclude_pattern,
-                entries_max,
-            ) {
+        let error2 = Arc::clone(&error);
+        let handler = async move {
+            let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+                buffer_size,
+                StdChannelWriter::new(tx),
+            ));
+
+            let verbose = options.verbose;
+
+            let writer = pxar::encoder::sync::StandardWriter::new(writer);
+            if let Err(err) = crate::pxar::create_archive(
+                dir,
+                writer,
+                crate::pxar::Flags::DEFAULT,
+                move |path| {
+                    if verbose {
+                        println!("{:?}", path);
+                    }
+                    Ok(())
+                },
+                Some(catalog),
+                options,
+            ).await {
                 let mut error = error2.lock().unwrap();
                 *error = Some(err.to_string());
             }
-        });
+        };
 
-        let pipe = unsafe { std::fs::File::from_raw_fd(rx) };
-        let stream = crate::tools::wrapped_reader_stream::WrappedReaderStream::new(pipe);
+        let (handle, registration) = AbortHandle::new_pair();
+        let future = Abortable::new(handler, registration);
+        tokio::spawn(future);
 
         Ok(Self {
-            stream: Some(stream),
-            child: Some(child),
+            rx: Some(rx),
+            handle: Some(handle),
             error,
         })
     }
 
     pub fn open<W: Write + Send + 'static>(
         dirname: &Path,
-        device_set: Option<HashSet<u64>>,
-        verbose: bool,
-        skip_lost_and_found: bool,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
-        entries_max: usize,
+        options: crate::pxar::PxarCreateOptions,
     ) -> Result<Self, Error> {
-
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
-        let path = std::path::PathBuf::from(dirname);
 
-        Self::new(dir, path, device_set, verbose, skip_lost_and_found, catalog, entries_max)
+        Self::new(
+            dir,
+            catalog,
+            options,
+        )
     }
 }
 
 impl Stream for PxarBackupStream {
-
     type Item = Result<Vec<u8>, Error>;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        { // limit lock scope
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        {
+            // limit lock scope
             let error = self.error.lock().unwrap();
             if let Some(ref msg) = *error {
                 return Poll::Ready(Some(Err(format_err!("{}", msg))));
             }
         }
-        let res = self.as_mut()
-            .stream()
-            .as_pin_mut()
-            .unwrap()
-            .poll_next(cx);
-        Poll::Ready(futures::ready!(res)
-            .map(|v| v.map_err(Error::from))
-        )
+
+        match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
+            Ok(data) => Poll::Ready(Some(data)),
+            Err(_) => {
+                let error = self.error.lock().unwrap();
+                if let Some(ref msg) = *error {
+                    return Poll::Ready(Some(Err(format_err!("{}", msg))));
+                }
+                Poll::Ready(None) // channel closed, no error
+            }
+        }
     }
 }