4 use std
::sync
::{Arc, Mutex}
;
5 use std
::os
::unix
::io
::FromRawFd
;
6 use std
::path
::{Path, PathBuf}
;
7 use std
::collections
::HashSet
;
10 use futures
::stream
::Stream
;
12 use nix
::fcntl
::OFlag
;
13 use nix
::sys
::stat
::Mode
;
17 use crate::tools
::wrapped_reader_stream
::WrappedReaderStream
;
19 /// Stream implementation to encode and upload .pxar archives.
21 /// The hyper client needs an async Stream for file upload, so we
22 /// spawn an extra thread to encode the .pxar data and pipe it to the
24 pub struct PxarBackupStream
{
25 stream
: Option
<WrappedReaderStream
<std
::fs
::File
>>,
26 child
: Option
<thread
::JoinHandle
<()>>,
27 error
: Arc
<Mutex
<Option
<String
>>>,
30 impl Drop
for PxarBackupStream
{
34 self.child
.take().unwrap().join().unwrap();
38 impl PxarBackupStream
{
40 pub fn new(mut dir
: Dir
, path
: PathBuf
, device_set
: Option
<HashSet
<u64>>, verbose
: bool
, skip_lost_and_found
: bool
) -> Result
<Self, Error
> {
42 let (rx
, tx
) = nix
::unistd
::pipe()?
;
44 let buffer_size
= 1024*1024;
45 nix
::fcntl
::fcntl(rx
, nix
::fcntl
::FcntlArg
::F_SETPIPE_SZ(buffer_size
as i32))?
;
47 let error
= Arc
::new(Mutex
::new(None
));
48 let error2
= error
.clone();
50 let child
= thread
::spawn(move|| {
51 let mut writer
= unsafe { std::fs::File::from_raw_fd(tx) }
;
52 if let Err(err
) = pxar
::Encoder
::encode(path
, &mut dir
, &mut writer
, device_set
, verbose
, skip_lost_and_found
, pxar
::flags
::DEFAULT
) {
53 let mut error
= error2
.lock().unwrap();
54 *error
= Some(err
.to_string());
58 let pipe
= unsafe { std::fs::File::from_raw_fd(rx) }
;
59 let stream
= crate::tools
::wrapped_reader_stream
::WrappedReaderStream
::new(pipe
);
68 pub fn open(dirname
: &Path
, device_set
: Option
<HashSet
<u64>>, verbose
: bool
, skip_lost_and_found
: bool
) -> Result
<Self, Error
> {
70 let dir
= nix
::dir
::Dir
::open(dirname
, OFlag
::O_DIRECTORY
, Mode
::empty())?
;
71 let path
= std
::path
::PathBuf
::from(dirname
);
73 Self::new(dir
, path
, device_set
, verbose
, skip_lost_and_found
)
77 impl Stream
for PxarBackupStream
{
82 fn poll(&mut self) -> Poll
<Option
<Vec
<u8>>, Error
> {
84 let error
= self.error
.lock().unwrap();
85 if let Some(ref msg
) = *error
{
86 return Err(format_err
!("{}", msg
));
89 self.stream
.as_mut().unwrap().poll().map_err(Error
::from
)