]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pxar_backup_stream.rs
pxar: create .pxarexclude-cli file
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
CommitLineData
369a87e3 1use std::collections::HashSet;
bf6e3217 2use std::io::Write;
02141b4d 3//use std::os::unix::io::FromRawFd;
17d6979a 4use std::path::{Path, PathBuf};
369a87e3
WB
5use std::pin::Pin;
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll};
8use std::thread;
e8edbbd4 9
f7d4e4b5 10use anyhow::{format_err, Error};
e8edbbd4 11use futures::stream::Stream;
c443f58b 12use nix::dir::Dir;
e8edbbd4
DM
13use nix::fcntl::OFlag;
14use nix::sys::stat::Mode;
15
c443f58b
WB
16use pathpatterns::MatchEntry;
17
bf6e3217 18use crate::backup::CatalogWriter;
9d135fe6 19
/// Stream implementation to encode and upload .pxar archives.
///
/// The hyper client needs an async Stream for file upload, so we
/// spawn an extra thread to encode the .pxar data and pipe it to the
/// consumer.
pub struct PxarBackupStream {
    /// Receiving end of the bounded channel fed by the encoder thread.
    ///
    /// Wrapped in an `Option` so that `Drop` can close the channel (by
    /// setting it to `None`) before joining the thread.
    rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
    /// Join handle of the encoder thread; taken and joined in `Drop`.
    child: Option<thread::JoinHandle<()>>,
    /// Error message stored by the encoder thread when archive creation
    /// fails; shared with the thread through the `Arc`.
    error: Arc<Mutex<Option<String>>>,
}
30
8968258b 31impl Drop for PxarBackupStream {
e8edbbd4 32 fn drop(&mut self) {
02141b4d 33 self.rx = None;
e8edbbd4
DM
34 self.child.take().unwrap().join().unwrap();
35 }
36}
37
8968258b 38impl PxarBackupStream {
bf6e3217 39 pub fn new<W: Write + Send + 'static>(
c443f58b
WB
40 dir: Dir,
41 _path: PathBuf,
2761d6a4 42 device_set: Option<HashSet<u64>>,
c443f58b 43 _verbose: bool,
2761d6a4 44 skip_lost_and_found: bool,
bf6e3217 45 catalog: Arc<Mutex<CatalogWriter<W>>>,
239e49f9 46 patterns: Vec<MatchEntry>,
6fc053ed 47 entries_max: usize,
2761d6a4 48 ) -> Result<Self, Error> {
02141b4d 49 let (tx, rx) = std::sync::mpsc::sync_channel(10);
e8edbbd4 50
c443f58b 51 let buffer_size = 256 * 1024;
c6e28b66 52
5be106ee 53 let error = Arc::new(Mutex::new(None));
c443f58b
WB
54 let child = std::thread::Builder::new()
55 .name("PxarBackupStream".to_string())
56 .spawn({
57 let error = Arc::clone(&error);
58 move || {
59 let mut catalog_guard = catalog.lock().unwrap();
60 let writer = std::io::BufWriter::with_capacity(
61 buffer_size,
62 crate::tools::StdChannelWriter::new(tx),
63 );
64
65 let writer = pxar::encoder::sync::StandardWriter::new(writer);
66 if let Err(err) = crate::pxar::create_archive(
67 dir,
68 writer,
239e49f9 69 patterns,
c443f58b
WB
70 crate::pxar::flags::DEFAULT,
71 device_set,
72 skip_lost_and_found,
73 |_| Ok(()),
74 entries_max,
75 Some(&mut *catalog_guard),
76 ) {
77 let mut error = error.lock().unwrap();
78 *error = Some(err.to_string());
79 }
80 }
81 })?;
e8edbbd4 82
5be106ee 83 Ok(Self {
02141b4d 84 rx: Some(rx),
5be106ee
DM
85 child: Some(child),
86 error,
87 })
e8edbbd4 88 }
23bb8780 89
bf6e3217 90 pub fn open<W: Write + Send + 'static>(
2761d6a4
DM
91 dirname: &Path,
92 device_set: Option<HashSet<u64>>,
93 verbose: bool,
94 skip_lost_and_found: bool,
bf6e3217 95 catalog: Arc<Mutex<CatalogWriter<W>>>,
239e49f9 96 patterns: Vec<MatchEntry>,
6fc053ed 97 entries_max: usize,
2761d6a4 98 ) -> Result<Self, Error> {
728797d0 99 let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
23bb8780
DM
100 let path = std::path::PathBuf::from(dirname);
101
c443f58b
WB
102 Self::new(
103 dir,
104 path,
105 device_set,
106 verbose,
107 skip_lost_and_found,
108 catalog,
239e49f9 109 patterns,
c443f58b
WB
110 entries_max,
111 )
23bb8780 112 }
e8edbbd4
DM
113}
114
8968258b 115impl Stream for PxarBackupStream {
369a87e3 116 type Item = Result<Vec<u8>, Error>;
e8edbbd4 117
02141b4d 118 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
c443f58b
WB
119 {
120 // limit lock scope
6c3c9bce
DM
121 let error = self.error.lock().unwrap();
122 if let Some(ref msg) = *error {
369a87e3 123 return Poll::Ready(Some(Err(format_err!("{}", msg))));
6c3c9bce 124 }
5be106ee 125 }
02141b4d
DM
126
127 match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
128 Ok(data) => Poll::Ready(Some(data)),
129 Err(_) => {
130 let error = self.error.lock().unwrap();
131 if let Some(ref msg) = *error {
132 return Poll::Ready(Some(Err(format_err!("{}", msg))));
133 }
134 Poll::Ready(None) // channel closed, no error
135 }
136 }
e8edbbd4
DM
137 }
138}