1 //! Map a raw data reader as a loop device via FUSE
3 use anyhow
::{Error, format_err, bail}
;
5 use std
::path
::{Path, PathBuf}
;
6 use std
::fs
::{File, remove_file, read_to_string, OpenOptions}
;
8 use std
::io
::prelude
::*;
9 use std
::collections
::HashMap
;
12 use nix
::sys
::signal
::{self, Signal}
;
14 use tokio
::io
::{AsyncRead, AsyncSeek, AsyncReadExt, AsyncSeekExt}
;
15 use futures
::stream
::{StreamExt, TryStreamExt}
;
16 use futures
::channel
::mpsc
::{Sender, Receiver}
;
18 use proxmox
::{try_block, const_regex}
;
19 use proxmox_fuse
::{*, requests::FuseRequest}
;
23 const RUN_DIR
: &'
static str = "/run/pbs-loopdev";
26 pub LOOPDEV_REGEX
= r
"^loop\d+$";
29 /// Represents an ongoing FUSE-session that has been mapped onto a loop device.
30 /// Create with map_loop, then call 'main' and poll until startup_chan reports
31 /// success. Then, daemonize or otherwise finish setup, and continue polling
32 /// main's future until completion.
33 pub struct FuseLoopSession
<R
: AsyncRead
+ AsyncSeek
+ Unpin
> {
34 session
: Option
<Fuse
>,
39 pub loopdev_path
: String
,
42 impl<R
: AsyncRead
+ AsyncSeek
+ Unpin
> FuseLoopSession
<R
> {
44 /// Prepare for mapping the given reader as a block device node at
45 /// /dev/loopN. Creates a temporary file for FUSE and a PID file for unmap.
46 pub async
fn map_loop
<P
: AsRef
<str>>(size
: u64, mut reader
: R
, name
: P
, options
: &OsStr
)
47 -> Result
<Self, Error
>
49 // attempt a single read to check if the reader is configured correctly
50 let _
= reader
.read_u8().await?
;
52 std
::fs
::create_dir_all(RUN_DIR
)?
;
53 let mut path
= PathBuf
::from(RUN_DIR
);
54 path
.push(name
.as_ref());
55 let mut pid_path
= path
.clone();
56 pid_path
.set_extension("pid");
58 match OpenOptions
::new().write(true).create_new(true).open(&path
) {
59 Ok(_
) => { /* file created, continue on */ }
,
61 if e
.kind() == std
::io
::ErrorKind
::AlreadyExists
{
62 bail
!("the given archive is already mapped, cannot map twice");
64 bail
!("error while creating backing file ({:?}) - {}", &path
, e
);
69 let res
: Result
<(Fuse
, String
), Error
> = try_block
!{
70 let session
= Fuse
::builder("pbs-block-dev")?
76 let loopdev_path
= loopdev
::get_or_create_free_dev().map_err(|err
| {
77 format_err
!("loop-control GET_FREE failed - {}", err
)
80 // write pidfile so unmap can later send us a signal to exit
81 Self::write_pidfile(&pid_path
)?
;
83 Ok((session
, loopdev_path
))
87 Ok((session
, loopdev_path
)) =>
89 session
: Some(session
),
91 stat
: minimal_stat(size
as i64),
92 fuse_path
: path
.to_string_lossy().into_owned(),
93 pid_path
: pid_path
.to_string_lossy().into_owned(),
97 // best-effort temp file cleanup in case of error
98 let _
= remove_file(&path
);
99 let _
= remove_file(&pid_path
);
105 fn write_pidfile(path
: &Path
) -> Result
<(), Error
> {
106 let pid
= unsafe { libc::getpid() }
;
107 let mut file
= File
::create(path
)?
;
108 write
!(file
, "{}", pid
)?
;
112 /// Runs the FUSE request loop and assigns the loop device. Will send a
113 /// message on startup_chan once the loop device is assigned (or assignment
114 /// fails). Send a message on abort_chan to trigger cleanup and exit FUSE.
115 /// An error on loopdev assignment does *not* automatically close the FUSE
116 /// handle or do cleanup, trigger abort_chan manually in case startup fails.
119 mut startup_chan
: Sender
<Result
<(), Error
>>,
120 abort_chan
: Receiver
<()>,
121 ) -> Result
<(), Error
> {
123 if let None
= self.session
{
124 panic
!("internal error: fuse_loop::main called before ::map_loop");
126 let mut session
= self.session
.take().unwrap().fuse();
127 let mut abort_chan
= abort_chan
.fuse();
129 let (loopdev_path
, fuse_path
) = (self.loopdev_path
.clone(), self.fuse_path
.clone());
130 tokio
::task
::spawn_blocking(move || {
131 if let Err(err
) = loopdev
::assign(loopdev_path
, fuse_path
) {
132 let _
= startup_chan
.try_send(Err(format_err
!("error while assigning loop device - {}", err
)));
134 // device is assigned successfully, which means not only is the
135 // loopdev ready, but FUSE is also okay, since the assignment
136 // would have failed otherwise
137 let _
= startup_chan
.try_send(Ok(()));
141 let (loopdev_path
, fuse_path
, pid_path
) =
142 (self.loopdev_path
.clone(), self.fuse_path
.clone(), self.pid_path
.clone());
143 let cleanup
= |session
: futures
::stream
::Fuse
<Fuse
>| {
144 // only warn for errors on cleanup, if these fail nothing is lost
145 if let Err(err
) = loopdev
::unassign(&loopdev_path
) {
147 "cleanup: warning: could not unassign file {} from loop device {} - {}",
154 // force close FUSE handle before attempting to remove backing file
155 std
::mem
::drop(session
);
157 if let Err(err
) = remove_file(&fuse_path
) {
159 "cleanup: warning: could not remove temporary file {} - {}",
164 if let Err(err
) = remove_file(&pid_path
) {
166 "cleanup: warning: could not remove PID file {} - {}",
175 _
= abort_chan
.next() => {
176 // aborted, do cleanup and exit
179 req
= session
.try_next() => {
180 let res
= match req?
{
181 Some(Request
::Lookup(req
)) => {
182 let stat
= self.stat
;
183 let entry
= EntryParam
::simple(stat
.st_ino
, stat
);
186 Some(Request
::Getattr(req
)) => {
187 req
.reply(&self.stat
, std
::f64::MAX
)
189 Some(Request
::Read(req
)) => {
190 match self.reader
.seek(SeekFrom
::Start(req
.offset
)).await
{
192 let mut buf
= vec
![0u8; req
.size
];
193 match self.reader
.read_exact(&mut buf
).await
{
208 // only FUSE requests necessary for loop-mapping are implemented
209 eprintln
!("Unimplemented FUSE request type encountered");
213 // FUSE connection closed
217 if let Err(err
) = res
{
218 // error during FUSE reply, cleanup and exit
226 // non-error FUSE exit
232 fn get_backing_file(loopdev
: &str) -> Result
<String
, Error
> {
233 let num
= loopdev
.split_at(9).1.parse
::<u8>().map_err(|err
|
234 format_err
!("malformed loopdev path, does not end with valid number - {}", err
))?
;
236 let block_path
= PathBuf
::from(format
!("/sys/devices/virtual/block/loop{}/loop/backing_file", num
));
237 let backing_file
= read_to_string(block_path
).map_err(|err
| {
238 if err
.kind() == std
::io
::ErrorKind
::NotFound
{
239 format_err
!("nothing mapped to {}", loopdev
)
241 format_err
!("error reading backing file - {}", err
)
245 let backing_file
= backing_file
.trim();
247 if !backing_file
.starts_with(RUN_DIR
) {
249 "loopdev {} is in use, but not by proxmox-backup-client (mapped to '{}')",
255 Ok(backing_file
.to_owned())
258 fn unmap_from_backing(backing_file
: &Path
) -> Result
<(), Error
> {
259 let mut pid_path
= PathBuf
::from(backing_file
);
260 pid_path
.set_extension("pid");
262 let pid_str
= read_to_string(&pid_path
).map_err(|err
|
263 format_err
!("error reading pidfile {:?}: {}", &pid_path
, err
))?
;
264 let pid
= pid_str
.parse
::<i32>().map_err(|err
|
265 format_err
!("malformed PID ({}) in pidfile - {}", pid_str
, err
))?
;
267 // send SIGINT to trigger cleanup and exit in target process
268 signal
::kill(Pid
::from_raw(pid
), Signal
::SIGINT
)?
;
273 /// Returns an Iterator over a set of currently active mappings, i.e.
274 /// FuseLoopSession instances. Returns ("backing-file-name", Some("/dev/loopX"))
275 /// where .1 is None when a user has manually called 'losetup -d' or similar but
276 /// the FUSE instance is still running.
277 pub fn find_all_mappings() -> Result
<impl Iterator
<Item
= (String
, Option
<String
>)>, Error
> {
278 // get map of all /dev/loop mappings belonging to us
279 let mut loopmap
= HashMap
::new();
280 for ent
in fs
::scan_subdir(libc
::AT_FDCWD
, Path
::new("/dev/"), &LOOPDEV_REGEX
)?
{
283 let loopdev
= format
!("/dev/{}", ent
.file_name().to_string_lossy());
284 match get_backing_file(&loopdev
) {
286 // insert filename only, strip RUN_DIR/
287 loopmap
.insert(file
[RUN_DIR
.len()+1..].to_owned(), loopdev
);
296 Ok(fs
::read_subdir(libc
::AT_FDCWD
, Path
::new(RUN_DIR
))?
297 .filter_map(move |ent
| {
300 let file
= ent
.file_name().to_string_lossy();
301 if file
== "." || file
== ".." || file
.ends_with(".pid") {
304 let loopdev
= loopmap
.get(file
.as_ref()).map(String
::to_owned
);
305 Some((file
.into_owned(), loopdev
))
313 /// Try and unmap a running proxmox-backup-client instance from the given
314 /// /dev/loopN device
315 pub fn unmap_loopdev
<S
: AsRef
<str>>(loopdev
: S
) -> Result
<(), Error
> {
316 let loopdev
= loopdev
.as_ref();
317 if loopdev
.len() < 10 || !loopdev
.starts_with("/dev/loop") {
318 bail
!("malformed loopdev path, must be in format '/dev/loopX'");
321 let backing_file
= get_backing_file(loopdev
)?
;
322 unmap_from_backing(Path
::new(&backing_file
))
325 /// Try and unmap a running proxmox-backup-client instance from the given name
326 pub fn unmap_name
<S
: AsRef
<str>>(name
: S
) -> Result
<(), Error
> {
327 for (mapping
, _
) in find_all_mappings()?
{
328 if mapping
.ends_with(name
.as_ref()) {
329 let mut path
= PathBuf
::from(RUN_DIR
);
331 return unmap_from_backing(&path
);
334 Err(format_err
!("no mapping for name '{}' found", name
.as_ref()))
337 fn minimal_stat(size
: i64) -> libc
::stat
{
338 let mut stat
: libc
::stat
= unsafe { std::mem::zeroed() }
;
339 stat
.st_mode
= libc
::S_IFREG
;