]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pxar_backup_stream.rs
src/client/pxar_backup_stream.rs: use 1M pipe buffer size
[proxmox-backup.git] / src / client / pxar_backup_stream.rs
CommitLineData
e8edbbd4
DM
1use failure::*;
2
3use std::thread;
4use std::os::unix::io::FromRawFd;
17d6979a 5use std::path::{Path, PathBuf};
e8edbbd4
DM
6
7use futures::{Async, Poll};
8use futures::stream::Stream;
9
10use nix::fcntl::OFlag;
11use nix::sys::stat::Mode;
23bb8780 12use nix::dir::Dir;
e8edbbd4 13
3dbfe5b1 14use crate::pxar;
e8edbbd4 15
/// Stream implementation to encode and upload .pxar archives.
///
/// The hyper client needs an async Stream for file upload, so we
/// spawn an extra thread to encode the .pxar data and pipe it to the
/// consumer.
///
/// Note: The current implementation is not fully async and can block.
pub struct PxarBackupStream {
    /// Read end of the pipe fed by the encoder thread. Taken (set to
    /// `None`) when the stream is dropped.
    pipe: Option<std::fs::File>,
    /// Scratch buffer that pipe reads are performed into.
    buffer: Vec<u8>,
    /// Join handle of the encoder thread. Taken when the stream is dropped.
    child: Option<thread::JoinHandle<()>>,
}

impl Drop for PxarBackupStream {

    /// Closes the read end of the pipe and waits for the encoder thread.
    ///
    /// This never panics: a panic inside `drop` while the current thread
    /// is already unwinding would abort the whole process. A panicked
    /// encoder thread is reported on stderr instead.
    fn drop(&mut self) {
        // Close our end first, so that the encoder thread is not left
        // writing into a pipe nobody reads while we wait for it below.
        drop(self.pipe.take());

        if let Some(child) = self.child.take() {
            if child.join().is_err() {
                eprintln!("pxar encoder thread panicked");
            }
        }
    }
}
36
8968258b 37impl PxarBackupStream {
e8edbbd4 38
eed6db39 39 pub fn new(mut dir: Dir, path: PathBuf, all_file_systems: bool, verbose: bool) -> Result<Self, Error> {
c6e28b66
DM
40 let buffer_size = 1024*1024;
41 let mut buffer = Vec::with_capacity(buffer_size);
e8edbbd4
DM
42 unsafe { buffer.set_len(buffer.capacity()); }
43
44 let (rx, tx) = nix::unistd::pipe()?;
45
c6e28b66
DM
46 nix::fcntl::fcntl(rx, nix::fcntl::FcntlArg::F_SETPIPE_SZ(buffer_size as i32))?;
47
e8edbbd4
DM
48 let child = thread::spawn(move|| {
49 let mut writer = unsafe { std::fs::File::from_raw_fd(tx) };
3dbfe5b1 50 if let Err(err) = pxar::Encoder::encode(path, &mut dir, &mut writer, all_file_systems, verbose) {
8968258b 51 eprintln!("pxar encode failed - {}", err);
e8edbbd4
DM
52 }
53 });
54
55 let pipe = unsafe { std::fs::File::from_raw_fd(rx) };
56
57 Ok(Self { pipe: Some(pipe), buffer, child: Some(child) })
58 }
23bb8780 59
eed6db39 60 pub fn open(dirname: &Path, all_file_systems: bool, verbose: bool) -> Result<Self, Error> {
23bb8780 61
728797d0 62 let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
23bb8780
DM
63 let path = std::path::PathBuf::from(dirname);
64
eed6db39 65 Self::new(dir, path, all_file_systems, verbose)
23bb8780 66 }
e8edbbd4
DM
67}
68
8968258b 69impl Stream for PxarBackupStream {
e8edbbd4
DM
70
71 type Item = Vec<u8>;
72 type Error = Error;
73
74 // Note: This is not async!!
75
76 fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> {
77
78 use std::io::Read;
79
23bb8780 80 loop {
e8edbbd4
DM
81 let pipe = match self.pipe {
82 Some(ref mut pipe) => pipe,
83 None => unreachable!(),
84 };
85 match pipe.read(&mut self.buffer) {
86 Ok(n) => {
87 if n == 0 {
88 return Ok(Async::Ready(None))
89 } else {
90 let data = self.buffer[..n].to_vec();
91 return Ok(Async::Ready(Some(data)))
92 }
93 }
94 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {
95 // try again
96 }
97 Err(err) => {
98 return Err(err.into())
99 }
100 };
101 }
102 }
103}