]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pxar_backup_stream.rs
fix #3359: fix blocking writes in async code during pxar create
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
CommitLineData
bf6e3217 1use std::io::Write;
02141b4d 2//use std::os::unix::io::FromRawFd;
254ec194 3use std::path::Path;
369a87e3
WB
4use std::pin::Pin;
5use std::sync::{Arc, Mutex};
6use std::task::{Context, Poll};
e8edbbd4 7
f7d4e4b5 8use anyhow::{format_err, Error};
e8edbbd4 9use futures::stream::Stream;
6afb60ab 10use futures::future::{Abortable, AbortHandle};
c443f58b 11use nix::dir::Dir;
e8edbbd4
DM
12use nix::fcntl::OFlag;
13use nix::sys::stat::Mode;
14
bf6e3217 15use crate::backup::CatalogWriter;
f1d76ecf
DC
16use crate::tools::{
17 StdChannelWriter,
18 TokioWriterAdapter,
19};
9d135fe6 20
8968258b 21/// Stream implementation to encode and upload .pxar archives.
151c6ce2
DM
22///
23/// The hyper client needs an async Stream for file upload, so we
8968258b 24/// spawn an extra thread to encode the .pxar data and pipe it to the
151c6ce2 25/// consumer.
8968258b 26pub struct PxarBackupStream {
02141b4d 27 rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
6afb60ab 28 handle: Option<AbortHandle>,
5be106ee 29 error: Arc<Mutex<Option<String>>>,
e8edbbd4
DM
30}
31
8968258b 32impl Drop for PxarBackupStream {
e8edbbd4 33 fn drop(&mut self) {
02141b4d 34 self.rx = None;
6afb60ab 35 self.handle.take().unwrap().abort();
e8edbbd4
DM
36 }
37}
38
8968258b 39impl PxarBackupStream {
bf6e3217 40 pub fn new<W: Write + Send + 'static>(
c443f58b 41 dir: Dir,
bf6e3217 42 catalog: Arc<Mutex<CatalogWriter<W>>>,
77486a60 43 options: crate::pxar::PxarCreateOptions,
2761d6a4 44 ) -> Result<Self, Error> {
02141b4d 45 let (tx, rx) = std::sync::mpsc::sync_channel(10);
e8edbbd4 46
c443f58b 47 let buffer_size = 256 * 1024;
c6e28b66 48
5be106ee 49 let error = Arc::new(Mutex::new(None));
6afb60ab
SR
50 let error2 = Arc::clone(&error);
51 let handler = async move {
f1d76ecf 52 let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
6afb60ab 53 buffer_size,
f1d76ecf
DC
54 StdChannelWriter::new(tx),
55 ));
c443f58b 56
6afb60ab 57 let verbose = options.verbose;
77486a60 58
6afb60ab
SR
59 let writer = pxar::encoder::sync::StandardWriter::new(writer);
60 if let Err(err) = crate::pxar::create_archive(
61 dir,
62 writer,
63 crate::pxar::Flags::DEFAULT,
64 move |path| {
65 if verbose {
66 println!("{:?}", path);
c443f58b 67 }
6afb60ab
SR
68 Ok(())
69 },
70 Some(catalog),
71 options,
72 ).await {
73 let mut error = error2.lock().unwrap();
74 *error = Some(err.to_string());
75 }
76 };
77
78 let (handle, registration) = AbortHandle::new_pair();
79 let future = Abortable::new(handler, registration);
80 tokio::spawn(future);
e8edbbd4 81
5be106ee 82 Ok(Self {
02141b4d 83 rx: Some(rx),
6afb60ab 84 handle: Some(handle),
5be106ee
DM
85 error,
86 })
e8edbbd4 87 }
23bb8780 88
bf6e3217 89 pub fn open<W: Write + Send + 'static>(
2761d6a4 90 dirname: &Path,
bf6e3217 91 catalog: Arc<Mutex<CatalogWriter<W>>>,
77486a60 92 options: crate::pxar::PxarCreateOptions,
2761d6a4 93 ) -> Result<Self, Error> {
728797d0 94 let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
23bb8780 95
c443f58b
WB
96 Self::new(
97 dir,
c443f58b 98 catalog,
77486a60 99 options,
c443f58b 100 )
23bb8780 101 }
e8edbbd4
DM
102}
103
8968258b 104impl Stream for PxarBackupStream {
369a87e3 105 type Item = Result<Vec<u8>, Error>;
e8edbbd4 106
02141b4d 107 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
c443f58b
WB
108 {
109 // limit lock scope
6c3c9bce
DM
110 let error = self.error.lock().unwrap();
111 if let Some(ref msg) = *error {
369a87e3 112 return Poll::Ready(Some(Err(format_err!("{}", msg))));
6c3c9bce 113 }
5be106ee 114 }
02141b4d
DM
115
116 match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
117 Ok(data) => Poll::Ready(Some(data)),
118 Err(_) => {
119 let error = self.error.lock().unwrap();
120 if let Some(ref msg) = *error {
121 return Poll::Ready(Some(Err(format_err!("{}", msg))));
122 }
123 Poll::Ready(None) // channel closed, no error
124 }
125 }
e8edbbd4
DM
126 }
127}