]> git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/pxar_backup_stream.rs
484a3e32766b992e2f70b5ee21a1685623b66814
[proxmox-backup.git] / pbs-client / src / pxar_backup_stream.rs
1 use std::io::Write;
2 //use std::os::unix::io::FromRawFd;
3 use std::path::Path;
4 use std::pin::Pin;
5 use std::sync::{Arc, Mutex};
6 use std::task::{Context, Poll};
7
8 use anyhow::{format_err, Error};
9 use futures::stream::Stream;
10 use futures::future::{Abortable, AbortHandle};
11 use nix::dir::Dir;
12 use nix::fcntl::OFlag;
13 use nix::sys::stat::Mode;
14
15 use proxmox_async::blocking::TokioWriterAdapter;
16
17 use pbs_datastore::catalog::CatalogWriter;
18 use pbs_tools::sync::StdChannelWriter;
19
20 /// Stream implementation to encode and upload .pxar archives.
21 ///
22 /// The hyper client needs an async Stream for file upload, so we
23 /// spawn an extra thread to encode the .pxar data and pipe it to the
24 /// consumer.
25 pub struct PxarBackupStream {
26 rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
27 handle: Option<AbortHandle>,
28 error: Arc<Mutex<Option<String>>>,
29 }
30
31 impl Drop for PxarBackupStream {
32 fn drop(&mut self) {
33 self.rx = None;
34 self.handle.take().unwrap().abort();
35 }
36 }
37
38 impl PxarBackupStream {
39 pub fn new<W: Write + Send + 'static>(
40 dir: Dir,
41 catalog: Arc<Mutex<CatalogWriter<W>>>,
42 options: crate::pxar::PxarCreateOptions,
43 ) -> Result<Self, Error> {
44 let (tx, rx) = std::sync::mpsc::sync_channel(10);
45
46 let buffer_size = 256 * 1024;
47
48 let error = Arc::new(Mutex::new(None));
49 let error2 = Arc::clone(&error);
50 let handler = async move {
51 let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
52 buffer_size,
53 StdChannelWriter::new(tx),
54 ));
55
56 let verbose = options.verbose;
57
58 let writer = pxar::encoder::sync::StandardWriter::new(writer);
59 if let Err(err) = crate::pxar::create_archive(
60 dir,
61 writer,
62 crate::pxar::Flags::DEFAULT,
63 move |path| {
64 if verbose {
65 println!("{:?}", path);
66 }
67 Ok(())
68 },
69 Some(catalog),
70 options,
71 ).await {
72 let mut error = error2.lock().unwrap();
73 *error = Some(err.to_string());
74 }
75 };
76
77 let (handle, registration) = AbortHandle::new_pair();
78 let future = Abortable::new(handler, registration);
79 tokio::spawn(future);
80
81 Ok(Self {
82 rx: Some(rx),
83 handle: Some(handle),
84 error,
85 })
86 }
87
88 pub fn open<W: Write + Send + 'static>(
89 dirname: &Path,
90 catalog: Arc<Mutex<CatalogWriter<W>>>,
91 options: crate::pxar::PxarCreateOptions,
92 ) -> Result<Self, Error> {
93 let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
94
95 Self::new(
96 dir,
97 catalog,
98 options,
99 )
100 }
101 }
102
103 impl Stream for PxarBackupStream {
104 type Item = Result<Vec<u8>, Error>;
105
106 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
107 {
108 // limit lock scope
109 let error = self.error.lock().unwrap();
110 if let Some(ref msg) = *error {
111 return Poll::Ready(Some(Err(format_err!("{}", msg))));
112 }
113 }
114
115 match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
116 Ok(data) => Poll::Ready(Some(data)),
117 Err(_) => {
118 let error = self.error.lock().unwrap();
119 if let Some(ref msg) = *error {
120 return Poll::Ready(Some(Err(format_err!("{}", msg))));
121 }
122 Poll::Ready(None) // channel closed, no error
123 }
124 }
125 }
126 }