]> git.proxmox.com Git - proxmox-backup.git/blob - src/tools/fuse_loop.rs
fuse_loop: wait for instance to close after killing
[proxmox-backup.git] / src / tools / fuse_loop.rs
1 //! Map a raw data reader as a loop device via FUSE
2
3 use anyhow::{Error, format_err, bail};
4 use std::ffi::OsStr;
5 use std::path::{Path, PathBuf};
6 use std::fs::{File, remove_file, read_to_string, OpenOptions};
7 use std::io::SeekFrom;
8 use std::io::prelude::*;
9 use std::collections::HashMap;
10
11 use nix::unistd::Pid;
12 use nix::sys::signal::{self, Signal};
13
14 use tokio::io::{AsyncRead, AsyncSeek, AsyncReadExt, AsyncSeekExt};
15 use futures::stream::{StreamExt, TryStreamExt};
16 use futures::channel::mpsc::{Sender, Receiver};
17
18 use proxmox::const_regex;
19 use proxmox::tools::time;
20 use proxmox_fuse::{*, requests::FuseRequest};
21 use super::loopdev;
22 use super::fs;
23
/// Directory holding the FUSE backing files and PID files of all mappings.
const RUN_DIR: &str = "/run/pbs-loopdev";
25
26 const_regex! {
27 pub LOOPDEV_REGEX = r"^loop\d+$";
28 }
29
30 /// Represents an ongoing FUSE-session that has been mapped onto a loop device.
31 /// Create with map_loop, then call 'main' and poll until startup_chan reports
32 /// success. Then, daemonize or otherwise finish setup, and continue polling
33 /// main's future until completion.
34 pub struct FuseLoopSession<R: AsyncRead + AsyncSeek + Unpin> {
35 session: Option<Fuse>,
36 stat: libc::stat,
37 reader: R,
38 fuse_path: String,
39 pid_path: String,
40 pub loopdev_path: String,
41 }
42
43 impl<R: AsyncRead + AsyncSeek + Unpin> FuseLoopSession<R> {
44
45 /// Prepare for mapping the given reader as a block device node at
46 /// /dev/loopN. Creates a temporary file for FUSE and a PID file for unmap.
47 pub async fn map_loop<P: AsRef<str>>(size: u64, mut reader: R, name: P, options: &OsStr)
48 -> Result<Self, Error>
49 {
50 // attempt a single read to check if the reader is configured correctly
51 let _ = reader.read_u8().await?;
52
53 std::fs::create_dir_all(RUN_DIR)?;
54 let mut path = PathBuf::from(RUN_DIR);
55 path.push(name.as_ref());
56 let mut pid_path = path.clone();
57 pid_path.set_extension("pid");
58
59 // cleanup previous instance with same name
60 // if loopdev is actually still mapped, this will do nothing and the
61 // create_new below will fail as intended
62 cleanup_unused_run_files(Some(name.as_ref().to_owned()));
63
64 match OpenOptions::new().write(true).create_new(true).open(&path) {
65 Ok(_) => { /* file created, continue on */ },
66 Err(e) => {
67 if e.kind() == std::io::ErrorKind::AlreadyExists {
68 bail!("the given archive is already mapped, cannot map twice");
69 } else {
70 bail!("error while creating backing file ({:?}) - {}", &path, e);
71 }
72 },
73 }
74
75 let session = Fuse::builder("pbs-block-dev")?
76 .options_os(options)?
77 .enable_read()
78 .build()?
79 .mount(&path)?;
80
81 let loopdev_path = loopdev::get_or_create_free_dev().map_err(|err| {
82 format_err!("loop-control GET_FREE failed - {}", err)
83 })?;
84
85 // write pidfile so unmap can later send us a signal to exit
86 Self::write_pidfile(&pid_path)?;
87
88 Ok(Self {
89 session: Some(session),
90 reader,
91 stat: minimal_stat(size as i64),
92 fuse_path: path.to_string_lossy().into_owned(),
93 pid_path: pid_path.to_string_lossy().into_owned(),
94 loopdev_path,
95 })
96 }
97
98 fn write_pidfile(path: &Path) -> Result<(), Error> {
99 let pid = unsafe { libc::getpid() };
100 let mut file = File::create(path)?;
101 write!(file, "{}", pid)?;
102 Ok(())
103 }
104
105 /// Runs the FUSE request loop and assigns the loop device. Will send a
106 /// message on startup_chan once the loop device is assigned (or assignment
107 /// fails). Send a message on abort_chan to trigger cleanup and exit FUSE.
108 /// An error on loopdev assignment does *not* automatically close the FUSE
109 /// handle or do cleanup, trigger abort_chan manually in case startup fails.
110 pub async fn main(
111 &mut self,
112 mut startup_chan: Sender<Result<(), Error>>,
113 abort_chan: Receiver<()>,
114 ) -> Result<(), Error> {
115
116 if let None = self.session {
117 panic!("internal error: fuse_loop::main called before ::map_loop");
118 }
119 let mut session = self.session.take().unwrap().fuse();
120 let mut abort_chan = abort_chan.fuse();
121
122 let (loopdev_path, fuse_path) = (self.loopdev_path.clone(), self.fuse_path.clone());
123 tokio::task::spawn_blocking(move || {
124 if let Err(err) = loopdev::assign(loopdev_path, fuse_path) {
125 let _ = startup_chan.try_send(Err(format_err!("error while assigning loop device - {}", err)));
126 } else {
127 // device is assigned successfully, which means not only is the
128 // loopdev ready, but FUSE is also okay, since the assignment
129 // would have failed otherwise
130 let _ = startup_chan.try_send(Ok(()));
131 }
132 });
133
134 let (loopdev_path, fuse_path, pid_path) =
135 (self.loopdev_path.clone(), self.fuse_path.clone(), self.pid_path.clone());
136 let cleanup = |session: futures::stream::Fuse<Fuse>| {
137 // only warn for errors on cleanup, if these fail nothing is lost
138 if let Err(err) = loopdev::unassign(&loopdev_path) {
139 eprintln!(
140 "cleanup: warning: could not unassign file {} from loop device {} - {}",
141 &fuse_path,
142 &loopdev_path,
143 err,
144 );
145 }
146
147 // force close FUSE handle before attempting to remove backing file
148 std::mem::drop(session);
149
150 if let Err(err) = remove_file(&fuse_path) {
151 eprintln!(
152 "cleanup: warning: could not remove temporary file {} - {}",
153 &fuse_path,
154 err,
155 );
156 }
157 if let Err(err) = remove_file(&pid_path) {
158 eprintln!(
159 "cleanup: warning: could not remove PID file {} - {}",
160 &pid_path,
161 err,
162 );
163 }
164 };
165
166 loop {
167 tokio::select!{
168 _ = abort_chan.next() => {
169 // aborted, do cleanup and exit
170 break;
171 },
172 req = session.try_next() => {
173 let res = match req? {
174 Some(Request::Lookup(req)) => {
175 let stat = self.stat;
176 let entry = EntryParam::simple(stat.st_ino, stat);
177 req.reply(&entry)
178 },
179 Some(Request::Getattr(req)) => {
180 req.reply(&self.stat, std::f64::MAX)
181 },
182 Some(Request::Read(req)) => {
183 match self.reader.seek(SeekFrom::Start(req.offset)).await {
184 Ok(_) => {
185 let mut buf = vec![0u8; req.size];
186 match self.reader.read_exact(&mut buf).await {
187 Ok(_) => {
188 req.reply(&buf)
189 },
190 Err(e) => {
191 req.io_fail(e)
192 }
193 }
194 },
195 Err(e) => {
196 req.io_fail(e)
197 }
198 }
199 },
200 Some(_) => {
201 // only FUSE requests necessary for loop-mapping are implemented
202 eprintln!("Unimplemented FUSE request type encountered");
203 Ok(())
204 },
205 None => {
206 // FUSE connection closed
207 break;
208 }
209 };
210 if let Err(err) = res {
211 // error during FUSE reply, cleanup and exit
212 cleanup(session);
213 bail!(err);
214 }
215 }
216 }
217 }
218
219 // non-error FUSE exit
220 cleanup(session);
221 Ok(())
222 }
223 }
224
225 /// Clean up leftover files as well as FUSE instances without a loop device
226 /// connected. Best effort, never returns an error.
227 /// If filter_name is Some("..."), only this name will be cleaned up.
228 pub fn cleanup_unused_run_files(filter_name: Option<String>) {
229 if let Ok(maps) = find_all_mappings() {
230 for (name, loopdev) in maps {
231 if loopdev.is_none() &&
232 (filter_name.is_none() || &name == filter_name.as_ref().unwrap())
233 {
234 let mut path = PathBuf::from(RUN_DIR);
235 path.push(&name);
236
237 // clean leftover FUSE instances (e.g. user called 'losetup -d' or similar)
238 // does nothing if files are already stagnant (e.g. instance crashed etc...)
239 if let Ok(_) = unmap_from_backing(&path) {
240 // we have reaped some leftover instance, tell the user
241 eprintln!(
242 "Cleaned up dangling mapping '{}': no loop device assigned",
243 &name
244 );
245 }
246
247 // remove remnant files
248 // these we're not doing anything, so no need to inform the user
249 let _ = remove_file(&path);
250 path.set_extension("pid");
251 let _ = remove_file(&path);
252 }
253 }
254 }
255 }
256
257 fn get_backing_file(loopdev: &str) -> Result<String, Error> {
258 let num = loopdev.split_at(9).1.parse::<u8>().map_err(|err|
259 format_err!("malformed loopdev path, does not end with valid number - {}", err))?;
260
261 let block_path = PathBuf::from(format!("/sys/devices/virtual/block/loop{}/loop/backing_file", num));
262 let backing_file = read_to_string(block_path).map_err(|err| {
263 if err.kind() == std::io::ErrorKind::NotFound {
264 format_err!("nothing mapped to {}", loopdev)
265 } else {
266 format_err!("error reading backing file - {}", err)
267 }
268 })?;
269
270 let backing_file = backing_file.trim();
271
272 if !backing_file.starts_with(RUN_DIR) {
273 bail!(
274 "loopdev {} is in use, but not by proxmox-backup-client (mapped to '{}')",
275 loopdev,
276 backing_file,
277 );
278 }
279
280 Ok(backing_file.to_owned())
281 }
282
283 fn unmap_from_backing(backing_file: &Path) -> Result<(), Error> {
284 let mut pid_path = PathBuf::from(backing_file);
285 pid_path.set_extension("pid");
286
287 let pid_str = read_to_string(&pid_path).map_err(|err|
288 format_err!("error reading pidfile {:?}: {}", &pid_path, err))?;
289 let pid = pid_str.parse::<i32>().map_err(|err|
290 format_err!("malformed PID ({}) in pidfile - {}", pid_str, err))?;
291
292 let pid = Pid::from_raw(pid);
293
294 // send SIGINT to trigger cleanup and exit in target process
295 signal::kill(pid, Signal::SIGINT)?;
296
297 // block until unmap is complete or timeout
298 let start = time::epoch_i64();
299 loop {
300 match signal::kill(pid, None) {
301 Ok(_) => {
302 // 10 second timeout, then assume failure
303 if (time::epoch_i64() - start) > 10 {
304 return Err(format_err!("timed out waiting for PID '{}' to exit", &pid));
305 }
306 std::thread::sleep(std::time::Duration::from_millis(100));
307 },
308 Err(nix::Error::Sys(nix::errno::Errno::ESRCH)) => {
309 break;
310 },
311 Err(e) => return Err(e.into()),
312 }
313 }
314
315 Ok(())
316 }
317
318 /// Returns an Iterator over a set of currently active mappings, i.e.
319 /// FuseLoopSession instances. Returns ("backing-file-name", Some("/dev/loopX"))
320 /// where .1 is None when a user has manually called 'losetup -d' or similar but
321 /// the FUSE instance is still running.
322 pub fn find_all_mappings() -> Result<impl Iterator<Item = (String, Option<String>)>, Error> {
323 // get map of all /dev/loop mappings belonging to us
324 let mut loopmap = HashMap::new();
325 for ent in fs::scan_subdir(libc::AT_FDCWD, Path::new("/dev/"), &LOOPDEV_REGEX)? {
326 match ent {
327 Ok(ent) => {
328 let loopdev = format!("/dev/{}", ent.file_name().to_string_lossy());
329 match get_backing_file(&loopdev) {
330 Ok(file) => {
331 // insert filename only, strip RUN_DIR/
332 loopmap.insert(file[RUN_DIR.len()+1..].to_owned(), loopdev);
333 },
334 Err(_) => {},
335 }
336 },
337 Err(_) => {},
338 }
339 }
340
341 Ok(fs::read_subdir(libc::AT_FDCWD, Path::new(RUN_DIR))?
342 .filter_map(move |ent| {
343 match ent {
344 Ok(ent) => {
345 let file = ent.file_name().to_string_lossy();
346 if file == "." || file == ".." || file.ends_with(".pid") {
347 None
348 } else {
349 let loopdev = loopmap.get(file.as_ref()).map(String::to_owned);
350 Some((file.into_owned(), loopdev))
351 }
352 },
353 Err(_) => None,
354 }
355 }))
356 }
357
358 /// Try and unmap a running proxmox-backup-client instance from the given
359 /// /dev/loopN device
360 pub fn unmap_loopdev<S: AsRef<str>>(loopdev: S) -> Result<(), Error> {
361 let loopdev = loopdev.as_ref();
362 if loopdev.len() < 10 || !loopdev.starts_with("/dev/loop") {
363 bail!("malformed loopdev path, must be in format '/dev/loopX'");
364 }
365
366 let backing_file = get_backing_file(loopdev)?;
367 unmap_from_backing(Path::new(&backing_file))
368 }
369
370 /// Try and unmap a running proxmox-backup-client instance from the given name
371 pub fn unmap_name<S: AsRef<str>>(name: S) -> Result<(), Error> {
372 for (mapping, _) in find_all_mappings()? {
373 if mapping.ends_with(name.as_ref()) {
374 let mut path = PathBuf::from(RUN_DIR);
375 path.push(&mapping);
376 return unmap_from_backing(&path);
377 }
378 }
379 Err(format_err!("no mapping for name '{}' found", name.as_ref()))
380 }
381
382 fn minimal_stat(size: i64) -> libc::stat {
383 let mut stat: libc::stat = unsafe { std::mem::zeroed() };
384 stat.st_mode = libc::S_IFREG;
385 stat.st_ino = 1;
386 stat.st_nlink = 1;
387 stat.st_size = size;
388 stat
389 }