]> git.proxmox.com Git - proxmox-backup.git/blame_incremental - src/client/pxar_backup_stream.rs
pxar: create .pxarexclude-cli file
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
... / ...
CommitLineData
1use std::collections::HashSet;
2use std::io::Write;
3//use std::os::unix::io::FromRawFd;
4use std::path::{Path, PathBuf};
5use std::pin::Pin;
6use std::sync::{Arc, Mutex};
7use std::task::{Context, Poll};
8use std::thread;
9
10use anyhow::{format_err, Error};
11use futures::stream::Stream;
12use nix::dir::Dir;
13use nix::fcntl::OFlag;
14use nix::sys::stat::Mode;
15
16use pathpatterns::MatchEntry;
17
18use crate::backup::CatalogWriter;
19
20/// Stream implementation to encode and upload .pxar archives.
21///
22/// The hyper client needs an async Stream for file upload, so we
23/// spawn an extra thread to encode the .pxar data and pipe it to the
24/// consumer.
25pub struct PxarBackupStream {
26 rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
27 child: Option<thread::JoinHandle<()>>,
28 error: Arc<Mutex<Option<String>>>,
29}
30
31impl Drop for PxarBackupStream {
32 fn drop(&mut self) {
33 self.rx = None;
34 self.child.take().unwrap().join().unwrap();
35 }
36}
37
38impl PxarBackupStream {
39 pub fn new<W: Write + Send + 'static>(
40 dir: Dir,
41 _path: PathBuf,
42 device_set: Option<HashSet<u64>>,
43 _verbose: bool,
44 skip_lost_and_found: bool,
45 catalog: Arc<Mutex<CatalogWriter<W>>>,
46 patterns: Vec<MatchEntry>,
47 entries_max: usize,
48 ) -> Result<Self, Error> {
49 let (tx, rx) = std::sync::mpsc::sync_channel(10);
50
51 let buffer_size = 256 * 1024;
52
53 let error = Arc::new(Mutex::new(None));
54 let child = std::thread::Builder::new()
55 .name("PxarBackupStream".to_string())
56 .spawn({
57 let error = Arc::clone(&error);
58 move || {
59 let mut catalog_guard = catalog.lock().unwrap();
60 let writer = std::io::BufWriter::with_capacity(
61 buffer_size,
62 crate::tools::StdChannelWriter::new(tx),
63 );
64
65 let writer = pxar::encoder::sync::StandardWriter::new(writer);
66 if let Err(err) = crate::pxar::create_archive(
67 dir,
68 writer,
69 patterns,
70 crate::pxar::flags::DEFAULT,
71 device_set,
72 skip_lost_and_found,
73 |_| Ok(()),
74 entries_max,
75 Some(&mut *catalog_guard),
76 ) {
77 let mut error = error.lock().unwrap();
78 *error = Some(err.to_string());
79 }
80 }
81 })?;
82
83 Ok(Self {
84 rx: Some(rx),
85 child: Some(child),
86 error,
87 })
88 }
89
90 pub fn open<W: Write + Send + 'static>(
91 dirname: &Path,
92 device_set: Option<HashSet<u64>>,
93 verbose: bool,
94 skip_lost_and_found: bool,
95 catalog: Arc<Mutex<CatalogWriter<W>>>,
96 patterns: Vec<MatchEntry>,
97 entries_max: usize,
98 ) -> Result<Self, Error> {
99 let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
100 let path = std::path::PathBuf::from(dirname);
101
102 Self::new(
103 dir,
104 path,
105 device_set,
106 verbose,
107 skip_lost_and_found,
108 catalog,
109 patterns,
110 entries_max,
111 )
112 }
113}
114
115impl Stream for PxarBackupStream {
116 type Item = Result<Vec<u8>, Error>;
117
118 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
119 {
120 // limit lock scope
121 let error = self.error.lock().unwrap();
122 if let Some(ref msg) = *error {
123 return Poll::Ready(Some(Err(format_err!("{}", msg))));
124 }
125 }
126
127 match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
128 Ok(data) => Poll::Ready(Some(data)),
129 Err(_) => {
130 let error = self.error.lock().unwrap();
131 if let Some(ref msg) = *error {
132 return Poll::Ready(Some(Err(format_err!("{}", msg))));
133 }
134 Poll::Ready(None) // channel closed, no error
135 }
136 }
137 }
138}