]>
git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/pxar_backup_stream.rs
2 //use std::os::unix::io::FromRawFd;
5 use std
::sync
::{Arc, Mutex}
;
6 use std
::task
::{Context, Poll}
;
8 use anyhow
::{format_err, Error}
;
9 use futures
::stream
::Stream
;
10 use futures
::future
::{Abortable, AbortHandle}
;
12 use nix
::fcntl
::OFlag
;
13 use nix
::sys
::stat
::Mode
;
15 use proxmox_async
::blocking
::TokioWriterAdapter
;
17 use pbs_datastore
::catalog
::CatalogWriter
;
18 use pbs_tools
::sync
::StdChannelWriter
;
20 /// Stream implementation to encode and upload .pxar archives.
22 /// The hyper client needs an async Stream for file upload, so we
23 /// spawn an extra thread to encode the .pxar data and pipe it to the
25 pub struct PxarBackupStream
{
26 rx
: Option
<std
::sync
::mpsc
::Receiver
<Result
<Vec
<u8>, Error
>>>,
27 handle
: Option
<AbortHandle
>,
28 error
: Arc
<Mutex
<Option
<String
>>>,
31 impl Drop
for PxarBackupStream
{
34 self.handle
.take().unwrap().abort();
38 impl PxarBackupStream
{
39 pub fn new
<W
: Write
+ Send
+ '
static>(
41 catalog
: Arc
<Mutex
<CatalogWriter
<W
>>>,
42 options
: crate::pxar
::PxarCreateOptions
,
43 ) -> Result
<Self, Error
> {
44 let (tx
, rx
) = std
::sync
::mpsc
::sync_channel(10);
46 let buffer_size
= 256 * 1024;
48 let error
= Arc
::new(Mutex
::new(None
));
49 let error2
= Arc
::clone(&error
);
50 let handler
= async
move {
51 let writer
= TokioWriterAdapter
::new(std
::io
::BufWriter
::with_capacity(
53 StdChannelWriter
::new(tx
),
56 let verbose
= options
.verbose
;
58 let writer
= pxar
::encoder
::sync
::StandardWriter
::new(writer
);
59 if let Err(err
) = crate::pxar
::create_archive(
62 crate::pxar
::Flags
::DEFAULT
,
65 println
!("{:?}", path
);
72 let mut error
= error2
.lock().unwrap();
73 *error
= Some(err
.to_string());
77 let (handle
, registration
) = AbortHandle
::new_pair();
78 let future
= Abortable
::new(handler
, registration
);
88 pub fn open
<W
: Write
+ Send
+ '
static>(
90 catalog
: Arc
<Mutex
<CatalogWriter
<W
>>>,
91 options
: crate::pxar
::PxarCreateOptions
,
92 ) -> Result
<Self, Error
> {
93 let dir
= nix
::dir
::Dir
::open(dirname
, OFlag
::O_DIRECTORY
, Mode
::empty())?
;
103 impl Stream
for PxarBackupStream
{
104 type Item
= Result
<Vec
<u8>, Error
>;
106 fn poll_next(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
109 let error
= self.error
.lock().unwrap();
110 if let Some(ref msg
) = *error
{
111 return Poll
::Ready(Some(Err(format_err
!("{}", msg
))));
115 match proxmox_async
::runtime
::block_in_place(|| self.rx
.as_ref().unwrap().recv()) {
116 Ok(data
) => Poll
::Ready(Some(data
)),
118 let error
= self.error
.lock().unwrap();
119 if let Some(ref msg
) = *error
{
120 return Poll
::Ready(Some(Err(format_err
!("{}", msg
))));
122 Poll
::Ready(None
) // channel closed, no error