]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pxar_backup_stream.rs
move more tools for the client into subcrates
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
CommitLineData
bf6e3217 1use std::io::Write;
02141b4d 2//use std::os::unix::io::FromRawFd;
254ec194 3use std::path::Path;
369a87e3
WB
4use std::pin::Pin;
5use std::sync::{Arc, Mutex};
6use std::task::{Context, Poll};
e8edbbd4 7
f7d4e4b5 8use anyhow::{format_err, Error};
e8edbbd4 9use futures::stream::Stream;
6afb60ab 10use futures::future::{Abortable, AbortHandle};
c443f58b 11use nix::dir::Dir;
e8edbbd4
DM
12use nix::fcntl::OFlag;
13use nix::sys::stat::Mode;
14
be3a0295 15use pbs_datastore::catalog::CatalogWriter;
4805edc4
WB
16use pbs_tools::sync::StdChannelWriter;
17use pbs_tools::tokio::TokioWriterAdapter;
9d135fe6 18
8968258b 19/// Stream implementation to encode and upload .pxar archives.
151c6ce2
DM
20///
21/// The hyper client needs an async Stream for file upload, so we
8968258b 22/// spawn an extra thread to encode the .pxar data and pipe it to the
151c6ce2 23/// consumer.
8968258b 24pub struct PxarBackupStream {
02141b4d 25 rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
6afb60ab 26 handle: Option<AbortHandle>,
5be106ee 27 error: Arc<Mutex<Option<String>>>,
e8edbbd4
DM
28}
29
8968258b 30impl Drop for PxarBackupStream {
e8edbbd4 31 fn drop(&mut self) {
02141b4d 32 self.rx = None;
6afb60ab 33 self.handle.take().unwrap().abort();
e8edbbd4
DM
34 }
35}
36
8968258b 37impl PxarBackupStream {
bf6e3217 38 pub fn new<W: Write + Send + 'static>(
c443f58b 39 dir: Dir,
bf6e3217 40 catalog: Arc<Mutex<CatalogWriter<W>>>,
77486a60 41 options: crate::pxar::PxarCreateOptions,
2761d6a4 42 ) -> Result<Self, Error> {
02141b4d 43 let (tx, rx) = std::sync::mpsc::sync_channel(10);
e8edbbd4 44
c443f58b 45 let buffer_size = 256 * 1024;
c6e28b66 46
5be106ee 47 let error = Arc::new(Mutex::new(None));
6afb60ab
SR
48 let error2 = Arc::clone(&error);
49 let handler = async move {
f1d76ecf 50 let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
6afb60ab 51 buffer_size,
f1d76ecf
DC
52 StdChannelWriter::new(tx),
53 ));
c443f58b 54
6afb60ab 55 let verbose = options.verbose;
77486a60 56
6afb60ab
SR
57 let writer = pxar::encoder::sync::StandardWriter::new(writer);
58 if let Err(err) = crate::pxar::create_archive(
59 dir,
60 writer,
61 crate::pxar::Flags::DEFAULT,
62 move |path| {
63 if verbose {
64 println!("{:?}", path);
c443f58b 65 }
6afb60ab
SR
66 Ok(())
67 },
68 Some(catalog),
69 options,
70 ).await {
71 let mut error = error2.lock().unwrap();
72 *error = Some(err.to_string());
73 }
74 };
75
76 let (handle, registration) = AbortHandle::new_pair();
77 let future = Abortable::new(handler, registration);
78 tokio::spawn(future);
e8edbbd4 79
5be106ee 80 Ok(Self {
02141b4d 81 rx: Some(rx),
6afb60ab 82 handle: Some(handle),
5be106ee
DM
83 error,
84 })
e8edbbd4 85 }
23bb8780 86
bf6e3217 87 pub fn open<W: Write + Send + 'static>(
2761d6a4 88 dirname: &Path,
bf6e3217 89 catalog: Arc<Mutex<CatalogWriter<W>>>,
77486a60 90 options: crate::pxar::PxarCreateOptions,
2761d6a4 91 ) -> Result<Self, Error> {
728797d0 92 let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
23bb8780 93
c443f58b
WB
94 Self::new(
95 dir,
c443f58b 96 catalog,
77486a60 97 options,
c443f58b 98 )
23bb8780 99 }
e8edbbd4
DM
100}
101
8968258b 102impl Stream for PxarBackupStream {
369a87e3 103 type Item = Result<Vec<u8>, Error>;
e8edbbd4 104
02141b4d 105 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
c443f58b
WB
106 {
107 // limit lock scope
6c3c9bce
DM
108 let error = self.error.lock().unwrap();
109 if let Some(ref msg) = *error {
369a87e3 110 return Poll::Ready(Some(Err(format_err!("{}", msg))));
6c3c9bce 111 }
5be106ee 112 }
02141b4d 113
d420962f 114 match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
02141b4d
DM
115 Ok(data) => Poll::Ready(Some(data)),
116 Err(_) => {
117 let error = self.error.lock().unwrap();
118 if let Some(ref msg) = *error {
119 return Poll::Ready(Some(Err(format_err!("{}", msg))));
120 }
121 Poll::Ready(None) // channel closed, no error
122 }
123 }
e8edbbd4
DM
124 }
125}