]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pxar_backup_stream.rs
pxar: remove unused parameter
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
CommitLineData
369a87e3 1use std::collections::HashSet;
bf6e3217 2use std::io::Write;
02141b4d 3//use std::os::unix::io::FromRawFd;
254ec194 4use std::path::Path;
369a87e3
WB
5use std::pin::Pin;
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll};
8use std::thread;
e8edbbd4 9
f7d4e4b5 10use anyhow::{format_err, Error};
e8edbbd4 11use futures::stream::Stream;
c443f58b 12use nix::dir::Dir;
e8edbbd4
DM
13use nix::fcntl::OFlag;
14use nix::sys::stat::Mode;
15
c443f58b
WB
16use pathpatterns::MatchEntry;
17
bf6e3217 18use crate::backup::CatalogWriter;
9d135fe6 19
8968258b 20/// Stream implementation to encode and upload .pxar archives.
151c6ce2
DM
21///
22/// The hyper client needs an async Stream for file upload, so we
8968258b 23/// spawn an extra thread to encode the .pxar data and pipe it to the
151c6ce2 24/// consumer.
8968258b 25pub struct PxarBackupStream {
02141b4d 26 rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
e8edbbd4 27 child: Option<thread::JoinHandle<()>>,
5be106ee 28 error: Arc<Mutex<Option<String>>>,
e8edbbd4
DM
29}
30
8968258b 31impl Drop for PxarBackupStream {
e8edbbd4 32 fn drop(&mut self) {
02141b4d 33 self.rx = None;
e8edbbd4
DM
34 self.child.take().unwrap().join().unwrap();
35 }
36}
37
8968258b 38impl PxarBackupStream {
bf6e3217 39 pub fn new<W: Write + Send + 'static>(
c443f58b 40 dir: Dir,
2761d6a4 41 device_set: Option<HashSet<u64>>,
97bbd1bf 42 verbose: bool,
2761d6a4 43 skip_lost_and_found: bool,
bf6e3217 44 catalog: Arc<Mutex<CatalogWriter<W>>>,
239e49f9 45 patterns: Vec<MatchEntry>,
6fc053ed 46 entries_max: usize,
2761d6a4 47 ) -> Result<Self, Error> {
02141b4d 48 let (tx, rx) = std::sync::mpsc::sync_channel(10);
e8edbbd4 49
c443f58b 50 let buffer_size = 256 * 1024;
c6e28b66 51
5be106ee 52 let error = Arc::new(Mutex::new(None));
c443f58b
WB
53 let child = std::thread::Builder::new()
54 .name("PxarBackupStream".to_string())
55 .spawn({
56 let error = Arc::clone(&error);
57 move || {
58 let mut catalog_guard = catalog.lock().unwrap();
59 let writer = std::io::BufWriter::with_capacity(
60 buffer_size,
61 crate::tools::StdChannelWriter::new(tx),
62 );
63
64 let writer = pxar::encoder::sync::StandardWriter::new(writer);
65 if let Err(err) = crate::pxar::create_archive(
66 dir,
67 writer,
239e49f9 68 patterns,
5444fa94 69 crate::pxar::Flags::DEFAULT,
c443f58b
WB
70 device_set,
71 skip_lost_and_found,
97bbd1bf
WB
72 |path| {
73 if verbose {
74 println!("{:?}", path);
75 }
76 Ok(())
77 },
c443f58b
WB
78 entries_max,
79 Some(&mut *catalog_guard),
80 ) {
81 let mut error = error.lock().unwrap();
82 *error = Some(err.to_string());
83 }
84 }
85 })?;
e8edbbd4 86
5be106ee 87 Ok(Self {
02141b4d 88 rx: Some(rx),
5be106ee
DM
89 child: Some(child),
90 error,
91 })
e8edbbd4 92 }
23bb8780 93
bf6e3217 94 pub fn open<W: Write + Send + 'static>(
2761d6a4
DM
95 dirname: &Path,
96 device_set: Option<HashSet<u64>>,
97 verbose: bool,
98 skip_lost_and_found: bool,
bf6e3217 99 catalog: Arc<Mutex<CatalogWriter<W>>>,
239e49f9 100 patterns: Vec<MatchEntry>,
6fc053ed 101 entries_max: usize,
2761d6a4 102 ) -> Result<Self, Error> {
728797d0 103 let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
23bb8780 104
c443f58b
WB
105 Self::new(
106 dir,
c443f58b
WB
107 device_set,
108 verbose,
109 skip_lost_and_found,
110 catalog,
239e49f9 111 patterns,
c443f58b
WB
112 entries_max,
113 )
23bb8780 114 }
e8edbbd4
DM
115}
116
8968258b 117impl Stream for PxarBackupStream {
369a87e3 118 type Item = Result<Vec<u8>, Error>;
e8edbbd4 119
02141b4d 120 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
c443f58b
WB
121 {
122 // limit lock scope
6c3c9bce
DM
123 let error = self.error.lock().unwrap();
124 if let Some(ref msg) = *error {
369a87e3 125 return Poll::Ready(Some(Err(format_err!("{}", msg))));
6c3c9bce 126 }
5be106ee 127 }
02141b4d
DM
128
129 match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
130 Ok(data) => Poll::Ready(Some(data)),
131 Err(_) => {
132 let error = self.error.lock().unwrap();
133 if let Some(ref msg) = *error {
134 return Poll::Ready(Some(Err(format_err!("{}", msg))));
135 }
136 Poll::Ready(None) // channel closed, no error
137 }
138 }
e8edbbd4
DM
139 }
140}