]>
Commit | Line | Data |
---|---|---|
e8edbbd4 DM |
1 | use failure::*; |
2 | ||
3 | use std::thread; | |
5be106ee | 4 | use std::sync::{Arc, Mutex}; |
e8edbbd4 | 5 | use std::os::unix::io::FromRawFd; |
17d6979a | 6 | use std::path::{Path, PathBuf}; |
2eeaacb9 | 7 | use std::collections::HashSet; |
e8edbbd4 | 8 | |
0c516b12 | 9 | use futures::Poll; |
e8edbbd4 DM |
10 | use futures::stream::Stream; |
11 | ||
12 | use nix::fcntl::OFlag; | |
13 | use nix::sys::stat::Mode; | |
23bb8780 | 14 | use nix::dir::Dir; |
e8edbbd4 | 15 | |
3dbfe5b1 | 16 | use crate::pxar; |
0c516b12 | 17 | use crate::tools::wrapped_reader_stream::WrappedReaderStream; |
e8edbbd4 | 18 | |
8968258b | 19 | /// Stream implementation to encode and upload .pxar archives. |
151c6ce2 DM |
20 | /// |
21 | /// The hyper client needs an async Stream for file upload, so we | |
8968258b | 22 | /// spawn an extra thread to encode the .pxar data and pipe it to the |
151c6ce2 | 23 | /// consumer. |
8968258b | 24 | pub struct PxarBackupStream { |
2698e8a5 | 25 | stream: Option<WrappedReaderStream<std::fs::File>>, |
e8edbbd4 | 26 | child: Option<thread::JoinHandle<()>>, |
5be106ee | 27 | error: Arc<Mutex<Option<String>>>, |
e8edbbd4 DM |
28 | } |
29 | ||
8968258b | 30 | impl Drop for PxarBackupStream { |
e8edbbd4 DM |
31 | |
32 | fn drop(&mut self) { | |
2698e8a5 | 33 | self.stream = None; |
e8edbbd4 DM |
34 | self.child.take().unwrap().join().unwrap(); |
35 | } | |
36 | } | |
37 | ||
8968258b | 38 | impl PxarBackupStream { |
e8edbbd4 | 39 | |
2761d6a4 DM |
40 | pub fn new( |
41 | mut dir: Dir, | |
42 | path: PathBuf, | |
43 | device_set: Option<HashSet<u64>>, | |
44 | verbose: bool, | |
45 | skip_lost_and_found: bool, | |
46 | catalog: Arc<Mutex<crate::pxar::catalog::SimpleCatalog>>, | |
47 | ) -> Result<Self, Error> { | |
e8edbbd4 DM |
48 | |
49 | let (rx, tx) = nix::unistd::pipe()?; | |
50 | ||
0c516b12 | 51 | let buffer_size = 1024*1024; |
c6e28b66 DM |
52 | nix::fcntl::fcntl(rx, nix::fcntl::FcntlArg::F_SETPIPE_SZ(buffer_size as i32))?; |
53 | ||
5be106ee DM |
54 | let error = Arc::new(Mutex::new(None)); |
55 | let error2 = error.clone(); | |
56 | ||
2761d6a4 DM |
57 | let catalog = catalog.clone(); |
58 | let child = thread::spawn(move || { | |
59 | let mut guard = catalog.lock().unwrap(); | |
e8edbbd4 | 60 | let mut writer = unsafe { std::fs::File::from_raw_fd(tx) }; |
2761d6a4 | 61 | if let Err(err) = pxar::Encoder::encode(path, &mut dir, &mut writer, Some(&mut *guard), device_set, verbose, skip_lost_and_found, pxar::flags::DEFAULT) { |
5be106ee DM |
62 | let mut error = error2.lock().unwrap(); |
63 | *error = Some(err.to_string()); | |
e8edbbd4 DM |
64 | } |
65 | }); | |
66 | ||
67 | let pipe = unsafe { std::fs::File::from_raw_fd(rx) }; | |
0c516b12 | 68 | let stream = crate::tools::wrapped_reader_stream::WrappedReaderStream::new(pipe); |
e8edbbd4 | 69 | |
5be106ee DM |
70 | Ok(Self { |
71 | stream: Some(stream), | |
72 | child: Some(child), | |
73 | error, | |
74 | }) | |
e8edbbd4 | 75 | } |
23bb8780 | 76 | |
2761d6a4 DM |
77 | pub fn open( |
78 | dirname: &Path, | |
79 | device_set: Option<HashSet<u64>>, | |
80 | verbose: bool, | |
81 | skip_lost_and_found: bool, | |
82 | catalog: Arc<Mutex<crate::pxar::catalog::SimpleCatalog>>, | |
83 | ) -> Result<Self, Error> { | |
23bb8780 | 84 | |
728797d0 | 85 | let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; |
23bb8780 DM |
86 | let path = std::path::PathBuf::from(dirname); |
87 | ||
2761d6a4 | 88 | Self::new(dir, path, device_set, verbose, skip_lost_and_found, catalog) |
23bb8780 | 89 | } |
e8edbbd4 DM |
90 | } |
91 | ||
8968258b | 92 | impl Stream for PxarBackupStream { |
e8edbbd4 DM |
93 | |
94 | type Item = Vec<u8>; | |
95 | type Error = Error; | |
96 | ||
e8edbbd4 | 97 | fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> { |
6c3c9bce DM |
98 | { // limit lock scope |
99 | let error = self.error.lock().unwrap(); | |
100 | if let Some(ref msg) = *error { | |
101 | return Err(format_err!("{}", msg)); | |
102 | } | |
5be106ee | 103 | } |
2698e8a5 | 104 | self.stream.as_mut().unwrap().poll().map_err(Error::from) |
e8edbbd4 DM |
105 | } |
106 | } |