git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/pxar_backup_stream.rs
move client to pbs-client subcrate
[proxmox-backup.git] / pbs-client / src / pxar_backup_stream.rs
1 use std::io::Write;
2 //use std::os::unix::io::FromRawFd;
3 use std::path::Path;
4 use std::pin::Pin;
5 use std::sync::{Arc, Mutex};
6 use std::task::{Context, Poll};
7
8 use anyhow::{format_err, Error};
9 use futures::stream::Stream;
10 use futures::future::{Abortable, AbortHandle};
11 use nix::dir::Dir;
12 use nix::fcntl::OFlag;
13 use nix::sys::stat::Mode;
14
15 use pbs_datastore::catalog::CatalogWriter;
16 use pbs_tools::sync::StdChannelWriter;
17 use pbs_tools::tokio::TokioWriterAdapter;
18
19 /// Stream implementation to encode and upload .pxar archives.
20 ///
21 /// The hyper client needs an async Stream for file upload, so we
22 /// spawn an extra thread to encode the .pxar data and pipe it to the
23 /// consumer.
24 pub struct PxarBackupStream {
25 rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
26 handle: Option<AbortHandle>,
27 error: Arc<Mutex<Option<String>>>,
28 }
29
30 impl Drop for PxarBackupStream {
31 fn drop(&mut self) {
32 self.rx = None;
33 self.handle.take().unwrap().abort();
34 }
35 }
36
37 impl PxarBackupStream {
38 pub fn new<W: Write + Send + 'static>(
39 dir: Dir,
40 catalog: Arc<Mutex<CatalogWriter<W>>>,
41 options: crate::pxar::PxarCreateOptions,
42 ) -> Result<Self, Error> {
43 let (tx, rx) = std::sync::mpsc::sync_channel(10);
44
45 let buffer_size = 256 * 1024;
46
47 let error = Arc::new(Mutex::new(None));
48 let error2 = Arc::clone(&error);
49 let handler = async move {
50 let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
51 buffer_size,
52 StdChannelWriter::new(tx),
53 ));
54
55 let verbose = options.verbose;
56
57 let writer = pxar::encoder::sync::StandardWriter::new(writer);
58 if let Err(err) = crate::pxar::create_archive(
59 dir,
60 writer,
61 crate::pxar::Flags::DEFAULT,
62 move |path| {
63 if verbose {
64 println!("{:?}", path);
65 }
66 Ok(())
67 },
68 Some(catalog),
69 options,
70 ).await {
71 let mut error = error2.lock().unwrap();
72 *error = Some(err.to_string());
73 }
74 };
75
76 let (handle, registration) = AbortHandle::new_pair();
77 let future = Abortable::new(handler, registration);
78 tokio::spawn(future);
79
80 Ok(Self {
81 rx: Some(rx),
82 handle: Some(handle),
83 error,
84 })
85 }
86
87 pub fn open<W: Write + Send + 'static>(
88 dirname: &Path,
89 catalog: Arc<Mutex<CatalogWriter<W>>>,
90 options: crate::pxar::PxarCreateOptions,
91 ) -> Result<Self, Error> {
92 let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
93
94 Self::new(
95 dir,
96 catalog,
97 options,
98 )
99 }
100 }
101
102 impl Stream for PxarBackupStream {
103 type Item = Result<Vec<u8>, Error>;
104
105 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
106 {
107 // limit lock scope
108 let error = self.error.lock().unwrap();
109 if let Some(ref msg) = *error {
110 return Poll::Ready(Some(Err(format_err!("{}", msg))));
111 }
112 }
113
114 match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
115 Ok(data) => Poll::Ready(Some(data)),
116 Err(_) => {
117 let error = self.error.lock().unwrap();
118 if let Some(ref msg) = *error {
119 return Poll::Ready(Some(Err(format_err!("{}", msg))));
120 }
121 Poll::Ready(None) // channel closed, no error
122 }
123 }
124 }
125 }