1 //! Map a raw data reader as a loop device via FUSE
3 use anyhow
::{Error, format_err, bail}
;
5 use std
::path
::{Path, PathBuf}
;
6 use std
::fs
::{File, remove_file, read_to_string, OpenOptions}
;
8 use std
::io
::prelude
::*;
9 use std
::collections
::HashMap
;
12 use nix
::sys
::signal
::{self, Signal}
;
14 use tokio
::io
::{AsyncRead, AsyncSeek, AsyncReadExt, AsyncSeekExt}
;
15 use futures
::stream
::{StreamExt, TryStreamExt}
;
16 use futures
::channel
::mpsc
::{Sender, Receiver}
;
18 use proxmox
::const_regex
;
19 use proxmox
::tools
::time
;
20 use proxmox_fuse
::{*, requests::FuseRequest}
;
23 const RUN_DIR
: &str = "/run/pbs-loopdev";
26 pub LOOPDEV_REGEX
= r
"^loop\d+$";
// NOTE(review): the field lines between `session` and `loopdev_path` are not
// visible in this view; map_loop also initialises `stat`, `fuse_path` and
// `pid_path`, and main reads `self.reader` - confirm against the full file.
29 /// Represents an ongoing FUSE-session that has been mapped onto a loop device.
30 /// Create with map_loop, then call 'main' and poll until startup_chan reports
31 /// success. Then, daemonize or otherwise finish setup, and continue polling
32 /// main's future until completion.
33 pub struct FuseLoopSession
<R
: AsyncRead
+ AsyncSeek
+ Unpin
> {
// FUSE session handle; set to Some(..) by map_loop and moved out with take()
// by main, which leaves None behind.
34 session
: Option
<Fuse
>,
// Path of the loop device obtained in map_loop; main clones it for
// loopdev::assign and loopdev::unassign.
39 pub loopdev_path
: String
,
42 impl<R
: AsyncRead
+ AsyncSeek
+ Unpin
> FuseLoopSession
<R
> {
// NOTE(review): several original lines are absent from this view (e.g. the
// rest of the Fuse builder chain after line 74 and the wrapper around the
// struct initialiser at lines 88-92). Comments describe only what is visible.
44 /// Prepare for mapping the given reader as a block device node at
45 /// /dev/loopN. Creates a temporary file for FUSE and a PID file for unmap.
46 pub async
fn map_loop
<P
: AsRef
<str>>(size
: u64, mut reader
: R
, name
: P
, options
: &OsStr
)
47 -> Result
<Self, Error
>
49 // attempt a single read to check if the reader is configured correctly
50 let _
= reader
.read_u8().await?
;
// the backing file is RUN_DIR/<name>; the PID path is the same path with its
// extension set to "pid"
52 std
::fs
::create_dir_all(RUN_DIR
)?
;
53 let mut path
= PathBuf
::from(RUN_DIR
);
54 path
.push(name
.as_ref());
55 let mut pid_path
= path
.clone();
56 pid_path
.set_extension("pid");
58 // cleanup previous instance with same name
59 // if loopdev is actually still mapped, this will do nothing and the
60 // create_new below will fail as intended
61 cleanup_unused_run_files(Some(name
.as_ref().to_owned()));
// create_new(true) makes the open fail with AlreadyExists when the backing
// file is already present, which is reported as "already mapped"
63 match OpenOptions
::new().write(true).create_new(true).open(&path
) {
64 Ok(_
) => { /* file created, continue on */ }
,
66 if e
.kind() == std
::io
::ErrorKind
::AlreadyExists
{
67 bail
!("the given archive is already mapped, cannot map twice");
69 bail
!("error while creating backing file ({:?}) - {}", &path
, e
);
74 let session
= Fuse
::builder("pbs-block-dev")?
80 let loopdev_path
= loopdev
::get_or_create_free_dev().map_err(|err
| {
81 format_err
!("loop-control GET_FREE failed - {}", err
)
84 // write pidfile so unmap can later send us a signal to exit
85 Self::write_pidfile(&pid_path
)?
;
88 session
: Some(session
),
// NOTE(review): `size as i64` wraps for values above i64::MAX - confirm
// callers never pass such sizes
90 stat
: minimal_stat(size
as i64),
// both paths are stored as lossily converted Strings
91 fuse_path
: path
.to_string_lossy().into_owned(),
92 pid_path
: pid_path
.to_string_lossy().into_owned(),
97 fn write_pidfile(path
: &Path
) -> Result
<(), Error
> {
98 let pid
= unsafe { libc::getpid() }
;
99 let mut file
= File
::create(path
)?
;
100 write
!(file
, "{}", pid
)?
;
// NOTE(review): many original lines of this method are absent from this view
// (the start of the signature, the enclosing loop/select construct, the
// replies sent for Lookup/Read, and the bodies after lines 168, 205, 210 and
// 218). Comments describe only what is visible.
104 /// Runs the FUSE request loop and assigns the loop device. Will send a
105 /// message on startup_chan once the loop device is assigned (or assignment
106 /// fails). Send a message on abort_chan to trigger cleanup and exit FUSE.
107 /// An error on loopdev assignment does *not* automatically close the FUSE
108 /// handle or do cleanup, trigger abort_chan manually in case startup fails.
111 mut startup_chan
: Sender
<Result
<(), Error
>>,
112 abort_chan
: Receiver
<()>,
113 ) -> Result
<(), Error
> {
115 if self.session
.is_none() {
116 panic
!("internal error: fuse_loop::main called before ::map_loop");
// take() moves the session out of self and leaves None behind, so a second
// call to main hits the panic above
118 let mut session
= self.session
.take().unwrap().fuse();
119 let mut abort_chan
= abort_chan
.fuse();
// loopdev::assign runs via spawn_blocking on cloned paths; its outcome is
// reported through startup_chan and a failed try_send is ignored
121 let (loopdev_path
, fuse_path
) = (self.loopdev_path
.clone(), self.fuse_path
.clone());
122 tokio
::task
::spawn_blocking(move || {
123 if let Err(err
) = loopdev
::assign(loopdev_path
, fuse_path
) {
124 let _
= startup_chan
.try_send(Err(format_err
!("error while assigning loop device - {}", err
)));
126 // device is assigned successfully, which means not only is the
127 // loopdev ready, but FUSE is also okay, since the assignment
128 // would have failed otherwise
129 let _
= startup_chan
.try_send(Ok(()));
// the cleanup closure takes ownership of the fused session so it can drop it
// between unassigning the loop device and removing the files
133 let (loopdev_path
, fuse_path
, pid_path
) =
134 (self.loopdev_path
.clone(), self.fuse_path
.clone(), self.pid_path
.clone());
135 let cleanup
= |session
: futures
::stream
::Fuse
<Fuse
>| {
136 // only warn for errors on cleanup, if these fail nothing is lost
137 if let Err(err
) = loopdev
::unassign(&loopdev_path
) {
139 "cleanup: warning: could not unassign file {} from loop device {} - {}",
146 // force close FUSE handle before attempting to remove backing file
147 std
::mem
::drop(session
);
149 if let Err(err
) = remove_file(&fuse_path
) {
151 "cleanup: warning: could not remove temporary file {} - {}",
156 if let Err(err
) = remove_file(&pid_path
) {
158 "cleanup: warning: could not remove PID file {} - {}",
167 _
= abort_chan
.next() => {
168 // aborted, do cleanup and exit
171 req
= session
.try_next() => {
// `req?` propagates an error from the FUSE request stream to the caller
172 let res
= match req?
{
173 Some(Request
::Lookup(req
)) => {
// Lookup and Getattr are both answered from the single stat stored in self
174 let stat
= self.stat
;
175 let entry
= EntryParam
::simple(stat
.st_ino
, stat
);
178 Some(Request
::Getattr(req
)) => {
179 req
.reply(&self.stat
, std
::f64::MAX
)
181 Some(Request
::Read(req
)) => {
// seek the reader to the requested offset, then fill a zeroed buffer of
// req.size bytes with read_exact
182 match self.reader
.seek(SeekFrom
::Start(req
.offset
)).await
{
184 let mut buf
= vec
![0u8; req
.size
];
185 match self.reader
.read_exact(&mut buf
).await
{
200 // only FUSE requests necessary for loop-mapping are implemented
201 eprintln
!("Unimplemented FUSE request type encountered");
205 // FUSE connection closed
209 if let Err(err
) = res
{
210 // error during FUSE reply, cleanup and exit
218 // non-error FUSE exit
// NOTE(review): some original lines are absent from this view (e.g. the line
// after 233 that completes `path`, and the macro that emits the message at
// line 241). Comments describe only what is visible.
224 /// Clean up leftover files as well as FUSE instances without a loop device
225 /// connected. Best effort, never returns an error.
226 /// If filter_name is Some("..."), only this name will be cleaned up.
227 pub fn cleanup_unused_run_files(filter_name
: Option
<String
>) {
// an error from find_all_mappings is ignored: nothing is cleaned up then
228 if let Ok(maps
) = find_all_mappings() {
229 for (name
, loopdev
) in maps
{
// only mappings without a loop device are considered; the unwrap() is only
// reached when filter_name is Some because of the preceding is_none() check
230 if loopdev
.is_none() &&
231 (filter_name
.is_none() || &name
== filter_name
.as_ref().unwrap())
233 let mut path
= PathBuf
::from(RUN_DIR
);
236 // clean leftover FUSE instances (e.g. user called 'losetup -d' or similar)
237 // does nothing if files are already stagnant (e.g. instance crashed etc...)
238 if unmap_from_backing(&path
, None
).is_ok() {
239 // we have reaped some leftover instance, tell the user
241 "Cleaned up dangling mapping '{}': no loop device assigned",
246 // remove remnant files
247 // these we're not doing anything, so no need to inform the user
// both remove_file results are discarded; `path` is reused for the PID file
// after set_extension("pid")
248 let _
= remove_file(&path
);
249 path
.set_extension("pid");
250 let _
= remove_file(&path
);
256 fn get_backing_file(loopdev
: &str) -> Result
<String
, Error
> {
257 let num
= loopdev
.split_at(9).1.parse
::<u8>().map_err(|err
|
258 format_err
!("malformed loopdev path, does not end with valid number - {}", err
))?
;
260 let block_path
= PathBuf
::from(format
!("/sys/devices/virtual/block/loop{}/loop/backing_file", num
));
261 let backing_file
= read_to_string(block_path
).map_err(|err
| {
262 if err
.kind() == std
::io
::ErrorKind
::NotFound
{
263 format_err
!("nothing mapped to {}", loopdev
)
265 format_err
!("error reading backing file - {}", err
)
269 let backing_file
= backing_file
.trim();
271 if !backing_file
.starts_with(RUN_DIR
) {
273 "loopdev {} is in use, but not by proxmox-backup-client (mapped to '{}')",
279 Ok(backing_file
.to_owned())
// NOTE(review): some original lines are absent from this view (the macro that
// emits the warning at line 286 and the line between 295 and 297). Comments
// describe only what is visible.
282 // call in broken state: we found the mapping, but the client is already dead,
283 // only thing to do is clean up what we can
284 fn emerg_cleanup (loopdev
: Option
<&str>, mut backing_file
: PathBuf
) {
286 "warning: found mapping with dead process ({:?}), attempting cleanup",
// every step below discards its result (`let _ =`), so failures are ignored
290 if let Some(loopdev
) = loopdev
{
291 let _
= loopdev
::unassign(loopdev
);
294 // killing the backing process does not cancel the FUSE mount automatically
295 let mut command
= std
::process
::Command
::new("fusermount");
297 command
.arg(&backing_file
);
298 let _
= crate::tools
::run_command(command
, None
);
// the same PathBuf is reused: after set_extension("pid") it names the PID file
300 let _
= remove_file(&backing_file
);
301 backing_file
.set_extension("pid");
302 let _
= remove_file(&backing_file
);
305 fn unmap_from_backing(backing_file
: &Path
, loopdev
: Option
<&str>) -> Result
<(), Error
> {
306 let mut pid_path
= PathBuf
::from(backing_file
);
307 pid_path
.set_extension("pid");
309 let pid_str
= read_to_string(&pid_path
).map_err(|err
| {
310 if err
.kind() == std
::io
::ErrorKind
::NotFound
{
311 emerg_cleanup(loopdev
, backing_file
.to_owned());
313 format_err
!("error reading pidfile {:?}: {}", &pid_path
, err
)
315 let pid
= pid_str
.parse
::<i32>().map_err(|err
|
316 format_err
!("malformed PID ({}) in pidfile - {}", pid_str
, err
))?
;
318 let pid
= Pid
::from_raw(pid
);
320 // send SIGINT to trigger cleanup and exit in target process
321 match signal
::kill(pid
, Signal
::SIGINT
) {
323 Err(nix
::Error
::Sys(nix
::errno
::Errno
::ESRCH
)) => {
324 emerg_cleanup(loopdev
, backing_file
.to_owned());
327 Err(e
) => return Err(e
.into()),
330 // block until unmap is complete or timeout
331 let start
= time
::epoch_i64();
333 match signal
::kill(pid
, None
) {
335 // 10 second timeout, then assume failure
336 if (time
::epoch_i64() - start
) > 10 {
337 return Err(format_err
!("timed out waiting for PID '{}' to exit", &pid
));
339 std
::thread
::sleep(std
::time
::Duration
::from_millis(100));
341 Err(nix
::Error
::Sys(nix
::errno
::Errno
::ESRCH
)) => {
344 Err(e
) => return Err(e
.into()),
351 /// Returns an Iterator over a set of currently active mappings, i.e.
352 /// FuseLoopSession instances. Returns ("backing-file-name", Some("/dev/loopX"))
353 /// where .1 is None when a user has manually called 'losetup -d' or similar but
354 /// the FUSE instance is still running.
355 pub fn find_all_mappings() -> Result
<impl Iterator
<Item
= (String
, Option
<String
>)>, Error
> {
356 // get map of all /dev/loop mappings belonging to us
357 let mut loopmap
= HashMap
::new();
358 for ent
in pbs_tools
::fs
::scan_subdir(libc
::AT_FDCWD
, Path
::new("/dev/"), &LOOPDEV_REGEX
)?
{
359 if let Ok(ent
) = ent
{
360 let loopdev
= format
!("/dev/{}", ent
.file_name().to_string_lossy());
361 if let Ok(file
) = get_backing_file(&loopdev
) {
362 // insert filename only, strip RUN_DIR/
363 loopmap
.insert(file
[RUN_DIR
.len()+1..].to_owned(), loopdev
);
368 Ok(pbs_tools
::fs
::read_subdir(libc
::AT_FDCWD
, Path
::new(RUN_DIR
))?
369 .filter_map(move |ent
| {
372 let file
= ent
.file_name().to_string_lossy();
373 if file
== "." || file
== ".." || file
.ends_with(".pid") {
376 let loopdev
= loopmap
.get(file
.as_ref()).map(String
::to_owned
);
377 Some((file
.into_owned(), loopdev
))
385 /// Try and unmap a running proxmox-backup-client instance from the given
386 /// /dev/loopN device
387 pub fn unmap_loopdev
<S
: AsRef
<str>>(loopdev
: S
) -> Result
<(), Error
> {
388 let loopdev
= loopdev
.as_ref();
389 if loopdev
.len() < 10 || !loopdev
.starts_with("/dev/loop") {
390 bail
!("malformed loopdev path, must be in format '/dev/loopX'");
393 let backing_file
= get_backing_file(loopdev
)?
;
394 unmap_from_backing(Path
::new(&backing_file
), Some(loopdev
))
397 /// Try and unmap a running proxmox-backup-client instance from the given name
398 pub fn unmap_name
<S
: AsRef
<str>>(name
: S
) -> Result
<(), Error
> {
399 for (mapping
, loopdev
) in find_all_mappings()?
{
400 if mapping
.ends_with(name
.as_ref()) {
401 let mut path
= PathBuf
::from(RUN_DIR
);
403 return unmap_from_backing(&path
, loopdev
.as_deref());
406 Err(format_err
!("no mapping for name '{}' found", name
.as_ref()))
409 fn minimal_stat(size
: i64) -> libc
::stat
{
410 let mut stat
: libc
::stat
= unsafe { std::mem::zeroed() }
;
411 stat
.st_mode
= libc
::S_IFREG
;