]>
git.proxmox.com Git - proxmox-backup.git/blob - src/client/pxar_backup_stream.rs
1 use std
::collections
::HashSet
;
3 //use std::os::unix::io::FromRawFd;
4 use std
::path
::{Path, PathBuf}
;
6 use std
::sync
::{Arc, Mutex}
;
7 use std
::task
::{Context, Poll}
;
11 use futures
::stream
::Stream
;
13 use nix
::fcntl
::OFlag
;
14 use nix
::sys
::stat
::Mode
;
18 use crate::backup
::CatalogWriter
;
20 /// Stream implementation to encode and upload .pxar archives.
22 /// The hyper client needs an async Stream for file upload, so we
23 /// spawn an extra thread to encode the .pxar data and pipe it to the
25 pub struct PxarBackupStream
{
26 rx
: Option
<std
::sync
::mpsc
::Receiver
<Result
<Vec
<u8>, Error
>>>,
27 child
: Option
<thread
::JoinHandle
<()>>,
28 error
: Arc
<Mutex
<Option
<String
>>>,
31 impl Drop
for PxarBackupStream
{
35 self.child
.take().unwrap().join().unwrap();
39 impl PxarBackupStream
{
41 pub fn new
<W
: Write
+ Send
+ '
static>(
44 device_set
: Option
<HashSet
<u64>>,
46 skip_lost_and_found
: bool
,
47 catalog
: Arc
<Mutex
<CatalogWriter
<W
>>>,
48 exclude_pattern
: Vec
<pxar
::MatchPattern
>,
50 ) -> Result
<Self, Error
> {
52 let (tx
, rx
) = std
::sync
::mpsc
::sync_channel(10);
54 let buffer_size
= 256*1024;
56 let error
= Arc
::new(Mutex
::new(None
));
57 let error2
= error
.clone();
59 let catalog
= catalog
.clone();
60 let child
= std
::thread
::Builder
::new().name("PxarBackupStream".to_string()).spawn(move || {
61 let mut guard
= catalog
.lock().unwrap();
62 let mut writer
= std
::io
::BufWriter
::with_capacity(buffer_size
, crate::tools
::StdChannelWriter
::new(tx
));
64 if let Err(err
) = pxar
::Encoder
::encode(
76 let mut error
= error2
.lock().unwrap();
77 *error
= Some(err
.to_string());
88 pub fn open
<W
: Write
+ Send
+ '
static>(
90 device_set
: Option
<HashSet
<u64>>,
92 skip_lost_and_found
: bool
,
93 catalog
: Arc
<Mutex
<CatalogWriter
<W
>>>,
94 exclude_pattern
: Vec
<pxar
::MatchPattern
>,
96 ) -> Result
<Self, Error
> {
98 let dir
= nix
::dir
::Dir
::open(dirname
, OFlag
::O_DIRECTORY
, Mode
::empty())?
;
99 let path
= std
::path
::PathBuf
::from(dirname
);
101 Self::new(dir
, path
, device_set
, verbose
, skip_lost_and_found
, catalog
, exclude_pattern
, entries_max
)
105 impl Stream
for PxarBackupStream
{
107 type Item
= Result
<Vec
<u8>, Error
>;
109 fn poll_next(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
110 { // limit lock scope
111 let error
= self.error
.lock().unwrap();
112 if let Some(ref msg
) = *error
{
113 return Poll
::Ready(Some(Err(format_err
!("{}", msg
))));
117 match crate::tools
::runtime
::block_in_place(|| self.rx
.as_ref().unwrap().recv()) {
118 Ok(data
) => Poll
::Ready(Some(data
)),
120 let error
= self.error
.lock().unwrap();
121 if let Some(ref msg
) = *error
{
122 return Poll
::Ready(Some(Err(format_err
!("{}", msg
))));
124 Poll
::Ready(None
) // channel closed, no error