]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/client/pxar_backup_stream.rs
reader: acquire shared flock on open snapshot
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
index 7c739aa094a7186d8ab2353a9e528d633ef52690..aa3355feb48c2cf315c087dd3d2728b66a6d1622 100644 (file)
@@ -1,20 +1,21 @@
-use failure::*;
-
-use std::thread;
-use std::sync::{Arc, Mutex};
-use std::os::unix::io::FromRawFd;
-use std::path::{Path, PathBuf};
 use std::collections::HashSet;
+use std::io::Write;
+//use std::os::unix::io::FromRawFd;
+use std::path::Path;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+use std::thread;
 
-use futures::Poll;
+use anyhow::{format_err, Error};
 use futures::stream::Stream;
-
+use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
-use nix::dir::Dir;
 
-use crate::pxar;
-use crate::tools::wrapped_reader_stream::WrappedReaderStream;
+use pathpatterns::MatchEntry;
+
+use crate::backup::CatalogWriter;
 
 /// Stream implementation to encode and upload .pxar archives.
 ///
@@ -22,68 +23,118 @@ use crate::tools::wrapped_reader_stream::WrappedReaderStream;
 /// spawn an extra thread to encode the .pxar data and pipe it to the
 /// consumer.
 pub struct PxarBackupStream {
-    stream: Option<WrappedReaderStream<std::fs::File>>,
+    rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
     child: Option<thread::JoinHandle<()>>,
     error: Arc<Mutex<Option<String>>>,
 }
 
 impl Drop for PxarBackupStream {
-
     fn drop(&mut self) {
-        self.stream = None;
+        self.rx = None;
         self.child.take().unwrap().join().unwrap();
     }
 }
 
 impl PxarBackupStream {
-
-    pub fn new(mut dir: Dir, path: PathBuf, device_set: Option<HashSet<u64>>, verbose: bool) -> Result<Self, Error> {
-
-        let (rx, tx) = nix::unistd::pipe()?;
-
-        let buffer_size = 1024*1024;
-        nix::fcntl::fcntl(rx, nix::fcntl::FcntlArg::F_SETPIPE_SZ(buffer_size as i32))?;
+    pub fn new<W: Write + Send + 'static>(
+        dir: Dir,
+        device_set: Option<HashSet<u64>>,
+        verbose: bool,
+        skip_lost_and_found: bool,
+        catalog: Arc<Mutex<CatalogWriter<W>>>,
+        patterns: Vec<MatchEntry>,
+        entries_max: usize,
+    ) -> Result<Self, Error> {
+        let (tx, rx) = std::sync::mpsc::sync_channel(10);
+
+        let buffer_size = 256 * 1024;
 
         let error = Arc::new(Mutex::new(None));
-        let error2 = error.clone();
-
-        let child = thread::spawn(move|| {
-            let mut writer = unsafe { std::fs::File::from_raw_fd(tx) };
-            if let Err(err) = pxar::Encoder::encode(path, &mut dir, &mut writer, device_set, verbose, pxar::CA_FORMAT_DEFAULT) {
-                let mut error = error2.lock().unwrap();
-                *error = Some(err.to_string());
-            }
-        });
-
-        let pipe = unsafe { std::fs::File::from_raw_fd(rx) };
-        let stream = crate::tools::wrapped_reader_stream::WrappedReaderStream::new(pipe);
+        let child = std::thread::Builder::new()
+            .name("PxarBackupStream".to_string())
+            .spawn({
+                let error = Arc::clone(&error);
+                move || {
+                    let mut catalog_guard = catalog.lock().unwrap();
+                    let writer = std::io::BufWriter::with_capacity(
+                        buffer_size,
+                        crate::tools::StdChannelWriter::new(tx),
+                    );
+
+                    let writer = pxar::encoder::sync::StandardWriter::new(writer);
+                    if let Err(err) = crate::pxar::create_archive(
+                        dir,
+                        writer,
+                        patterns,
+                        crate::pxar::Flags::DEFAULT,
+                        device_set,
+                        skip_lost_and_found,
+                        |path| {
+                            if verbose {
+                                println!("{:?}", path);
+                            }
+                            Ok(())
+                        },
+                        entries_max,
+                        Some(&mut *catalog_guard),
+                    ) {
+                        let mut error = error.lock().unwrap();
+                        *error = Some(err.to_string());
+                    }
+                }
+            })?;
 
         Ok(Self {
-            stream: Some(stream),
+            rx: Some(rx),
             child: Some(child),
             error,
         })
     }
 
-    pub fn open(dirname: &Path, device_set: Option<HashSet<u64>>, verbose: bool) -> Result<Self, Error> {
-
+    pub fn open<W: Write + Send + 'static>(
+        dirname: &Path,
+        device_set: Option<HashSet<u64>>,
+        verbose: bool,
+        skip_lost_and_found: bool,
+        catalog: Arc<Mutex<CatalogWriter<W>>>,
+        patterns: Vec<MatchEntry>,
+        entries_max: usize,
+    ) -> Result<Self, Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
-        let path = std::path::PathBuf::from(dirname);
 
-        Self::new(dir, path, device_set, verbose)
+        Self::new(
+            dir,
+            device_set,
+            verbose,
+            skip_lost_and_found,
+            catalog,
+            patterns,
+            entries_max,
+        )
     }
 }
 
 impl Stream for PxarBackupStream {
+    type Item = Result<Vec<u8>, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        {
+            // limit lock scope
+            let error = self.error.lock().unwrap();
+            if let Some(ref msg) = *error {
+                return Poll::Ready(Some(Err(format_err!("{}", msg))));
+            }
+        }
 
-    type Item = Vec<u8>;
-    type Error = Error;
-
-    fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> {
-        let error = self.error.lock().unwrap();
-        if let Some(ref msg) = *error {
-            return Err(format_err!("{}", msg));
+        match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
+            Ok(data) => Poll::Ready(Some(data)),
+            Err(_) => {
+                let error = self.error.lock().unwrap();
+                if let Some(ref msg) = *error {
+                    return Poll::Ready(Some(Err(format_err!("{}", msg))));
+                }
+                Poll::Ready(None) // channel closed, no error
+            }
         }
-        self.stream.as_mut().unwrap().poll().map_err(Error::from)
     }
 }