]>
Commit | Line | Data |
---|---|---|
bf6e3217 | 1 | use std::io::Write; |
02141b4d | 2 | //use std::os::unix::io::FromRawFd; |
254ec194 | 3 | use std::path::Path; |
369a87e3 WB |
4 | use std::pin::Pin; |
5 | use std::sync::{Arc, Mutex}; | |
6 | use std::task::{Context, Poll}; | |
e8edbbd4 | 7 | |
f7d4e4b5 | 8 | use anyhow::{format_err, Error}; |
e8edbbd4 | 9 | use futures::stream::Stream; |
6afb60ab | 10 | use futures::future::{Abortable, AbortHandle}; |
c443f58b | 11 | use nix::dir::Dir; |
e8edbbd4 DM |
12 | use nix::fcntl::OFlag; |
13 | use nix::sys::stat::Mode; | |
14 | ||
bf6e3217 | 15 | use crate::backup::CatalogWriter; |
f1d76ecf DC |
16 | use crate::tools::{ |
17 | StdChannelWriter, | |
18 | TokioWriterAdapter, | |
19 | }; | |
9d135fe6 | 20 | |
8968258b | 21 | /// Stream implementation to encode and upload .pxar archives. |
151c6ce2 DM |
22 | /// |
23 | /// The hyper client needs an async Stream for file upload, so we | |
8968258b | 24 | /// spawn an extra thread to encode the .pxar data and pipe it to the |
151c6ce2 | 25 | /// consumer. |
8968258b | 26 | pub struct PxarBackupStream { |
02141b4d | 27 | rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>, |
6afb60ab | 28 | handle: Option<AbortHandle>, |
5be106ee | 29 | error: Arc<Mutex<Option<String>>>, |
e8edbbd4 DM |
30 | } |
31 | ||
8968258b | 32 | impl Drop for PxarBackupStream { |
e8edbbd4 | 33 | fn drop(&mut self) { |
02141b4d | 34 | self.rx = None; |
6afb60ab | 35 | self.handle.take().unwrap().abort(); |
e8edbbd4 DM |
36 | } |
37 | } | |
38 | ||
8968258b | 39 | impl PxarBackupStream { |
bf6e3217 | 40 | pub fn new<W: Write + Send + 'static>( |
c443f58b | 41 | dir: Dir, |
bf6e3217 | 42 | catalog: Arc<Mutex<CatalogWriter<W>>>, |
77486a60 | 43 | options: crate::pxar::PxarCreateOptions, |
2761d6a4 | 44 | ) -> Result<Self, Error> { |
02141b4d | 45 | let (tx, rx) = std::sync::mpsc::sync_channel(10); |
e8edbbd4 | 46 | |
c443f58b | 47 | let buffer_size = 256 * 1024; |
c6e28b66 | 48 | |
5be106ee | 49 | let error = Arc::new(Mutex::new(None)); |
6afb60ab SR |
50 | let error2 = Arc::clone(&error); |
51 | let handler = async move { | |
f1d76ecf | 52 | let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( |
6afb60ab | 53 | buffer_size, |
f1d76ecf DC |
54 | StdChannelWriter::new(tx), |
55 | )); | |
c443f58b | 56 | |
6afb60ab | 57 | let verbose = options.verbose; |
77486a60 | 58 | |
6afb60ab SR |
59 | let writer = pxar::encoder::sync::StandardWriter::new(writer); |
60 | if let Err(err) = crate::pxar::create_archive( | |
61 | dir, | |
62 | writer, | |
63 | crate::pxar::Flags::DEFAULT, | |
64 | move |path| { | |
65 | if verbose { | |
66 | println!("{:?}", path); | |
c443f58b | 67 | } |
6afb60ab SR |
68 | Ok(()) |
69 | }, | |
70 | Some(catalog), | |
71 | options, | |
72 | ).await { | |
73 | let mut error = error2.lock().unwrap(); | |
74 | *error = Some(err.to_string()); | |
75 | } | |
76 | }; | |
77 | ||
78 | let (handle, registration) = AbortHandle::new_pair(); | |
79 | let future = Abortable::new(handler, registration); | |
80 | tokio::spawn(future); | |
e8edbbd4 | 81 | |
5be106ee | 82 | Ok(Self { |
02141b4d | 83 | rx: Some(rx), |
6afb60ab | 84 | handle: Some(handle), |
5be106ee DM |
85 | error, |
86 | }) | |
e8edbbd4 | 87 | } |
23bb8780 | 88 | |
bf6e3217 | 89 | pub fn open<W: Write + Send + 'static>( |
2761d6a4 | 90 | dirname: &Path, |
bf6e3217 | 91 | catalog: Arc<Mutex<CatalogWriter<W>>>, |
77486a60 | 92 | options: crate::pxar::PxarCreateOptions, |
2761d6a4 | 93 | ) -> Result<Self, Error> { |
728797d0 | 94 | let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; |
23bb8780 | 95 | |
c443f58b WB |
96 | Self::new( |
97 | dir, | |
c443f58b | 98 | catalog, |
77486a60 | 99 | options, |
c443f58b | 100 | ) |
23bb8780 | 101 | } |
e8edbbd4 DM |
102 | } |
103 | ||
8968258b | 104 | impl Stream for PxarBackupStream { |
369a87e3 | 105 | type Item = Result<Vec<u8>, Error>; |
e8edbbd4 | 106 | |
02141b4d | 107 | fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> { |
c443f58b WB |
108 | { |
109 | // limit lock scope | |
6c3c9bce DM |
110 | let error = self.error.lock().unwrap(); |
111 | if let Some(ref msg) = *error { | |
369a87e3 | 112 | return Poll::Ready(Some(Err(format_err!("{}", msg)))); |
6c3c9bce | 113 | } |
5be106ee | 114 | } |
02141b4d DM |
115 | |
116 | match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) { | |
117 | Ok(data) => Poll::Ready(Some(data)), | |
118 | Err(_) => { | |
119 | let error = self.error.lock().unwrap(); | |
120 | if let Some(ref msg) = *error { | |
121 | return Poll::Ready(Some(Err(format_err!("{}", msg)))); | |
122 | } | |
123 | Poll::Ready(None) // channel closed, no error | |
124 | } | |
125 | } | |
e8edbbd4 DM |
126 | } |
127 | } |