]>
git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/pxar_backup_stream.rs
2 //use std::os::unix::io::FromRawFd;
5 use std
::sync
::{Arc, Mutex}
;
6 use std
::task
::{Context, Poll}
;
8 use anyhow
::{format_err, Error}
;
9 use futures
::stream
::Stream
;
10 use futures
::future
::{Abortable, AbortHandle}
;
12 use nix
::fcntl
::OFlag
;
13 use nix
::sys
::stat
::Mode
;
15 use pbs_datastore
::catalog
::CatalogWriter
;
16 use pbs_tools
::sync
::StdChannelWriter
;
17 use pbs_tools
::tokio
::TokioWriterAdapter
;
19 /// Stream implementation to encode and upload .pxar archives.
21 /// The hyper client needs an async Stream for file upload, so we
22 /// spawn an extra thread to encode the .pxar data and pipe it to the
24 pub struct PxarBackupStream
{
25 rx
: Option
<std
::sync
::mpsc
::Receiver
<Result
<Vec
<u8>, Error
>>>,
26 handle
: Option
<AbortHandle
>,
27 error
: Arc
<Mutex
<Option
<String
>>>,
30 impl Drop
for PxarBackupStream
{
33 self.handle
.take().unwrap().abort();
37 impl PxarBackupStream
{
38 pub fn new
<W
: Write
+ Send
+ '
static>(
40 catalog
: Arc
<Mutex
<CatalogWriter
<W
>>>,
41 options
: crate::pxar
::PxarCreateOptions
,
42 ) -> Result
<Self, Error
> {
43 let (tx
, rx
) = std
::sync
::mpsc
::sync_channel(10);
45 let buffer_size
= 256 * 1024;
47 let error
= Arc
::new(Mutex
::new(None
));
48 let error2
= Arc
::clone(&error
);
49 let handler
= async
move {
50 let writer
= TokioWriterAdapter
::new(std
::io
::BufWriter
::with_capacity(
52 StdChannelWriter
::new(tx
),
55 let verbose
= options
.verbose
;
57 let writer
= pxar
::encoder
::sync
::StandardWriter
::new(writer
);
58 if let Err(err
) = crate::pxar
::create_archive(
61 crate::pxar
::Flags
::DEFAULT
,
64 println
!("{:?}", path
);
71 let mut error
= error2
.lock().unwrap();
72 *error
= Some(err
.to_string());
76 let (handle
, registration
) = AbortHandle
::new_pair();
77 let future
= Abortable
::new(handler
, registration
);
87 pub fn open
<W
: Write
+ Send
+ '
static>(
89 catalog
: Arc
<Mutex
<CatalogWriter
<W
>>>,
90 options
: crate::pxar
::PxarCreateOptions
,
91 ) -> Result
<Self, Error
> {
92 let dir
= nix
::dir
::Dir
::open(dirname
, OFlag
::O_DIRECTORY
, Mode
::empty())?
;
102 impl Stream
for PxarBackupStream
{
103 type Item
= Result
<Vec
<u8>, Error
>;
105 fn poll_next(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
108 let error
= self.error
.lock().unwrap();
109 if let Some(ref msg
) = *error
{
110 return Poll
::Ready(Some(Err(format_err
!("{}", msg
))));
114 match pbs_runtime
::block_in_place(|| self.rx
.as_ref().unwrap().recv()) {
115 Ok(data
) => Poll
::Ready(Some(data
)),
117 let error
= self.error
.lock().unwrap();
118 if let Some(ref msg
) = *error
{
119 return Poll
::Ready(Some(Err(format_err
!("{}", msg
))));
121 Poll
::Ready(None
) // channel closed, no error