]>
Commit | Line | Data |
---|---|---|
369a87e3 | 1 | use std::collections::HashSet; |
bf6e3217 | 2 | use std::io::Write; |
02141b4d | 3 | //use std::os::unix::io::FromRawFd; |
254ec194 | 4 | use std::path::Path; |
369a87e3 WB |
5 | use std::pin::Pin; |
6 | use std::sync::{Arc, Mutex}; | |
7 | use std::task::{Context, Poll}; | |
8 | use std::thread; | |
e8edbbd4 | 9 | |
f7d4e4b5 | 10 | use anyhow::{format_err, Error}; |
e8edbbd4 | 11 | use futures::stream::Stream; |
c443f58b | 12 | use nix::dir::Dir; |
e8edbbd4 DM |
13 | use nix::fcntl::OFlag; |
14 | use nix::sys::stat::Mode; | |
15 | ||
c443f58b WB |
16 | use pathpatterns::MatchEntry; |
17 | ||
bf6e3217 | 18 | use crate::backup::CatalogWriter; |
9d135fe6 | 19 | |
8968258b | 20 | /// Stream implementation to encode and upload .pxar archives. |
151c6ce2 DM |
21 | /// |
22 | /// The hyper client needs an async Stream for file upload, so we | |
8968258b | 23 | /// spawn an extra thread to encode the .pxar data and pipe it to the |
151c6ce2 | 24 | /// consumer. |
8968258b | 25 | pub struct PxarBackupStream { |
02141b4d | 26 | rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>, |
e8edbbd4 | 27 | child: Option<thread::JoinHandle<()>>, |
5be106ee | 28 | error: Arc<Mutex<Option<String>>>, |
e8edbbd4 DM |
29 | } |
30 | ||
8968258b | 31 | impl Drop for PxarBackupStream { |
e8edbbd4 | 32 | fn drop(&mut self) { |
02141b4d | 33 | self.rx = None; |
e8edbbd4 DM |
34 | self.child.take().unwrap().join().unwrap(); |
35 | } | |
36 | } | |
37 | ||
8968258b | 38 | impl PxarBackupStream { |
bf6e3217 | 39 | pub fn new<W: Write + Send + 'static>( |
c443f58b | 40 | dir: Dir, |
2761d6a4 | 41 | device_set: Option<HashSet<u64>>, |
97bbd1bf | 42 | verbose: bool, |
2761d6a4 | 43 | skip_lost_and_found: bool, |
bf6e3217 | 44 | catalog: Arc<Mutex<CatalogWriter<W>>>, |
239e49f9 | 45 | patterns: Vec<MatchEntry>, |
6fc053ed | 46 | entries_max: usize, |
2761d6a4 | 47 | ) -> Result<Self, Error> { |
02141b4d | 48 | let (tx, rx) = std::sync::mpsc::sync_channel(10); |
e8edbbd4 | 49 | |
c443f58b | 50 | let buffer_size = 256 * 1024; |
c6e28b66 | 51 | |
5be106ee | 52 | let error = Arc::new(Mutex::new(None)); |
c443f58b WB |
53 | let child = std::thread::Builder::new() |
54 | .name("PxarBackupStream".to_string()) | |
55 | .spawn({ | |
56 | let error = Arc::clone(&error); | |
57 | move || { | |
58 | let mut catalog_guard = catalog.lock().unwrap(); | |
59 | let writer = std::io::BufWriter::with_capacity( | |
60 | buffer_size, | |
61 | crate::tools::StdChannelWriter::new(tx), | |
62 | ); | |
63 | ||
64 | let writer = pxar::encoder::sync::StandardWriter::new(writer); | |
65 | if let Err(err) = crate::pxar::create_archive( | |
66 | dir, | |
67 | writer, | |
239e49f9 | 68 | patterns, |
5444fa94 | 69 | crate::pxar::Flags::DEFAULT, |
c443f58b WB |
70 | device_set, |
71 | skip_lost_and_found, | |
97bbd1bf WB |
72 | |path| { |
73 | if verbose { | |
74 | println!("{:?}", path); | |
75 | } | |
76 | Ok(()) | |
77 | }, | |
c443f58b WB |
78 | entries_max, |
79 | Some(&mut *catalog_guard), | |
80 | ) { | |
81 | let mut error = error.lock().unwrap(); | |
82 | *error = Some(err.to_string()); | |
83 | } | |
84 | } | |
85 | })?; | |
e8edbbd4 | 86 | |
5be106ee | 87 | Ok(Self { |
02141b4d | 88 | rx: Some(rx), |
5be106ee DM |
89 | child: Some(child), |
90 | error, | |
91 | }) | |
e8edbbd4 | 92 | } |
23bb8780 | 93 | |
bf6e3217 | 94 | pub fn open<W: Write + Send + 'static>( |
2761d6a4 DM |
95 | dirname: &Path, |
96 | device_set: Option<HashSet<u64>>, | |
97 | verbose: bool, | |
98 | skip_lost_and_found: bool, | |
bf6e3217 | 99 | catalog: Arc<Mutex<CatalogWriter<W>>>, |
239e49f9 | 100 | patterns: Vec<MatchEntry>, |
6fc053ed | 101 | entries_max: usize, |
2761d6a4 | 102 | ) -> Result<Self, Error> { |
728797d0 | 103 | let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; |
23bb8780 | 104 | |
c443f58b WB |
105 | Self::new( |
106 | dir, | |
c443f58b WB |
107 | device_set, |
108 | verbose, | |
109 | skip_lost_and_found, | |
110 | catalog, | |
239e49f9 | 111 | patterns, |
c443f58b WB |
112 | entries_max, |
113 | ) | |
23bb8780 | 114 | } |
e8edbbd4 DM |
115 | } |
116 | ||
8968258b | 117 | impl Stream for PxarBackupStream { |
369a87e3 | 118 | type Item = Result<Vec<u8>, Error>; |
e8edbbd4 | 119 | |
02141b4d | 120 | fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> { |
c443f58b WB |
121 | { |
122 | // limit lock scope | |
6c3c9bce DM |
123 | let error = self.error.lock().unwrap(); |
124 | if let Some(ref msg) = *error { | |
369a87e3 | 125 | return Poll::Ready(Some(Err(format_err!("{}", msg)))); |
6c3c9bce | 126 | } |
5be106ee | 127 | } |
02141b4d DM |
128 | |
129 | match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) { | |
130 | Ok(data) => Poll::Ready(Some(data)), | |
131 | Err(_) => { | |
132 | let error = self.error.lock().unwrap(); | |
133 | if let Some(ref msg) = *error { | |
134 | return Poll::Ready(Some(Err(format_err!("{}", msg)))); | |
135 | } | |
136 | Poll::Ready(None) // channel closed, no error | |
137 | } | |
138 | } | |
e8edbbd4 DM |
139 | } |
140 | } |