]>
Commit | Line | Data |
---|---|---|
1 | use std::collections::HashSet; | |
2 | use std::io::Write; | |
3 | //use std::os::unix::io::FromRawFd; | |
4 | use std::path::{Path, PathBuf}; | |
5 | use std::pin::Pin; | |
6 | use std::sync::{Arc, Mutex}; | |
7 | use std::task::{Context, Poll}; | |
8 | use std::thread; | |
9 | ||
10 | use anyhow::{format_err, Error}; | |
11 | use futures::stream::Stream; | |
12 | use nix::dir::Dir; | |
13 | use nix::fcntl::OFlag; | |
14 | use nix::sys::stat::Mode; | |
15 | ||
16 | use pathpatterns::MatchEntry; | |
17 | ||
18 | use crate::backup::CatalogWriter; | |
19 | ||
20 | /// Stream implementation to encode and upload .pxar archives. | |
21 | /// | |
22 | /// The hyper client needs an async Stream for file upload, so we | |
23 | /// spawn an extra thread to encode the .pxar data and pipe it to the | |
24 | /// consumer. | |
25 | pub struct PxarBackupStream { | |
26 | rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>, | |
27 | child: Option<thread::JoinHandle<()>>, | |
28 | error: Arc<Mutex<Option<String>>>, | |
29 | } | |
30 | ||
31 | impl Drop for PxarBackupStream { | |
32 | fn drop(&mut self) { | |
33 | self.rx = None; | |
34 | self.child.take().unwrap().join().unwrap(); | |
35 | } | |
36 | } | |
37 | ||
38 | impl PxarBackupStream { | |
39 | pub fn new<W: Write + Send + 'static>( | |
40 | dir: Dir, | |
41 | _path: PathBuf, | |
42 | device_set: Option<HashSet<u64>>, | |
43 | _verbose: bool, | |
44 | skip_lost_and_found: bool, | |
45 | catalog: Arc<Mutex<CatalogWriter<W>>>, | |
46 | patterns: Vec<MatchEntry>, | |
47 | entries_max: usize, | |
48 | ) -> Result<Self, Error> { | |
49 | let (tx, rx) = std::sync::mpsc::sync_channel(10); | |
50 | ||
51 | let buffer_size = 256 * 1024; | |
52 | ||
53 | let error = Arc::new(Mutex::new(None)); | |
54 | let child = std::thread::Builder::new() | |
55 | .name("PxarBackupStream".to_string()) | |
56 | .spawn({ | |
57 | let error = Arc::clone(&error); | |
58 | move || { | |
59 | let mut catalog_guard = catalog.lock().unwrap(); | |
60 | let writer = std::io::BufWriter::with_capacity( | |
61 | buffer_size, | |
62 | crate::tools::StdChannelWriter::new(tx), | |
63 | ); | |
64 | ||
65 | let writer = pxar::encoder::sync::StandardWriter::new(writer); | |
66 | if let Err(err) = crate::pxar::create_archive( | |
67 | dir, | |
68 | writer, | |
69 | patterns, | |
70 | crate::pxar::flags::DEFAULT, | |
71 | device_set, | |
72 | skip_lost_and_found, | |
73 | |_| Ok(()), | |
74 | entries_max, | |
75 | Some(&mut *catalog_guard), | |
76 | ) { | |
77 | let mut error = error.lock().unwrap(); | |
78 | *error = Some(err.to_string()); | |
79 | } | |
80 | } | |
81 | })?; | |
82 | ||
83 | Ok(Self { | |
84 | rx: Some(rx), | |
85 | child: Some(child), | |
86 | error, | |
87 | }) | |
88 | } | |
89 | ||
90 | pub fn open<W: Write + Send + 'static>( | |
91 | dirname: &Path, | |
92 | device_set: Option<HashSet<u64>>, | |
93 | verbose: bool, | |
94 | skip_lost_and_found: bool, | |
95 | catalog: Arc<Mutex<CatalogWriter<W>>>, | |
96 | patterns: Vec<MatchEntry>, | |
97 | entries_max: usize, | |
98 | ) -> Result<Self, Error> { | |
99 | let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; | |
100 | let path = std::path::PathBuf::from(dirname); | |
101 | ||
102 | Self::new( | |
103 | dir, | |
104 | path, | |
105 | device_set, | |
106 | verbose, | |
107 | skip_lost_and_found, | |
108 | catalog, | |
109 | patterns, | |
110 | entries_max, | |
111 | ) | |
112 | } | |
113 | } | |
114 | ||
115 | impl Stream for PxarBackupStream { | |
116 | type Item = Result<Vec<u8>, Error>; | |
117 | ||
118 | fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> { | |
119 | { | |
120 | // limit lock scope | |
121 | let error = self.error.lock().unwrap(); | |
122 | if let Some(ref msg) = *error { | |
123 | return Poll::Ready(Some(Err(format_err!("{}", msg)))); | |
124 | } | |
125 | } | |
126 | ||
127 | match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) { | |
128 | Ok(data) => Poll::Ready(Some(data)), | |
129 | Err(_) => { | |
130 | let error = self.error.lock().unwrap(); | |
131 | if let Some(ref msg) = *error { | |
132 | return Poll::Ready(Some(Err(format_err!("{}", msg)))); | |
133 | } | |
134 | Poll::Ready(None) // channel closed, no error | |
135 | } | |
136 | } | |
137 | } | |
138 | } |