]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/catar_backup_stream.rs
client/http_client.rs: new helper class
[proxmox-backup.git] / src / client / catar_backup_stream.rs
1 use failure::*;
2
3 use std::thread;
4 use std::os::unix::io::FromRawFd;
5
6 use futures::{Async, Poll};
7 use futures::stream::Stream;
8
9 use nix::fcntl::OFlag;
10 use nix::sys::stat::Mode;
11
12 use crate::catar::encoder::*;
13
/// Stream of raw byte chunks read from a pipe that a background thread
/// fills with catar encoder output.
pub struct CaTarBackupStream {
    /// Read end of the pipe; `None` only once the stream is being dropped.
    pipe: Option<std::fs::File>,
    /// Scratch buffer each `read` on the pipe fills before the data is copied out.
    buffer: Vec<u8>,
    /// Handle of the encoder thread writing into the pipe.
    child: Option<std::thread::JoinHandle<()>>,
}
19
20 impl Drop for CaTarBackupStream {
21
22 fn drop(&mut self) {
23 drop(self.pipe.take());
24 self.child.take().unwrap().join().unwrap();
25 }
26 }
27
28 impl CaTarBackupStream {
29
30 pub fn new(dirname: &str) -> Result<Self, Error> {
31 let mut buffer = Vec::with_capacity(4096);
32 unsafe { buffer.set_len(buffer.capacity()); }
33
34 let (rx, tx) = nix::unistd::pipe()?;
35
36 let mut dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
37 let path = std::path::PathBuf::from(dirname);
38
39 let child = thread::spawn(move|| {
40 let mut writer = unsafe { std::fs::File::from_raw_fd(tx) };
41 if let Err(err) = CaTarEncoder::encode(path, &mut dir, None, &mut writer) {
42 eprintln!("catar encode failed - {}", err);
43 }
44 });
45
46 let pipe = unsafe { std::fs::File::from_raw_fd(rx) };
47
48 Ok(Self { pipe: Some(pipe), buffer, child: Some(child) })
49 }
50 }
51
52 impl Stream for CaTarBackupStream {
53
54 type Item = Vec<u8>;
55 type Error = Error;
56
57 // Note: This is not async!!
58
59 fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> {
60
61 use std::io::Read;
62
63 loop {
64 let pipe = match self.pipe {
65 Some(ref mut pipe) => pipe,
66 None => unreachable!(),
67 };
68 match pipe.read(&mut self.buffer) {
69 Ok(n) => {
70 if n == 0 {
71 return Ok(Async::Ready(None))
72 } else {
73 let data = self.buffer[..n].to_vec();
74 return Ok(Async::Ready(Some(data)))
75 }
76 }
77 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {
78 // try again
79 }
80 Err(err) => {
81 return Err(err.into())
82 }
83 };
84 }
85 }
86 }