1 use std
::collections
::HashSet
;
3 use std
::os
::unix
::io
::FromRawFd
;
4 use std
::path
::{Path, PathBuf}
;
6 use std
::sync
::{Arc, Mutex}
;
7 use std
::task
::{Context, Poll}
;
11 use futures
::stream
::Stream
;
13 use nix
::fcntl
::OFlag
;
14 use nix
::sys
::stat
::Mode
;
18 use crate::backup
::CatalogWriter
;
20 use crate::tools
::wrapped_reader_stream
::WrappedReaderStream
;
22 /// Stream implementation to encode and upload .pxar archives.
24 /// The hyper client needs an async Stream for file upload, so we
25 /// spawn an extra thread to encode the .pxar data and pipe it to the
27 pub struct PxarBackupStream
{
28 stream
: Option
<WrappedReaderStream
<std
::fs
::File
>>,
29 child
: Option
<thread
::JoinHandle
<()>>,
30 error
: Arc
<Mutex
<Option
<String
>>>,
33 impl Drop
for PxarBackupStream
{
37 self.child
.take().unwrap().join().unwrap();
41 impl PxarBackupStream
{
42 pin_utils
::unsafe_pinned
!(stream
: Option
<WrappedReaderStream
<std
::fs
::File
>>);
44 pub fn new
<W
: Write
+ Send
+ '
static>(
47 device_set
: Option
<HashSet
<u64>>,
49 skip_lost_and_found
: bool
,
50 catalog
: Arc
<Mutex
<CatalogWriter
<W
>>>,
52 ) -> Result
<Self, Error
> {
54 let (rx
, tx
) = nix
::unistd
::pipe()?
;
56 let buffer_size
= 1024*1024;
57 nix
::fcntl
::fcntl(rx
, nix
::fcntl
::FcntlArg
::F_SETPIPE_SZ(buffer_size
as i32))?
;
59 let error
= Arc
::new(Mutex
::new(None
));
60 let error2
= error
.clone();
62 let catalog
= catalog
.clone();
63 let exclude_pattern
= Vec
::new();
64 let child
= std
::thread
::Builder
::new().name("PxarBackupStream".to_string()).spawn(move || {
65 let mut guard
= catalog
.lock().unwrap();
66 let mut writer
= unsafe { std::fs::File::from_raw_fd(tx) }
;
67 if let Err(err
) = pxar
::Encoder
::encode(
79 let mut error
= error2
.lock().unwrap();
80 *error
= Some(err
.to_string());
84 let pipe
= unsafe { std::fs::File::from_raw_fd(rx) }
;
85 let stream
= crate::tools
::wrapped_reader_stream
::WrappedReaderStream
::new(pipe
);
94 pub fn open
<W
: Write
+ Send
+ '
static>(
96 device_set
: Option
<HashSet
<u64>>,
98 skip_lost_and_found
: bool
,
99 catalog
: Arc
<Mutex
<CatalogWriter
<W
>>>,
101 ) -> Result
<Self, Error
> {
103 let dir
= nix
::dir
::Dir
::open(dirname
, OFlag
::O_DIRECTORY
, Mode
::empty())?
;
104 let path
= std
::path
::PathBuf
::from(dirname
);
106 Self::new(dir
, path
, device_set
, verbose
, skip_lost_and_found
, catalog
, entries_max
)
110 impl Stream
for PxarBackupStream
{
112 type Item
= Result
<Vec
<u8>, Error
>;
114 fn poll_next(mut self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
115 { // limit lock scope
116 let error
= self.error
.lock().unwrap();
117 if let Some(ref msg
) = *error
{
118 return Poll
::Ready(Some(Err(format_err
!("{}", msg
))));
121 let res
= self.as_mut()
126 Poll
::Ready(futures
::ready
!(res
)
127 .map(|v
| v
.map_err(Error
::from
))