4 use std
::os
::unix
::io
::FromRawFd
;
6 use futures
::{Async, Poll}
;
7 use futures
::stream
::Stream
;
10 use nix
::sys
::stat
::Mode
;
12 use crate::catar
::encoder
::*;
14 pub struct CaTarBackupStream
{
15 pipe
: Option
<std
::fs
::File
>,
17 child
: Option
<thread
::JoinHandle
<()>>,
20 impl Drop
for CaTarBackupStream
{
23 drop(self.pipe
.take());
24 self.child
.take().unwrap().join().unwrap();
28 impl CaTarBackupStream
{
30 pub fn new(dirname
: &str) -> Result
<Self, Error
> {
31 let mut buffer
= Vec
::with_capacity(4096);
32 unsafe { buffer.set_len(buffer.capacity()); }
34 let (rx
, tx
) = nix
::unistd
::pipe()?
;
36 let mut dir
= nix
::dir
::Dir
::open(dirname
, OFlag
::O_DIRECTORY
, Mode
::empty())?
;
37 let path
= std
::path
::PathBuf
::from(dirname
);
39 let child
= thread
::spawn(move|| {
40 let mut writer
= unsafe { std::fs::File::from_raw_fd(tx) }
;
41 if let Err(err
) = CaTarEncoder
::encode(path
, &mut dir
, None
, &mut writer
) {
42 eprintln
!("catar encode failed - {}", err
);
46 let pipe
= unsafe { std::fs::File::from_raw_fd(rx) }
;
48 Ok(Self { pipe: Some(pipe), buffer, child: Some(child) }
)
52 impl Stream
for CaTarBackupStream
{
57 // Note: This is not async!!
59 fn poll(&mut self) -> Poll
<Option
<Vec
<u8>>, Error
> {
64 let pipe
= match self.pipe
{
65 Some(ref mut pipe
) => pipe
,
66 None
=> unreachable
!(),
68 match pipe
.read(&mut self.buffer
) {
71 return Ok(Async
::Ready(None
))
73 let data
= self.buffer
[..n
].to_vec();
74 return Ok(Async
::Ready(Some(data
)))
77 Err(ref e
) if e
.kind() == std
::io
::ErrorKind
::Interrupted
=> {
81 return Err(err
.into())