]>
Commit | Line | Data |
---|---|---|
bf6e3217 | 1 | use std::io::Write; |
02141b4d | 2 | //use std::os::unix::io::FromRawFd; |
254ec194 | 3 | use std::path::Path; |
369a87e3 WB |
4 | use std::pin::Pin; |
5 | use std::sync::{Arc, Mutex}; | |
6 | use std::task::{Context, Poll}; | |
e8edbbd4 | 7 | |
f7d4e4b5 | 8 | use anyhow::{format_err, Error}; |
e8edbbd4 | 9 | use futures::stream::Stream; |
6afb60ab | 10 | use futures::future::{Abortable, AbortHandle}; |
c443f58b | 11 | use nix::dir::Dir; |
e8edbbd4 DM |
12 | use nix::fcntl::OFlag; |
13 | use nix::sys::stat::Mode; | |
14 | ||
be3a0295 | 15 | use pbs_datastore::catalog::CatalogWriter; |
4805edc4 WB |
16 | use pbs_tools::sync::StdChannelWriter; |
17 | use pbs_tools::tokio::TokioWriterAdapter; | |
9d135fe6 | 18 | |
8968258b | 19 | /// Stream implementation to encode and upload .pxar archives. |
151c6ce2 DM |
20 | /// |
21 | /// The hyper client needs an async Stream for file upload, so we | |
8968258b | 22 | /// spawn an extra thread to encode the .pxar data and pipe it to the |
151c6ce2 | 23 | /// consumer. |
8968258b | 24 | pub struct PxarBackupStream { |
02141b4d | 25 | rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>, |
6afb60ab | 26 | handle: Option<AbortHandle>, |
5be106ee | 27 | error: Arc<Mutex<Option<String>>>, |
e8edbbd4 DM |
28 | } |
29 | ||
8968258b | 30 | impl Drop for PxarBackupStream { |
e8edbbd4 | 31 | fn drop(&mut self) { |
02141b4d | 32 | self.rx = None; |
6afb60ab | 33 | self.handle.take().unwrap().abort(); |
e8edbbd4 DM |
34 | } |
35 | } | |
36 | ||
8968258b | 37 | impl PxarBackupStream { |
bf6e3217 | 38 | pub fn new<W: Write + Send + 'static>( |
c443f58b | 39 | dir: Dir, |
bf6e3217 | 40 | catalog: Arc<Mutex<CatalogWriter<W>>>, |
77486a60 | 41 | options: crate::pxar::PxarCreateOptions, |
2761d6a4 | 42 | ) -> Result<Self, Error> { |
02141b4d | 43 | let (tx, rx) = std::sync::mpsc::sync_channel(10); |
e8edbbd4 | 44 | |
c443f58b | 45 | let buffer_size = 256 * 1024; |
c6e28b66 | 46 | |
5be106ee | 47 | let error = Arc::new(Mutex::new(None)); |
6afb60ab SR |
48 | let error2 = Arc::clone(&error); |
49 | let handler = async move { | |
f1d76ecf | 50 | let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( |
6afb60ab | 51 | buffer_size, |
f1d76ecf DC |
52 | StdChannelWriter::new(tx), |
53 | )); | |
c443f58b | 54 | |
6afb60ab | 55 | let verbose = options.verbose; |
77486a60 | 56 | |
6afb60ab SR |
57 | let writer = pxar::encoder::sync::StandardWriter::new(writer); |
58 | if let Err(err) = crate::pxar::create_archive( | |
59 | dir, | |
60 | writer, | |
61 | crate::pxar::Flags::DEFAULT, | |
62 | move |path| { | |
63 | if verbose { | |
64 | println!("{:?}", path); | |
c443f58b | 65 | } |
6afb60ab SR |
66 | Ok(()) |
67 | }, | |
68 | Some(catalog), | |
69 | options, | |
70 | ).await { | |
71 | let mut error = error2.lock().unwrap(); | |
72 | *error = Some(err.to_string()); | |
73 | } | |
74 | }; | |
75 | ||
76 | let (handle, registration) = AbortHandle::new_pair(); | |
77 | let future = Abortable::new(handler, registration); | |
78 | tokio::spawn(future); | |
e8edbbd4 | 79 | |
5be106ee | 80 | Ok(Self { |
02141b4d | 81 | rx: Some(rx), |
6afb60ab | 82 | handle: Some(handle), |
5be106ee DM |
83 | error, |
84 | }) | |
e8edbbd4 | 85 | } |
23bb8780 | 86 | |
bf6e3217 | 87 | pub fn open<W: Write + Send + 'static>( |
2761d6a4 | 88 | dirname: &Path, |
bf6e3217 | 89 | catalog: Arc<Mutex<CatalogWriter<W>>>, |
77486a60 | 90 | options: crate::pxar::PxarCreateOptions, |
2761d6a4 | 91 | ) -> Result<Self, Error> { |
728797d0 | 92 | let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; |
23bb8780 | 93 | |
c443f58b WB |
94 | Self::new( |
95 | dir, | |
c443f58b | 96 | catalog, |
77486a60 | 97 | options, |
c443f58b | 98 | ) |
23bb8780 | 99 | } |
e8edbbd4 DM |
100 | } |
101 | ||
8968258b | 102 | impl Stream for PxarBackupStream { |
369a87e3 | 103 | type Item = Result<Vec<u8>, Error>; |
e8edbbd4 | 104 | |
02141b4d | 105 | fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> { |
c443f58b WB |
106 | { |
107 | // limit lock scope | |
6c3c9bce DM |
108 | let error = self.error.lock().unwrap(); |
109 | if let Some(ref msg) = *error { | |
369a87e3 | 110 | return Poll::Ready(Some(Err(format_err!("{}", msg)))); |
6c3c9bce | 111 | } |
5be106ee | 112 | } |
02141b4d | 113 | |
d420962f | 114 | match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) { |
02141b4d DM |
115 | Ok(data) => Poll::Ready(Some(data)), |
116 | Err(_) => { | |
117 | let error = self.error.lock().unwrap(); | |
118 | if let Some(ref msg) = *error { | |
119 | return Poll::Ready(Some(Err(format_err!("{}", msg)))); | |
120 | } | |
121 | Poll::Ready(None) // channel closed, no error | |
122 | } | |
123 | } | |
e8edbbd4 DM |
124 | } |
125 | } |