1 //! Map a raw data reader as a loop device via FUSE
3 use anyhow
::{Error, format_err, bail}
;
5 use std
::path
::{Path, PathBuf}
;
6 use std
::fs
::{File, remove_file, read_to_string, OpenOptions}
;
8 use std
::io
::prelude
::*;
9 use std
::collections
::HashMap
;
12 use nix
::sys
::signal
::{self, Signal}
;
14 use tokio
::io
::{AsyncRead, AsyncSeek, AsyncReadExt, AsyncSeekExt}
;
15 use futures
::stream
::{StreamExt, TryStreamExt}
;
16 use futures
::channel
::mpsc
::{Sender, Receiver}
;
18 use proxmox
::const_regex
;
19 use proxmox
::tools
::time
;
20 use proxmox_fuse
::{*, requests::FuseRequest}
;
/// Directory below which the FUSE backing file and the PID file of every
/// mapping are created.
const RUN_DIR: &str = "/run/pbs-loopdev";
// Matches the file name of a loop device node: "loop" followed by one or more
// digits and nothing else. Used to filter the entries of /dev/.
27 pub LOOPDEV_REGEX
= r
"^loop\d+$";
30 /// An ongoing FUSE session whose backing file is mapped onto a loop device.
31 /// Create it with `map_loop`, then call `main` and poll until `startup_chan`
32 /// reports success. Then, daemonize or otherwise finish setup, and continue
33 /// polling `main`'s future until completion.
34 pub struct FuseLoopSession
<R
: AsyncRead
+ AsyncSeek
+ Unpin
> {
// FUSE handle created by `map_loop`; `main` takes it out of the `Option`
// when it starts the request loop.
35 session
: Option
<Fuse
>,
// Loop device obtained in `map_loop` and assigned to the backing file in `main`.
40 pub loopdev_path
: String
,
43 impl<R
: AsyncRead
+ AsyncSeek
+ Unpin
> FuseLoopSession
<R
> {
45 /// Prepare for mapping the given reader as a block device node at
46 /// /dev/loopN. Creates a temporary file for FUSE and a PID file for unmap.
///
/// `name` selects the file names used below `RUN_DIR`, `size` is handed to
/// `minimal_stat` to build the stat data of the session.
///
/// Fails if the initial test read fails, if a backing file for `name` already
/// exists (the archive is already mapped), or if any other setup step returns
/// an error.
47 pub async
fn map_loop
<P
: AsRef
<str>>(size
: u64, mut reader
: R
, name
: P
, options
: &OsStr
)
48 -> Result
<Self, Error
>
50 // attempt a single read to check if the reader is configured correctly
// (this advances the reader by one byte; the read handler in `main` seeks
// to the requested offset before every read)
51 let _
= reader
.read_u8().await?
;
53 std
::fs
::create_dir_all(RUN_DIR
)?
;
54 let mut path
= PathBuf
::from(RUN_DIR
);
55 path
.push(name
.as_ref());
56 let mut pid_path
= path
.clone();
// NOTE(review): set_extension() replaces an existing extension, so names
// that differ only in their extension end up with the same PID file -
// confirm that callers never pass such names.
57 pid_path
.set_extension("pid");
59 // clean up a previous instance with the same name
60 // if the loopdev is actually still mapped, this will do nothing and the
61 // create_new below will fail as intended
62 cleanup_unused_run_files(Some(name
.as_ref().to_owned()));
// create_new(true) makes the open fail if the backing file already exists
64 match OpenOptions
::new().write(true).create_new(true).open(&path
) {
65 Ok(_
) => { /* file created, continue on */ }
,
67 if e
.kind() == std
::io
::ErrorKind
::AlreadyExists
{
68 bail
!("the given archive is already mapped, cannot map twice");
70 bail
!("error while creating backing file ({:?}) - {}", &path
, e
);
75 let session
= Fuse
::builder("pbs-block-dev")?
81 let loopdev_path
= loopdev
::get_or_create_free_dev().map_err(|err
| {
82 format_err
!("loop-control GET_FREE failed - {}", err
)
85 // write pidfile so unmap can later send us a signal to exit
86 Self::write_pidfile(&pid_path
)?
;
89 session
: Some(session
),
// NOTE(review): `as i64` wraps for sizes above i64::MAX - presumably never
// reached in practice, confirm.
91 stat
: minimal_stat(size
as i64),
92 fuse_path
: path
.to_string_lossy().into_owned(),
93 pid_path
: pid_path
.to_string_lossy().into_owned(),
/// Write the PID of the current process to `path`, creating the file or
/// truncating an existing one.
///
/// The PID is written as a plain decimal number without a trailing newline,
/// which is the format `unmap_from_backing` parses.
///
/// # Errors
///
/// Returns an error if the file cannot be created or written.
fn write_pidfile(path: &Path) -> Result<(), Error> {
    // std::process::id() is the safe standard library equivalent of
    // libc::getpid(), so no `unsafe` block is needed here
    let pid = std::process::id();
    let mut file = File::create(path)?;
    write!(file, "{}", pid)?;
    Ok(())
}
105 /// Runs the FUSE request loop and assigns the loop device. Sends a message
106 /// on `startup_chan` once the loop device is assigned (or the assignment
107 /// fails). Send a message on `abort_chan` to trigger cleanup and exit FUSE.
108 /// An error on loopdev assignment does *not* automatically close the FUSE
109 /// handle or do cleanup; trigger `abort_chan` manually in case startup fails.
///
/// # Panics
///
/// Panics if `self.session` is `None`.
112 mut startup_chan
: Sender
<Result
<(), Error
>>,
113 abort_chan
: Receiver
<()>,
114 ) -> Result
<(), Error
> {
116 if let None
= self.session
{
117 panic
!("internal error: fuse_loop::main called before ::map_loop");
// take() leaves `None` behind in `self.session`
119 let mut session
= self.session
.take().unwrap().fuse();
120 let mut abort_chan
= abort_chan
.fuse();
// the assignment runs on tokio's blocking thread pool, so it gets owned
// copies of both paths
122 let (loopdev_path
, fuse_path
) = (self.loopdev_path
.clone(), self.fuse_path
.clone());
123 tokio
::task
::spawn_blocking(move || {
124 if let Err(err
) = loopdev
::assign(loopdev_path
, fuse_path
) {
// the result of try_send is ignored on purpose (`let _`)
125 let _
= startup_chan
.try_send(Err(format_err
!("error while assigning loop device - {}", err
)));
127 // device is assigned successfully, which means not only is the
128 // loopdev ready, but FUSE is also okay, since the assignment
129 // would have failed otherwise
130 let _
= startup_chan
.try_send(Ok(()));
// owned copies for the cleanup closure below
134 let (loopdev_path
, fuse_path
, pid_path
) =
135 (self.loopdev_path
.clone(), self.fuse_path
.clone(), self.pid_path
.clone());
// cleanup consumes the FUSE stream: unassign the loop device, drop the FUSE
// handle, then remove the backing file and the PID file
136 let cleanup
= |session
: futures
::stream
::Fuse
<Fuse
>| {
137 // only warn for errors on cleanup, if these fail nothing is lost
138 if let Err(err
) = loopdev
::unassign(&loopdev_path
) {
140 "cleanup: warning: could not unassign file {} from loop device {} - {}",
147 // force close FUSE handle before attempting to remove backing file
148 std
::mem
::drop(session
);
150 if let Err(err
) = remove_file(&fuse_path
) {
152 "cleanup: warning: could not remove temporary file {} - {}",
157 if let Err(err
) = remove_file(&pid_path
) {
159 "cleanup: warning: could not remove PID file {} - {}",
168 _
= abort_chan
.next() => {
169 // aborted, do cleanup and exit
172 req
= session
.try_next() => {
// NOTE(review): `req?` returns from `main` right away when the FUSE stream
// yields an error; no call to `cleanup` is visible on that path - confirm
// this is intended.
173 let res
= match req?
{
174 Some(Request
::Lookup(req
)) => {
175 let stat
= self.stat
;
176 let entry
= EntryParam
::simple(stat
.st_ino
, stat
);
179 Some(Request
::Getattr(req
)) => {
// the second argument is presumably the attribute validity timeout -
// TODO confirm against the proxmox_fuse API
180 req
.reply(&self.stat
, std
::f64::MAX
)
182 Some(Request
::Read(req
)) => {
// position the reader at the requested offset before reading
183 match self.reader
.seek(SeekFrom
::Start(req
.offset
)).await
{
// a fresh zeroed buffer of the requested size is allocated per request
185 let mut buf
= vec
![0u8; req
.size
];
186 match self.reader
.read_exact(&mut buf
).await
{
201 // only FUSE requests necessary for loop-mapping are implemented
202 eprintln
!("Unimplemented FUSE request type encountered");
206 // FUSE connection closed
210 if let Err(err
) = res
{
211 // error during FUSE reply, cleanup and exit
219 // non-error FUSE exit
225 /// Clean up leftover files as well as FUSE instances without a loop device
226 /// connected. Best effort, never returns an error.
227 /// If `filter_name` is `Some("...")`, only this name will be cleaned up.
228 pub fn cleanup_unused_run_files(filter_name
: Option
<String
>) {
// an error while listing the mappings is ignored, nothing is cleaned up then
229 if let Ok(maps
) = find_all_mappings() {
230 for (name
, loopdev
) in maps
{
// only mappings without a loop device are touched; with a filter, only the
// mapping whose name equals the filter exactly
231 if loopdev
.is_none() &&
232 (filter_name
.is_none() || &name
== filter_name
.as_ref().unwrap())
234 let mut path
= PathBuf
::from(RUN_DIR
);
237 // clean leftover FUSE instances (e.g. user called 'losetup -d' or similar)
238 // does nothing if the files are already stale (e.g. instance crashed etc...)
239 if let Ok(_
) = unmap_from_backing(&path
) {
240 // we have reaped some leftover instance, tell the user
242 "Cleaned up dangling mapping '{}': no loop device assigned",
247 // remove remnant files
248 // these were not doing anything, so no need to inform the user
// (errors from remove_file are ignored on purpose)
249 let _
= remove_file(&path
);
250 path
.set_extension("pid");
251 let _
= remove_file(&path
);
257 fn get_backing_file(loopdev
: &str) -> Result
<String
, Error
> {
258 let num
= loopdev
.split_at(9).1.parse
::<u8>().map_err(|err
|
259 format_err
!("malformed loopdev path, does not end with valid number - {}", err
))?
;
261 let block_path
= PathBuf
::from(format
!("/sys/devices/virtual/block/loop{}/loop/backing_file", num
));
262 let backing_file
= read_to_string(block_path
).map_err(|err
| {
263 if err
.kind() == std
::io
::ErrorKind
::NotFound
{
264 format_err
!("nothing mapped to {}", loopdev
)
266 format_err
!("error reading backing file - {}", err
)
270 let backing_file
= backing_file
.trim();
272 if !backing_file
.starts_with(RUN_DIR
) {
274 "loopdev {} is in use, but not by proxmox-backup-client (mapped to '{}')",
280 Ok(backing_file
.to_owned())
283 fn unmap_from_backing(backing_file
: &Path
) -> Result
<(), Error
> {
284 let mut pid_path
= PathBuf
::from(backing_file
);
285 pid_path
.set_extension("pid");
287 let pid_str
= read_to_string(&pid_path
).map_err(|err
|
288 format_err
!("error reading pidfile {:?}: {}", &pid_path
, err
))?
;
289 let pid
= pid_str
.parse
::<i32>().map_err(|err
|
290 format_err
!("malformed PID ({}) in pidfile - {}", pid_str
, err
))?
;
292 let pid
= Pid
::from_raw(pid
);
294 // send SIGINT to trigger cleanup and exit in target process
295 signal
::kill(pid
, Signal
::SIGINT
)?
;
297 // block until unmap is complete or timeout
298 let start
= time
::epoch_i64();
300 match signal
::kill(pid
, None
) {
302 // 10 second timeout, then assume failure
303 if (time
::epoch_i64() - start
) > 10 {
304 return Err(format_err
!("timed out waiting for PID '{}' to exit", &pid
));
306 std
::thread
::sleep(std
::time
::Duration
::from_millis(100));
308 Err(nix
::Error
::Sys(nix
::errno
::Errno
::ESRCH
)) => {
311 Err(e
) => return Err(e
.into()),
318 /// Returns an Iterator over the set of currently active mappings, i.e.
319 /// FuseLoopSession instances. Yields ("backing-file-name", Some("/dev/loopX"))
320 /// where .1 is None when a user has manually called 'losetup -d' or similar but
321 /// the FUSE instance is still running.
///
/// Fails if scanning `/dev/` or reading `RUN_DIR` returns an error.
322 pub fn find_all_mappings() -> Result
<impl Iterator
<Item
= (String
, Option
<String
>)>, Error
> {
323 // get map of all /dev/loop mappings belonging to us
// (key: file name below RUN_DIR, value: loop device path)
324 let mut loopmap
= HashMap
::new();
325 for ent
in fs
::scan_subdir(libc
::AT_FDCWD
, Path
::new("/dev/"), &LOOPDEV_REGEX
)?
{
328 let loopdev
= format
!("/dev/{}", ent
.file_name().to_string_lossy());
329 match get_backing_file(&loopdev
) {
331 // insert filename only, strip RUN_DIR/
// NOTE(review): this slice panics if `file` is not longer than RUN_DIR; it
// relies on get_backing_file() only returning paths below RUN_DIR.
332 loopmap
.insert(file
[RUN_DIR
.len()+1..].to_owned(), loopdev
);
// every entry of RUN_DIR except ".", ".." and the PID files is a mapping
341 Ok(fs
::read_subdir(libc
::AT_FDCWD
, Path
::new(RUN_DIR
))?
342 .filter_map(move |ent
| {
345 let file
= ent
.file_name().to_string_lossy();
346 if file
== "." || file
== ".." || file
.ends_with(".pid") {
// None if no loop device is currently backed by this file
349 let loopdev
= loopmap
.get(file
.as_ref()).map(String
::to_owned
);
350 Some((file
.into_owned(), loopdev
))
358 /// Try and unmap a running proxmox-backup-client instance from the given
359 /// /dev/loopN device
360 pub fn unmap_loopdev
<S
: AsRef
<str>>(loopdev
: S
) -> Result
<(), Error
> {
361 let loopdev
= loopdev
.as_ref();
362 if loopdev
.len() < 10 || !loopdev
.starts_with("/dev/loop") {
363 bail
!("malformed loopdev path, must be in format '/dev/loopX'");
366 let backing_file
= get_backing_file(loopdev
)?
;
367 unmap_from_backing(Path
::new(&backing_file
))
370 /// Try and unmap a running proxmox-backup-client instance from the given name
371 pub fn unmap_name
<S
: AsRef
<str>>(name
: S
) -> Result
<(), Error
> {
372 for (mapping
, _
) in find_all_mappings()?
{
373 if mapping
.ends_with(name
.as_ref()) {
374 let mut path
= PathBuf
::from(RUN_DIR
);
376 return unmap_from_backing(&path
);
379 Err(format_err
!("no mapping for name '{}' found", name
.as_ref()))
382 fn minimal_stat(size
: i64) -> libc
::stat
{
383 let mut stat
: libc
::stat
= unsafe { std::mem::zeroed() }
;
384 stat
.st_mode
= libc
::S_IFREG
;