.insert("status", status_cmd_def)
.insert("key", key::cli())
.insert("mount", mount_cmd_def())
+ .insert("map", map_cmd_def())
+ .insert("unmap", unmap_cmd_def())
.insert("catalog", catalog_mgmt_cli())
.insert("task", task_mgmt_cli())
.insert("version", version_cmd_def)
use std::os::unix::io::RawFd;
use std::path::Path;
use std::ffi::OsStr;
+use std::collections::HashMap;
use anyhow::{bail, format_err, Error};
use serde_json::Value;
use nix::unistd::{fork, ForkResult, pipe};
use futures::select;
use futures::future::FutureExt;
+use futures::stream::{StreamExt, TryStreamExt};
use proxmox::{sortable, identity};
use proxmox::api::{ApiHandler, ApiMethod, RpcEnvironment, schema::*, cli::*};
BackupDir,
BackupGroup,
BufferedDynamicReader,
+ AsyncIndexReader,
};
use proxmox_backup::client::*;
("target", false, &StringSchema::new("Target directory path.").schema()),
("repository", true, &REPO_URL_SCHEMA),
("keyfile", true, &StringSchema::new("Path to encryption key.").schema()),
- ("verbose", true, &BooleanSchema::new("Verbose output.").default(false).schema()),
+ ("verbose", true, &BooleanSchema::new("Verbose output and stay in foreground.").default(false).schema()),
+ ]),
+ )
+);
+
+#[sortable]
+const API_METHOD_MAP: ApiMethod = ApiMethod::new(
+ &ApiHandler::Sync(&mount),
+ &ObjectSchema::new(
+ "Map a drive image from a VM backup to a local loopback device. Use 'unmap' to undo.
+WARNING: Only do this with *trusted* backups!",
+ &sorted!([
+ ("snapshot", false, &StringSchema::new("Group/Snapshot path.").schema()),
+ ("archive-name", false, &StringSchema::new("Backup archive name.").schema()),
+ ("repository", true, &REPO_URL_SCHEMA),
+ ("keyfile", true, &StringSchema::new("Path to encryption key.").schema()),
+ ("verbose", true, &BooleanSchema::new("Verbose output and stay in foreground.").default(false).schema()),
+ ]),
+ )
+);
+
+#[sortable]
+const API_METHOD_UNMAP: ApiMethod = ApiMethod::new(
+ &ApiHandler::Sync(&unmap),
+ &ObjectSchema::new(
+ "Unmap a loop device mapped with 'map' and release all resources.",
+ &sorted!([
+ ("loopdev", false, &StringSchema::new("Path to loopdev (/dev/loopX) or loop device number.").schema()),
]),
)
);
.completion_cb("target", tools::complete_file_name)
}
+pub fn map_cmd_def() -> CliCommand {
+
+ CliCommand::new(&API_METHOD_MAP)
+ .arg_param(&["snapshot", "archive-name"])
+ .completion_cb("repository", complete_repository)
+ .completion_cb("snapshot", complete_group_or_snapshot)
+ .completion_cb("archive-name", complete_pxar_archive_name)
+}
+
+pub fn unmap_cmd_def() -> CliCommand {
+
+ CliCommand::new(&API_METHOD_UNMAP)
+ .arg_param(&["loopdev"])
+ .completion_cb("loopdev", tools::complete_file_name)
+}
+
fn mount(
param: Value,
_info: &ApiMethod,
async fn mount_do(param: Value, pipe: Option<RawFd>) -> Result<Value, Error> {
let repo = extract_repository_from_value(¶m)?;
let archive_name = tools::required_string_param(¶m, "archive-name")?;
- let target = tools::required_string_param(¶m, "target")?;
let client = connect(repo.host(), repo.port(), repo.user())?;
+ let target = param["target"].as_str();
+
record_repository(&repo);
let path = tools::required_string_param(¶m, "snapshot")?;
};
let server_archive_name = if archive_name.ends_with(".pxar") {
+ if let None = target {
+ bail!("use the 'mount' command to mount pxar archives");
+ }
format!("{}.didx", archive_name)
+ } else if archive_name.ends_with(".img") {
+ if let Some(_) = target {
+ bail!("use the 'map' command to map drive images");
+ }
+ format!("{}.fidx", archive_name)
} else {
- bail!("Can only mount pxar archives.");
+ bail!("Can only mount/map pxar archives and drive images.");
};
let client = BackupReader::start(
let file_info = manifest.lookup_file_info(&server_archive_name)?;
- if server_archive_name.ends_with(".didx") {
- let index = client.download_dynamic_index(&manifest, &server_archive_name).await?;
- let most_used = index.find_most_used_chunks(8);
- let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), most_used);
- let reader = BufferedDynamicReader::new(index, chunk_reader);
- let archive_size = reader.archive_size();
- let reader: proxmox_backup::pxar::fuse::Reader =
- Arc::new(BufferedDynamicReadAt::new(reader));
- let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
- let options = OsStr::new("ro,default_permissions");
-
- let session = proxmox_backup::pxar::fuse::Session::mount(
- decoder,
- &options,
- false,
- Path::new(target),
- )
- .map_err(|err| format_err!("pxar mount failed: {}", err))?;
-
+ let daemonize = || -> Result<(), Error> {
if let Some(pipe) = pipe {
nix::unistd::chdir(Path::new("/")).unwrap();
// Finish creation of daemon by redirecting filedescriptors.
nix::unistd::close(pipe).unwrap();
}
- // handle SIGINT and SIGTERM
- let mut interrupt_int = signal(SignalKind::interrupt())?;
- let mut interrupt_term = signal(SignalKind::terminate())?;
- let mut interrupt = futures::future::select(interrupt_int.next(), interrupt_term.next());
+ Ok(())
+ };
+
+ let options = OsStr::new("ro,default_permissions");
+
+ // handle SIGINT and SIGTERM
+ let mut interrupt_int = signal(SignalKind::interrupt())?;
+ let mut interrupt_term = signal(SignalKind::terminate())?;
+ let mut interrupt = futures::future::select(interrupt_int.next(), interrupt_term.next());
+
+ if server_archive_name.ends_with(".didx") {
+ let index = client.download_dynamic_index(&manifest, &server_archive_name).await?;
+ let most_used = index.find_most_used_chunks(8);
+ let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), most_used);
+ let reader = BufferedDynamicReader::new(index, chunk_reader);
+ let archive_size = reader.archive_size();
+ let reader: proxmox_backup::pxar::fuse::Reader =
+ Arc::new(BufferedDynamicReadAt::new(reader));
+ let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
+
+ let session = proxmox_backup::pxar::fuse::Session::mount(
+ decoder,
+ &options,
+ false,
+ Path::new(target.unwrap()),
+ )
+ .map_err(|err| format_err!("pxar mount failed: {}", err))?;
+
+ daemonize()?;
select! {
res = session.fuse() => res?,
// exit on interrupted
}
}
+ } else if server_archive_name.ends_with(".fidx") {
+ let index = client.download_fixed_index(&manifest, &server_archive_name).await?;
+ let size = index.index_bytes();
+ let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), HashMap::new());
+ let reader = AsyncIndexReader::new(index, chunk_reader);
+
+ let mut session = tools::fuse_loop::FuseLoopSession::map_loop(size, reader, options).await?;
+ let loopdev = session.loopdev_path.clone();
+
+ let (st_send, st_recv) = futures::channel::mpsc::channel(1);
+ let (mut abort_send, abort_recv) = futures::channel::mpsc::channel(1);
+ let mut st_recv = st_recv.fuse();
+ let mut session_fut = session.main(st_send, abort_recv).boxed().fuse();
+
+ // poll until loop file is mapped (or errors)
+ select! {
+ res = session_fut => {
+ bail!("FUSE session unexpectedly ended before loop file mapping");
+ },
+ res = st_recv.try_next() => {
+ if let Err(err) = res {
+ // init went wrong, abort now
+ abort_send.try_send(()).map_err(|err|
+ format_err!("error while sending abort signal - {}", err))?;
+ // ignore and keep original error cause
+ let _ = session_fut.await;
+ return Err(err);
+ }
+ }
+ }
+
+ // daemonize only now to be able to print mapped loopdev or startup errors
+ println!("Image mapped as {}", loopdev);
+ daemonize()?;
+
+ // continue polling until complete or interrupted (which also happens on unmap)
+ select! {
+ res = session_fut => res?,
+ _ = interrupt => {
+ // exit on interrupted
+ abort_send.try_send(()).map_err(|err|
+ format_err!("error while sending abort signal - {}", err))?;
+ session_fut.await?;
+ }
+ }
+
+ println!("Image unmapped");
} else {
- bail!("unknown archive file extension (expected .pxar)");
+ bail!("unknown archive file extension (expected .pxar or .img)");
}
Ok(Value::Null)
}
+
+fn unmap(
+ param: Value,
+ _info: &ApiMethod,
+ _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Value, Error> {
+
+ let mut path = tools::required_string_param(¶m, "loopdev")?.to_owned();
+
+ if let Ok(num) = path.parse::<u8>() {
+ path = format!("/dev/loop{}", num);
+ }
+
+ tools::fuse_loop::unmap(path)?;
+
+ Ok(Value::Null)
+}
pub mod systemd;
pub mod nom;
pub mod logrotate;
+pub mod loopdev;
+pub mod fuse_loop;
mod parallel_handler;
pub use parallel_handler::*;
--- /dev/null
+use anyhow::{Error, format_err, bail};
+use std::ffi::OsStr;
+use std::path::{Path, PathBuf};
+use std::fs::{File, remove_file, read_to_string};
+use std::io::SeekFrom;
+use std::io::prelude::*;
+
+use nix::unistd::{Pid, mkstemp};
+use nix::sys::signal::{self, Signal};
+
+use tokio::io::{AsyncRead, AsyncSeek, AsyncReadExt, AsyncSeekExt};
+use futures::stream::{StreamExt, TryStreamExt};
+use futures::channel::mpsc::{Sender, Receiver};
+
+use proxmox::try_block;
+use proxmox_fuse::{*, requests::FuseRequest};
+use super::loopdev;
+
+const RUN_DIR: &'static str = "/run/pbs-loopdev";
+
+pub struct FuseLoopSession<R: AsyncRead + AsyncSeek + Unpin> {
+ session: Option<Fuse>,
+ stat: libc::stat,
+ reader: R,
+ fuse_path: String,
+ pid_path: String,
+ pub loopdev_path: String,
+}
+
+impl<R: AsyncRead + AsyncSeek + Unpin> FuseLoopSession<R> {
+
+ /// Prepare for mapping the given reader as a block device node at
+ /// /dev/loopN. Creates a temporary file for FUSE and a PID file for unmap.
+ pub async fn map_loop(size: u64, mut reader: R, options: &OsStr)
+ -> Result<Self, Error>
+ {
+ // attempt a single read to check if the reader is configured correctly
+ let _ = reader.read_u8().await?;
+
+ std::fs::create_dir_all(RUN_DIR)?;
+ let mut base_path = PathBuf::from(RUN_DIR);
+ base_path.push("XXXXXX"); // template for mkstemp
+ let (_, path) = mkstemp(&base_path)?;
+ let mut pid_path = path.clone();
+ pid_path.set_extension("pid");
+
+ let res: Result<(Fuse, String), Error> = try_block!{
+ let session = Fuse::builder("pbs-block-dev")?
+ .options_os(options)?
+ .enable_read()
+ .build()?
+ .mount(&path)?;
+
+ let loopdev_path = loopdev::get_or_create_free_dev().map_err(|err| {
+ format_err!("loop-control GET_FREE failed - {}", err)
+ })?;
+
+ // write pidfile so unmap can later send us a signal to exit
+ Self::write_pidfile(&pid_path)?;
+
+ Ok((session, loopdev_path))
+ };
+
+ match res {
+ Ok((session, loopdev_path)) =>
+ Ok(Self {
+ session: Some(session),
+ reader,
+ stat: minimal_stat(size as i64),
+ fuse_path: path.to_string_lossy().into_owned(),
+ pid_path: pid_path.to_string_lossy().into_owned(),
+ loopdev_path,
+ }),
+ Err(e) => {
+ // best-effort temp file cleanup in case of error
+ let _ = remove_file(&path);
+ let _ = remove_file(&pid_path);
+ Err(e)
+ }
+ }
+ }
+
+ fn write_pidfile(path: &Path) -> Result<(), Error> {
+ let pid = unsafe { libc::getpid() };
+ let mut file = File::create(path)?;
+ write!(file, "{}", pid)?;
+ Ok(())
+ }
+
+ /// Runs the FUSE request loop and assigns the loop device. Will send a
+ /// message on startup_chan once the loop device is assigned (or assignment
+ /// fails). Send a message on abort_chan to trigger cleanup and exit FUSE.
+ /// An error on loopdev assignment does *not* automatically close the FUSE
+ /// handle or do cleanup, trigger abort_chan manually in case startup fails.
+ pub async fn main(
+ &mut self,
+ mut startup_chan: Sender<Result<(), Error>>,
+ abort_chan: Receiver<()>,
+ ) -> Result<(), Error> {
+
+ if let None = self.session {
+ panic!("internal error: fuse_loop::main called before ::map_loop");
+ }
+ let mut session = self.session.take().unwrap().fuse();
+ let mut abort_chan = abort_chan.fuse();
+
+ let (loopdev_path, fuse_path) = (self.loopdev_path.clone(), self.fuse_path.clone());
+ tokio::task::spawn_blocking(move || {
+ if let Err(err) = loopdev::assign(loopdev_path, fuse_path) {
+ let _ = startup_chan.try_send(Err(format_err!("error while assigning loop device - {}", err)));
+ } else {
+ // device is assigned successfully, which means not only is the
+ // loopdev ready, but FUSE is also okay, since the assignment
+ // would have failed otherwise
+ let _ = startup_chan.try_send(Ok(()));
+ }
+ });
+
+ let (loopdev_path, fuse_path, pid_path) =
+ (self.loopdev_path.clone(), self.fuse_path.clone(), self.pid_path.clone());
+ let cleanup = |session: futures::stream::Fuse<Fuse>| {
+ // only warn for errors on cleanup, if these fail nothing is lost
+ if let Err(err) = loopdev::unassign(&loopdev_path) {
+ eprintln!(
+ "cleanup: warning: could not unassign file {} from loop device {} - {}",
+ &fuse_path,
+ &loopdev_path,
+ err,
+ );
+ }
+
+ // force close FUSE handle before attempting to remove backing file
+ std::mem::drop(session);
+
+ if let Err(err) = remove_file(&fuse_path) {
+ eprintln!(
+ "cleanup: warning: could not remove temporary file {} - {}",
+ &fuse_path,
+ err,
+ );
+ }
+ if let Err(err) = remove_file(&pid_path) {
+ eprintln!(
+ "cleanup: warning: could not remove PID file {} - {}",
+ &pid_path,
+ err,
+ );
+ }
+ };
+
+ loop {
+ tokio::select!{
+ _ = abort_chan.next() => {
+ // aborted, do cleanup and exit
+ break;
+ },
+ req = session.try_next() => {
+ let res = match req? {
+ Some(Request::Lookup(req)) => {
+ let stat = self.stat;
+ let entry = EntryParam::simple(stat.st_ino, stat);
+ req.reply(&entry)
+ },
+ Some(Request::Getattr(req)) => {
+ req.reply(&self.stat, std::f64::MAX)
+ },
+ Some(Request::Read(req)) => {
+ match self.reader.seek(SeekFrom::Start(req.offset)).await {
+ Ok(_) => {
+ let mut buf = vec![0u8; req.size];
+ match self.reader.read_exact(&mut buf).await {
+ Ok(_) => {
+ req.reply(&buf)
+ },
+ Err(e) => {
+ req.io_fail(e)
+ }
+ }
+ },
+ Err(e) => {
+ req.io_fail(e)
+ }
+ }
+ },
+ Some(_) => {
+ // only FUSE requests necessary for loop-mapping are implemented
+ eprintln!("Unimplemented FUSE request type encountered");
+ Ok(())
+ },
+ None => {
+ // FUSE connection closed
+ break;
+ }
+ };
+ if let Err(err) = res {
+ // error during FUSE reply, cleanup and exit
+ cleanup(session);
+ bail!(err);
+ }
+ }
+ }
+ }
+
+ // non-error FUSE exit
+ cleanup(session);
+ Ok(())
+ }
+}
+
+/// Try and unmap a running proxmox-backup-client instance from the given
+/// /dev/loopN device
+pub fn unmap(loopdev: String) -> Result<(), Error> {
+ if loopdev.len() < 10 || !loopdev.starts_with("/dev/loop") {
+ bail!("malformed loopdev path, must be in format '/dev/loopX'");
+ }
+ let num = loopdev.split_at(9).1.parse::<u8>().map_err(|err|
+ format_err!("malformed loopdev path, does not end with valid number - {}", err))?;
+
+ let block_path = PathBuf::from(format!("/sys/devices/virtual/block/loop{}/loop/backing_file", num));
+ let backing_file = read_to_string(block_path).map_err(|err| {
+ if err.kind() == std::io::ErrorKind::NotFound {
+ format_err!("nothing mapped to {}", loopdev)
+ } else {
+ format_err!("error reading backing file - {}", err)
+ }
+ })?;
+
+ let backing_file = backing_file.trim();
+ if !backing_file.starts_with(RUN_DIR) {
+ bail!(
+ "loopdev {} is in use, but not by proxmox-backup-client (mapped to '{}')",
+ loopdev,
+ backing_file,
+ );
+ }
+
+ let mut pid_path = PathBuf::from(backing_file);
+ pid_path.set_extension("pid");
+
+ let pid_str = read_to_string(&pid_path).map_err(|err|
+ format_err!("error reading pidfile {:?}: {}", &pid_path, err))?;
+ let pid = pid_str.parse::<i32>().map_err(|err|
+ format_err!("malformed PID ({}) in pidfile - {}", pid_str, err))?;
+
+ // send SIGINT to trigger cleanup and exit in target process
+ signal::kill(Pid::from_raw(pid), Signal::SIGINT)?;
+
+ Ok(())
+}
+
+fn minimal_stat(size: i64) -> libc::stat {
+ let mut stat: libc::stat = unsafe { std::mem::zeroed() };
+ stat.st_mode = libc::S_IFREG;
+ stat.st_ino = 1;
+ stat.st_nlink = 1;
+ stat.st_size = size;
+ stat
+}
--- /dev/null
+use anyhow::Error;
+use std::fs::{File, OpenOptions};
+use std::path::Path;
+use std::os::unix::io::{RawFd, AsRawFd};
+
+const LOOP_CONTROL: &str = "/dev/loop-control";
+const LOOP_NAME: &str = "/dev/loop";
+
+/// Implements a subset of loop device ioctls necessary to assign and release
+/// a single file from a free loopdev.
+mod loop_ioctl {
+ use nix::{ioctl_none, ioctl_write_int_bad, ioctl_write_ptr_bad};
+
+ const LOOP_IOCTL: u16 = 0x4C; // 'L'
+ const LOOP_SET_FD: u16 = 0x00;
+ const LOOP_CLR_FD: u16 = 0x01;
+ const LOOP_SET_STATUS64: u16 = 0x04;
+
+ const LOOP_CTRL_GET_FREE: u16 = 0x82;
+
+ ioctl_write_int_bad!(ioctl_set_fd, (LOOP_IOCTL << 8) | LOOP_SET_FD);
+ ioctl_none!(ioctl_clr_fd, LOOP_IOCTL, LOOP_CLR_FD);
+ ioctl_none!(ioctl_ctrl_get_free, LOOP_IOCTL, LOOP_CTRL_GET_FREE);
+ ioctl_write_ptr_bad!(ioctl_set_status64, (LOOP_IOCTL << 8) | LOOP_SET_STATUS64, LoopInfo64);
+
+ pub const LO_FLAGS_READ_ONLY: u32 = 1;
+ pub const LO_FLAGS_PARTSCAN: u32 = 8;
+
+ const LO_NAME_SIZE: usize = 64;
+ const LO_KEY_SIZE: usize = 32;
+
+ #[repr(C)]
+ pub struct LoopInfo64 {
+ pub lo_device: u64,
+ pub lo_inode: u64,
+ pub lo_rdevice: u64,
+ pub lo_offset: u64,
+ pub lo_sizelimit: u64,
+ pub lo_number: u32,
+ pub lo_encrypt_type: u32,
+ pub lo_encrypt_key_size: u32,
+ pub lo_flags: u32,
+ pub lo_file_name: [u8; LO_NAME_SIZE],
+ pub lo_crypt_name: [u8; LO_NAME_SIZE],
+ pub lo_encrypt_key: [u8; LO_KEY_SIZE],
+ pub lo_init: [u64; 2],
+ }
+}
+
+// ioctl helpers create public fns, do not export them outside the module
+// users should use the wrapper functions below
+use loop_ioctl::*;
+
+/// Use the GET_FREE ioctl to get or add a free loop device, of which the
+/// /dev/loopN path will be returned. This is inherently racy because of the
+/// delay between this and calling assign, but since assigning is atomic it
+/// does not matter much and will simply cause assign to fail.
+pub fn get_or_create_free_dev() -> Result<String, Error> {
+ let ctrl_file = File::open(LOOP_CONTROL)?;
+ let free_num = unsafe { ioctl_ctrl_get_free(ctrl_file.as_raw_fd())? };
+ let loop_file_path = format!("{}{}", LOOP_NAME, free_num);
+ Ok(loop_file_path)
+}
+
+fn assign_dev(fd: RawFd, backing_fd: RawFd) -> Result<(), Error> {
+ unsafe { ioctl_set_fd(fd, backing_fd)?; }
+
+ // set required read-only flag and partscan for convenience
+ let mut info: LoopInfo64 = unsafe { std::mem::zeroed() };
+ info.lo_flags = LO_FLAGS_READ_ONLY | LO_FLAGS_PARTSCAN;
+ unsafe { ioctl_set_status64(fd, &info)?; }
+
+ Ok(())
+}
+
+/// Open the next available /dev/loopN file and assign the given path to
+/// it as it's backing file in read-only mode.
+pub fn assign<P: AsRef<Path>>(loop_dev: P, backing: P) -> Result<(), Error> {
+ let loop_file = File::open(loop_dev)?;
+ let backing_file = OpenOptions::new()
+ .read(true)
+ .open(backing)?;
+ assign_dev(loop_file.as_raw_fd(), backing_file.as_raw_fd())?;
+ Ok(())
+}
+
+/// Unassign any file descriptors currently attached to the given
+/// /dev/loopN device.
+pub fn unassign<P: AsRef<Path>>(path: P) -> Result<(), Error> {
+ let loop_file = File::open(path)?;
+ unsafe { ioctl_clr_fd(loop_file.as_raw_fd())?; }
+ Ok(())
+}