"proxmox-backup-banner",
"proxmox-backup-client",
"proxmox-backup-debug",
+ "proxmox-file-restore",
"pxar-bin",
]
walkdir = "2"
webauthn-rs = "0.2.5"
xdg = "2.2"
-zstd = { version = "0.6", features = [ "bindgen" ] }
nom = "5.1"
crossbeam-channel = "0.5"
proxmox-backup-banner \
proxmox-backup-client \
proxmox-backup-debug \
+ proxmox-file-restore \
pxar-bin
ifeq ($(BUILD_MODE), release)
--bin proxmox-backup-client \
--package proxmox-backup-debug \
--bin proxmox-backup-debug \
+ --package proxmox-file-restore \
+ --bin proxmox-file-restore \
--package pxar-bin \
--bin pxar \
--package proxmox-backup \
pub const CONFIGDIR: &str = "/etc/proxmox-backup";
pub const JS_DIR: &str = "/usr/share/javascript/proxmox-backup";
+/// Unix system user used by proxmox-backup-proxy
+pub const BACKUP_USER_NAME: &str = "backup";
+/// Unix system group used by proxmox-backup-proxy
+pub const BACKUP_GROUP_NAME: &str = "backup";
+
#[macro_export]
macro_rules! PROXMOX_BACKUP_RUN_DIR_M { () => ("/run/proxmox-backup") }
tokio = { version = "1.6", features = [ "fs", "io-util", "rt", "rt-multi-thread", "sync" ] }
url = "2.1"
walkdir = "2"
+zstd = { version = "0.6", features = [ "bindgen" ] }
proxmox = { version = "0.13.0", default-features = false, features = [ "tokio" ] }
--- /dev/null
+//! Raw file descriptor related utilities.
+
+use std::os::unix::io::RawFd;
+
+use anyhow::Error;
+use nix::fcntl::{fcntl, FdFlag, F_GETFD, F_SETFD};
+
+/// Enable or disable the `FD_CLOEXEC` (close-on-exec) flag of an existing file descriptor.
+///
+/// Reads the current fd flags via `F_GETFD`, toggles `FD_CLOEXEC`, and writes them
+/// back via `F_SETFD`. Errors from either `fcntl` call are propagated.
+pub fn fd_change_cloexec(fd: RawFd, on: bool) -> Result<(), Error> {
+    // SAFETY: F_GETFD returns the kernel's fd-flag bits, so interpreting them as
+    // FdFlag bits is sound even if nix does not know every bit.
+    let mut flags = unsafe { FdFlag::from_bits_unchecked(fcntl(fd, F_GETFD)?) };
+    flags.set(FdFlag::FD_CLOEXEC, on);
+    fcntl(fd, F_SETFD(flags))?;
+    Ok(())
+}
pub mod cli;
pub mod compression;
pub mod format;
+pub mod fd;
pub mod fs;
pub mod io;
pub mod json;
+pub mod logrotate;
pub mod lru_cache;
pub mod nom;
pub mod ops;
pub mod str;
pub mod stream;
pub mod sync;
+pub mod sys;
pub mod ticket;
pub mod tokio;
pub mod xattr;
--- /dev/null
+use std::path::{Path, PathBuf};
+use std::fs::{File, rename};
+use std::os::unix::io::{FromRawFd, IntoRawFd};
+use std::io::Read;
+
+use anyhow::{bail, format_err, Error};
+use nix::unistd;
+
+use proxmox::tools::fs::{CreateOptions, make_tmp_file};
+
+/// Used for rotating log files and iterating over them
+pub struct LogRotate {
+    /// Path of the live (unrotated) log file; rotations append ".1", ".2", ...
+    base_path: PathBuf,
+    /// Whether rotated files get zstd-compressed (and searched with a '.zst' suffix).
+    compress: bool,
+
+    /// User logs should be reowned to.
+    owner: Option<String>,
+}
+
+impl LogRotate {
+    /// Creates a new instance if the path given is a valid file name (iow. does not end with ..)
+    /// 'compress' decides if compresses files will be created on rotation, and if it will search
+    /// '.zst' files when iterating
+    ///
+    /// By default, newly created files will be owned by the backup user. See [`new_with_user`] for
+    /// a way to opt out of this behavior.
+    pub fn new<P: AsRef<Path>>(
+        path: P,
+        compress: bool,
+    ) -> Option<Self> {
+        Self::new_with_user(path, compress, Some(pbs_buildcfg::BACKUP_USER_NAME.to_owned()))
+    }
+
+    /// See [`new`]. Additionally this also takes a user which should by default be used to reown
+    /// new files to.
+    pub fn new_with_user<P: AsRef<Path>>(
+        path: P,
+        compress: bool,
+        owner: Option<String>,
+    ) -> Option<Self> {
+        // Reject paths without a final file-name component (e.g. "/" or "foo/.."),
+        // since rotation works by appending suffixes to that component.
+        if path.as_ref().file_name().is_some() {
+            Some(Self {
+                base_path: path.as_ref().to_path_buf(),
+                compress,
+                owner,
+            })
+        } else {
+            None
+        }
+    }
+
+    /// Returns an iterator over the logrotated file names that exist
+    pub fn file_names(&self) -> LogRotateFileNames {
+        LogRotateFileNames {
+            base_path: self.base_path.clone(),
+            count: 0,
+            compress: self.compress
+        }
+    }
+
+    /// Returns an iterator over the logrotated file handles
+    pub fn files(&self) -> LogRotateFiles {
+        LogRotateFiles {
+            file_names: self.file_names(),
+        }
+    }
+
+    /// zstd-compress `source_path` into `target_path` via an atomically renamed
+    /// temp file, then unlink the source. The temp file is removed on any failure.
+    fn compress(source_path: &PathBuf, target_path: &PathBuf, options: &CreateOptions) -> Result<(), Error> {
+        let mut source = File::open(source_path)?;
+        let (fd, tmp_path) = make_tmp_file(target_path, options.clone())?;
+        let target = unsafe { File::from_raw_fd(fd.into_raw_fd()) };
+        // compression level 0 selects zstd's default level
+        let mut encoder = match zstd::stream::write::Encoder::new(target, 0) {
+            Ok(encoder) => encoder,
+            Err(err) => {
+                let _ = unistd::unlink(&tmp_path);
+                bail!("creating zstd encoder failed - {}", err);
+            }
+        };
+
+        if let Err(err) = std::io::copy(&mut source, &mut encoder) {
+            let _ = unistd::unlink(&tmp_path);
+            bail!("zstd encoding failed for file {:?} - {}", target_path, err);
+        }
+
+        // finish() flushes the zstd frame footer; without it the archive is truncated
+        if let Err(err) = encoder.finish() {
+            let _ = unistd::unlink(&tmp_path);
+            bail!("zstd finish failed for file {:?} - {}", target_path, err);
+        }
+
+        if let Err(err) = rename(&tmp_path, target_path) {
+            let _ = unistd::unlink(&tmp_path);
+            bail!("rename failed for file {:?} - {}", target_path, err);
+        }
+
+        if let Err(err) = unistd::unlink(source_path) {
+            bail!("unlink failed for file {:?} - {}", source_path, err);
+        }
+
+        Ok(())
+    }
+
+    /// Rotates the files up to 'max_files'
+    /// if the 'compress' option was given it will compress the newest file
+    ///
+    /// e.g. rotates
+    /// foo.2.zst => foo.3.zst
+    /// foo.1     => foo.2.zst
+    /// foo       => foo.1
+    pub fn do_rotate(&mut self, options: CreateOptions, max_files: Option<usize>) -> Result<(), Error> {
+        let mut filenames: Vec<PathBuf> = self.file_names().collect();
+        if filenames.is_empty() {
+            return Ok(()); // no file means nothing to rotate
+        }
+        // one extra slot for the new highest-numbered rotation target
+        let count = filenames.len() + 1;
+
+        let mut next_filename = self.base_path.clone().canonicalize()?.into_os_string();
+        next_filename.push(format!(".{}", filenames.len()));
+        // count > 2 keeps ".1" uncompressed; only older rotations get the .zst suffix
+        if self.compress && count > 2 {
+            next_filename.push(".zst");
+        }
+
+        filenames.push(PathBuf::from(next_filename));
+
+        // shift from oldest to newest so nothing gets overwritten mid-rotation
+        for i in (0..count-1).rev() {
+            // compress exactly at the boundary where an uncompressed file moves
+            // into a ".zst"-named slot; plain renames everywhere else
+            if self.compress
+                && filenames[i].extension() != Some(std::ffi::OsStr::new("zst"))
+                && filenames[i+1].extension() == Some(std::ffi::OsStr::new("zst"))
+            {
+                Self::compress(&filenames[i], &filenames[i+1], &options)?;
+            } else {
+                rename(&filenames[i], &filenames[i+1])?;
+            }
+        }
+
+        // prune rotations beyond the retention limit; removal failures are only logged
+        if let Some(max_files) = max_files {
+            for file in filenames.iter().skip(max_files) {
+                if let Err(err) = unistd::unlink(file) {
+                    eprintln!("could not remove {:?}: {}", &file, err);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Rotate if the base file exceeds `max_size` bytes; returns whether a
+    /// rotation happened. When no `options` are given, new files are owned by
+    /// `self.owner` (if set and resolvable) or created with default options.
+    pub fn rotate(
+        &mut self,
+        max_size: u64,
+        options: Option<CreateOptions>,
+        max_files: Option<usize>
+    ) -> Result<bool, Error> {
+
+        let options = match options {
+            Some(options) => options,
+            None => match self.owner.as_deref() {
+                Some(owner) => {
+                    let user = crate::sys::query_user(owner)?
+                        .ok_or_else(|| {
+                            format_err!("failed to lookup owning user '{}' for logs", owner)
+                        })?;
+                    CreateOptions::new().owner(user.uid).group(user.gid)
+                }
+                None => CreateOptions::new(),
+            }
+        };
+
+        // a missing base file simply means there is nothing to rotate
+        let metadata = match self.base_path.metadata() {
+            Ok(metadata) => metadata,
+            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
+            Err(err) => bail!("unable to open task archive - {}", err),
+        };
+
+        if metadata.len() > max_size {
+            self.do_rotate(options, max_files)?;
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    }
+}
+
+/// Iterator over logrotated file names
+pub struct LogRotateFileNames {
+    /// Path of the live log file; rotation suffixes are appended to it.
+    base_path: PathBuf,
+    /// Next rotation index to probe; 0 means the base file itself comes first.
+    count: usize,
+    /// Whether to also look for '.zst'-suffixed rotations.
+    compress: bool,
+}
+
+impl Iterator for LogRotateFileNames {
+    type Item = PathBuf;
+
+    /// Yields the base file first, then "<base>.1", "<base>.2", ... (optionally
+    /// with a ".zst" suffix), stopping at the first index with no matching file.
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.count > 0 {
+            let mut path: std::ffi::OsString = self.base_path.clone().into();
+
+            path.push(format!(".{}", self.count));
+            self.count += 1;
+
+            if Path::new(&path).is_file() {
+                Some(path.into())
+            } else if self.compress {
+                // fall back to the compressed variant of this rotation index
+                path.push(".zst");
+                if Path::new(&path).is_file() {
+                    Some(path.into())
+                } else {
+                    None
+                }
+            } else {
+                None
+            }
+        } else if self.base_path.is_file() {
+            // first call: yield the unrotated base file if it exists
+            self.count += 1;
+            Some(self.base_path.to_path_buf())
+        } else {
+            None
+        }
+    }
+}
+
+/// Iterator over logrotated files by returning a boxed reader
+pub struct LogRotateFiles {
+    /// Underlying name iterator; each yielded name is opened (and possibly
+    /// wrapped in a zstd decoder) by `next()`.
+    file_names: LogRotateFileNames,
+}
+
+impl Iterator for LogRotateFiles {
+    type Item = Box<dyn Read + Send>;
+
+    /// Open the next rotated file, transparently wrapping '.zst' files in a
+    /// zstd decoder. Files that fail to open or decode end the iteration.
+    fn next(&mut self) -> Option<Self::Item> {
+        let path = self.file_names.next()?;
+        let file = File::open(&path).ok()?;
+
+        let is_compressed = path.extension() == Some(std::ffi::OsStr::new("zst"));
+        if is_compressed {
+            let decoder = zstd::stream::read::Decoder::new(file).ok()?;
+            Some(Box::new(decoder))
+        } else {
+            Some(Box::new(file))
+        }
+    }
+}
--- /dev/null
+//! System level helpers.
+
+use nix::unistd::{Gid, Group, Uid, User};
+
+/// Query a user by name — except when built with `#[cfg(test)]`, in which case
+/// the *current* user is returned instead.
+///
+/// This is to avoid having regression tests query the users of development machines which may
+/// not be compatible with PBS or privileged enough.
+/// NOTE(review): `cfg!(test)` only applies to this crate's own test builds — verify this is
+/// the intended scope when the crate is used as a dependency of other crates' tests.
+pub fn query_user(user_name: &str) -> Result<Option<User>, nix::Error> {
+    if cfg!(test) {
+        Ok(Some(
+            User::from_uid(Uid::current())?.expect("current user does not exist"),
+        ))
+    } else {
+        User::from_name(user_name)
+    }
+}
+
+/// Query a group by name — except when built with `#[cfg(test)]`, in which case
+/// the *current* group is returned instead.
+///
+/// This is to avoid having regression tests query the groups of development machines which may
+/// not be compatible with PBS or privileged enough.
+pub fn query_group(group_name: &str) -> Result<Option<Group>, nix::Error> {
+    if cfg!(test) {
+        Ok(Some(
+            Group::from_gid(Gid::current())?.expect("current group does not exist"),
+        ))
+    } else {
+        Group::from_name(group_name)
+    }
+}
walkdir = "2"
serde_json = "1.0"
-proxmox = { version = "0.13.0", features = [ "api-macro", "cli", "router" ] }
+proxmox = { version = "0.13.0", features = [ "api-macro", "cli" ] }
pbs-client = { path = "../pbs-client" }
pbs-datastore = { path = "../pbs-datastore" }
--- /dev/null
+[package]
+name = "proxmox-file-restore"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support@proxmox.com>"]
+edition = "2018"
+
+[dependencies]
+anyhow = "1.0"
+base64 = "0.12"
+futures = "0.3"
+libc = "0.2"
+nix = "0.19.1"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+tokio = { version = "1.6", features = [ "io-std", "rt", "rt-multi-thread", "time" ] }
+
+pxar = { version = "0.10.1", features = [ "tokio-io" ] }
+
+proxmox = { version = "0.13.0", features = [ "api-macro", "cli" ] }
+
+pbs-api-types = { path = "../pbs-api-types" }
+pbs-buildcfg = { path = "../pbs-buildcfg" }
+pbs-client = { path = "../pbs-client" }
+pbs-datastore = { path = "../pbs-datastore" }
+pbs-runtime = { path = "../pbs-runtime" }
+pbs-systemd = { path = "../pbs-systemd" }
+pbs-tools = { path = "../pbs-tools" }
--- /dev/null
+//! Abstraction layer over different methods of accessing a block backup
+use std::collections::HashMap;
+use std::future::Future;
+use std::hash::BuildHasher;
+use std::pin::Pin;
+
+use anyhow::{bail, Error};
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+
+use proxmox::api::{api, cli::*};
+
+use pbs_client::BackupRepository;
+use pbs_datastore::backup_info::BackupDir;
+use pbs_datastore::catalog::ArchiveEntry;
+use pbs_datastore::manifest::BackupManifest;
+
+use super::block_driver_qemu::QemuBlockDriver;
+
+/// Contains details about a snapshot that is to be accessed by block file restore
+pub struct SnapRestoreDetails {
+    /// Repository the snapshot lives in.
+    pub repo: BackupRepository,
+    /// The snapshot (backup dir) to restore from.
+    pub snapshot: BackupDir,
+    /// Verified manifest of the snapshot.
+    pub manifest: BackupManifest,
+    /// Optional path to an encryption key file (or /dev/fd/N pseudo-path).
+    pub keyfile: Option<String>,
+}
+
+/// Return value of a BlockRestoreDriver.status() call, 'id' must be valid for .stop(id)
+pub struct DriverStatus {
+    /// Mapping identifier, driver-prefixed so it is unique across drivers.
+    pub id: String,
+    /// Driver-specific status details as free-form JSON.
+    pub data: Value,
+}
+
+pub type Async<R> = Pin<Box<dyn Future<Output = R> + Send>>;
+
+/// An abstract implementation for retrieving data out of a block file backup
+pub trait BlockRestoreDriver {
+    /// List `ArchiveEntry` items for the given image file and path
+    fn data_list(
+        &self,
+        details: SnapRestoreDetails,
+        img_file: String,
+        path: Vec<u8>,
+    ) -> Async<Result<Vec<ArchiveEntry>, Error>>;
+
+    /// pxar=true:
+    /// Attempt to create a pxar archive of the given file path and return a reader instance for it
+    /// pxar=false:
+    /// Attempt to read the file or folder at the given path and return the file content or a zip
+    /// file as a stream
+    fn data_extract(
+        &self,
+        details: SnapRestoreDetails,
+        img_file: String,
+        path: Vec<u8>,
+        pxar: bool,
+    ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>>;
+
+    /// Return status of all running/mapped images, result value is (id, extra data), where id must
+    /// match with the ones returned from list()
+    fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>>;
+    /// Stop/Close a running restore method
+    fn stop(&self, id: String) -> Async<Result<(), Error>>;
+    /// Returned ids must be prefixed with driver type so that they cannot collide between drivers,
+    /// the returned values must be passable to stop()
+    fn list(&self) -> Vec<String>;
+}
+
+#[api()]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
+pub enum BlockDriverType {
+ /// Uses a small QEMU/KVM virtual machine to map images securely. Requires PVE-patched QEMU.
+ Qemu,
+}
+
+impl BlockDriverType {
+    /// Instantiate the concrete driver implementation for this variant.
+    fn resolve(&self) -> impl BlockRestoreDriver {
+        match self {
+            BlockDriverType::Qemu => QemuBlockDriver {},
+        }
+    }
+}
+
+const DEFAULT_DRIVER: BlockDriverType = BlockDriverType::Qemu;
+const ALL_DRIVERS: &[BlockDriverType] = &[BlockDriverType::Qemu];
+
+/// List archive entries at `path` inside `img_file`, using the given driver
+/// (or the default driver when `None`).
+pub async fn data_list(
+    driver: Option<BlockDriverType>,
+    details: SnapRestoreDetails,
+    img_file: String,
+    path: Vec<u8>,
+) -> Result<Vec<ArchiveEntry>, Error> {
+    let driver = driver.unwrap_or(DEFAULT_DRIVER).resolve();
+    driver.data_list(details, img_file, path).await
+}
+
+/// Extract data at `path` inside `img_file` as an async byte stream, using the
+/// given driver (or the default driver when `None`). See
+/// [`BlockRestoreDriver::data_extract`] for the meaning of `pxar`.
+pub async fn data_extract(
+    driver: Option<BlockDriverType>,
+    details: SnapRestoreDetails,
+    img_file: String,
+    path: Vec<u8>,
+    pxar: bool,
+) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, Error> {
+    let driver = driver.unwrap_or(DEFAULT_DRIVER).resolve();
+    driver.data_extract(details, img_file, path, pxar).await
+}
+
+#[api(
+ input: {
+ properties: {
+ "driver": {
+ type: BlockDriverType,
+ optional: true,
+ },
+ "output-format": {
+ schema: OUTPUT_FORMAT,
+ optional: true,
+ },
+ },
+ },
+)]
+/// Retrieve status information about currently running/mapped restore images
+pub async fn status(driver: Option<BlockDriverType>, param: Value) -> Result<(), Error> {
+ let output_format = get_output_format(¶m);
+ let text = output_format == "text";
+
+ let mut ret = json!({});
+
+ for dt in ALL_DRIVERS {
+ if driver.is_some() && &driver.unwrap() != dt {
+ continue;
+ }
+
+ let drv_name = format!("{:?}", dt);
+ let drv = dt.resolve();
+ match drv.status().await {
+ Ok(data) if data.is_empty() => {
+ if text {
+ println!("{}: no mappings", drv_name);
+ } else {
+ ret[drv_name] = json!({});
+ }
+ }
+ Ok(data) => {
+ if text {
+ println!("{}:", &drv_name);
+ }
+
+ ret[&drv_name]["ids"] = json!({});
+ for status in data {
+ if text {
+ println!("{} \t({})", status.id, status.data);
+ } else {
+ ret[&drv_name]["ids"][status.id] = status.data;
+ }
+ }
+ }
+ Err(err) => {
+ if text {
+ eprintln!("error getting status from driver '{}' - {}", drv_name, err);
+ } else {
+ ret[drv_name] = json!({ "error": format!("{}", err) });
+ }
+ }
+ }
+ }
+
+ if !text {
+ format_and_print_result(&ret, &output_format);
+ }
+
+ Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            "name": {
+                type: String,
+                description: "The name of the VM to stop.",
+            },
+        },
+    },
+)]
+/// Immediately stop/unmap a given image. Not typically necessary, as VMs will stop themselves
+/// after a timer anyway.
+pub async fn stop(name: String) -> Result<(), Error> {
+    // ask every driver in turn; the first one that owns this mapping handles it
+    for drv in ALL_DRIVERS.iter().map(BlockDriverType::resolve) {
+        if drv.list().contains(&name) {
+            return drv.stop(name).await;
+        }
+    }
+
+    bail!("no mapping with name '{}' found", name);
+}
+
+/// Autocompletion handler for block mappings
+///
+/// Collects the mapping ids from every available driver so the CLI can offer
+/// them as completions.
+pub fn complete_block_driver_ids<S: BuildHasher>(
+    _arg: &str,
+    _param: &HashMap<String, String, S>,
+) -> Vec<String> {
+    ALL_DRIVERS
+        .iter()
+        .map(BlockDriverType::resolve)
+        // flat_map instead of the previous map(..).flatten() (clippy::map_flatten)
+        .flat_map(|d| d.list())
+        .collect()
+}
--- /dev/null
+//! Block file access via a small QEMU restore VM using the PBS block driver in QEMU
+use std::collections::HashMap;
+use std::fs::{File, OpenOptions};
+use std::io::{prelude::*, SeekFrom};
+
+use anyhow::{bail, Error};
+use futures::FutureExt;
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+
+use proxmox::tools::fs::lock_file;
+
+use pbs_client::{DEFAULT_VSOCK_PORT, BackupRepository, VsockClient};
+use pbs_datastore::backup_info::BackupDir;
+use pbs_datastore::catalog::ArchiveEntry;
+
+use super::block_driver::*;
+use crate::get_user_run_dir;
+
+const RESTORE_VM_MAP: &str = "restore-vm-map.json";
+
+pub struct QemuBlockDriver {}
+
+/// Per-VM entry persisted in the restore-VM map file.
+#[derive(Clone, Hash, Serialize, Deserialize)]
+struct VMState {
+    /// Process id of the running restore VM.
+    pid: i32,
+    /// vsock context id used to reach the VM's API.
+    cid: i32,
+    /// Ticket authenticating API calls to this VM.
+    ticket: String,
+}
+
+/// In-memory view of the persisted VM map; holding `file` keeps the file lock
+/// alive for the lifetime of this value (see `load`).
+struct VMStateMap {
+    map: HashMap<String, VMState>,
+    file: File,
+}
+
+impl VMStateMap {
+    /// Open the map file in the user's run directory, creating it (mode 0600)
+    /// only when write access is requested.
+    fn open_file_raw(write: bool) -> Result<File, Error> {
+        use std::os::unix::fs::OpenOptionsExt;
+        let mut path = get_user_run_dir()?;
+        path.push(RESTORE_VM_MAP);
+        OpenOptions::new()
+            .read(true)
+            .write(write)
+            .create(write)
+            .mode(0o600)
+            .open(path)
+            .map_err(Error::from)
+    }
+
+    /// Acquire a lock on the state map and retrieve a deserialized version
+    fn load() -> Result<Self, Error> {
+        let mut file = Self::open_file_raw(true)?;
+        // exclusive lock with a 120s timeout; held until this value is dropped
+        lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?;
+        // an empty/corrupt file simply yields an empty map
+        let map = serde_json::from_reader(&file).unwrap_or_default();
+        Ok(Self { map, file })
+    }
+
+    /// Load a read-only copy of the current VM map. Only use for informational purposes, like
+    /// shell auto-completion, for anything requiring consistency use load() !
+    fn load_read_only() -> Result<HashMap<String, VMState>, Error> {
+        let file = Self::open_file_raw(false)?;
+        Ok(serde_json::from_reader(&file).unwrap_or_default())
+    }
+
+    /// Write back a potentially modified state map, consuming the held lock
+    fn write(mut self) -> Result<(), Error> {
+        // rewind and truncate before rewriting so stale tail bytes cannot survive
+        self.file.seek(SeekFrom::Start(0))?;
+        self.file.set_len(0)?;
+        serde_json::to_writer(self.file, &self.map)?;
+
+        // drop ourselves including file lock
+        Ok(())
+    }
+
+    /// Return the map, but drop the lock immediately
+    fn read_only(self) -> HashMap<String, VMState> {
+        self.map
+    }
+}
+
+/// Derive the map key for a (repository, snapshot) pair, escaped so it is a
+/// valid systemd-unit-style name.
+fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String {
+    let full = format!("qemu_{}/{}", repo, snap);
+    pbs_systemd::escape_unit(&full, false)
+}
+
+/// remove non-responsive VMs from given map, returns 'true' if map was modified
+async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool {
+    let mut to_remove = Vec::new();
+    for (name, state) in map.iter() {
+        let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
+        // "keep-timeout" probes the VM without resetting its shutdown timer
+        let res = client
+            .get("api2/json/status", Some(json!({"keep-timeout": true})))
+            .await;
+        if res.is_err() {
+            // VM is not reachable, remove from map and inform user
+            to_remove.push(name.clone());
+            eprintln!(
+                "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map",
+                name, state.pid, state.cid
+            );
+            // best effort: also reap the stale process
+            let _ = super::qemu_helper::try_kill_vm(state.pid);
+        }
+    }
+
+    // deferred removal because the map cannot be mutated while iterating it
+    for tr in &to_remove {
+        map.remove(tr);
+    }
+
+    !to_remove.is_empty()
+}
+
+/// Generate a fresh random ticket (UUID string) used to authenticate API calls
+/// against a restore VM.
+fn new_ticket() -> String {
+    proxmox::tools::Uuid::generate().to_string()
+}
+
+/// Return a client for the restore VM of the given snapshot, starting (or
+/// restarting) the VM if necessary. Holds the state-map lock for the whole
+/// operation so concurrent callers cannot race on VM startup.
+async fn ensure_running(details: &SnapRestoreDetails) -> Result<VsockClient, Error> {
+    let name = make_name(&details.repo, &details.snapshot);
+    let mut state = VMStateMap::load()?;
+
+    cleanup_map(&mut state.map).await;
+
+    let new_cid;
+    let vms = match state.map.get(&name) {
+        Some(vm) => {
+            let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone()));
+            let res = client.get("api2/json/status", None).await;
+            match res {
+                Ok(_) => {
+                    // VM is running and we just reset its timeout, nothing to do
+                    return Ok(client);
+                }
+                Err(err) => {
+                    eprintln!("stale VM detected, restarting ({})", err);
+                    // VM is dead, restart
+                    let _ = super::qemu_helper::try_kill_vm(vm.pid);
+                    let vms = start_vm(vm.cid, details).await?;
+                    new_cid = vms.cid;
+                    state.map.insert(name, vms.clone());
+                    vms
+                }
+            }
+        }
+        None => {
+            // pick a CID above every one currently in use
+            let mut cid = state
+                .map
+                .iter()
+                .map(|v| v.1.cid)
+                .max()
+                .unwrap_or(0)
+                .wrapping_add(1);
+
+            // offset cid by user id, to avoid unnecessary retries
+            let running_uid = nix::unistd::Uid::current();
+            cid = cid.wrapping_add(running_uid.as_raw() as i32);
+
+            // some low CIDs have special meaning, start at 10 to avoid them
+            cid = cid.max(10);
+
+            let vms = start_vm(cid, details).await?;
+            new_cid = vms.cid;
+            state.map.insert(name, vms.clone());
+            vms
+        }
+    };
+
+    // persist the updated map before handing out the client
+    state.write()?;
+    Ok(VsockClient::new(
+        new_cid,
+        DEFAULT_VSOCK_PORT,
+        Some(vms.ticket.clone()),
+    ))
+}
+
+/// Start a restore VM for the snapshot, attaching all ".img.fidx" (block image)
+/// archives from the manifest. `cid_request` is only a hint; the helper returns
+/// the CID actually assigned.
+async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> {
+    let ticket = new_ticket();
+    let files = details
+        .manifest
+        .files()
+        .iter()
+        .map(|file| file.filename.clone())
+        .filter(|name| name.ends_with(".img.fidx"));
+    // CIDs are u16 on the wire; fold the request into that range
+    let (pid, cid) =
+        super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket)
+            .await?;
+    Ok(VMState { pid, cid, ticket })
+}
+
+impl BlockRestoreDriver for QemuBlockDriver {
+    /// List directory entries by asking the restore VM's /list API endpoint.
+    fn data_list(
+        &self,
+        details: SnapRestoreDetails,
+        img_file: String,
+        mut path: Vec<u8>,
+    ) -> Async<Result<Vec<ArchiveEntry>, Error>> {
+        async move {
+            let client = ensure_running(&details).await?;
+            // the VM API expects absolute paths
+            if !path.is_empty() && path[0] != b'/' {
+                path.insert(0, b'/');
+            }
+            // path is raw bytes (not necessarily UTF-8), so transport it base64-encoded
+            let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
+            let mut result = client
+                .get("api2/json/list", Some(json!({ "path": path })))
+                .await?;
+            serde_json::from_value(result["data"].take()).map_err(|err| err.into())
+        }
+        .boxed()
+    }
+
+    /// Stream file/directory content from the restore VM's /extract endpoint.
+    fn data_extract(
+        &self,
+        details: SnapRestoreDetails,
+        img_file: String,
+        mut path: Vec<u8>,
+        pxar: bool,
+    ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
+        async move {
+            let client = ensure_running(&details).await?;
+            if !path.is_empty() && path[0] != b'/' {
+                path.insert(0, b'/');
+            }
+            let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
+            // duplex pipe: the spawned task downloads into tx, the caller reads from rx
+            let (mut tx, rx) = tokio::io::duplex(1024 * 4096);
+            tokio::spawn(async move {
+                if let Err(err) = client
+                    .download(
+                        "api2/json/extract",
+                        Some(json!({ "path": path, "pxar": pxar })),
+                        &mut tx,
+                    )
+                    .await
+                {
+                    // a broken mid-stream download cannot be signalled through rx,
+                    // so the whole process aborts instead of emitting truncated data
+                    eprintln!("reading file extraction stream failed - {}", err);
+                    std::process::exit(1);
+                }
+            });
+
+            Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
+        }
+        .boxed()
+    }
+
+    /// Report status for every mapped VM, merging in live data from each VM's
+    /// /status endpoint where reachable.
+    fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> {
+        async move {
+            let mut state_map = VMStateMap::load()?;
+            let modified = cleanup_map(&mut state_map.map).await;
+            // persist the map only if cleanup actually removed something
+            let map = if modified {
+                let m = state_map.map.clone();
+                state_map.write()?;
+                m
+            } else {
+                state_map.read_only()
+            };
+            let mut result = Vec::new();
+
+            for (n, s) in map.iter() {
+                let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone()));
+                let resp = client
+                    .get("api2/json/status", Some(json!({"keep-timeout": true})))
+                    .await;
+                let name = pbs_systemd::unescape_unit(n)
+                    .unwrap_or_else(|_| "<invalid name>".to_owned());
+                let mut extra = json!({"pid": s.pid, "cid": s.cid});
+
+                match resp {
+                    Ok(status) => match status["data"].as_object() {
+                        Some(map) => {
+                            for (k, v) in map.iter() {
+                                extra[k] = v.clone();
+                            }
+                        }
+                        None => {
+                            let err = format!(
+                                "invalid JSON received from /status call: {}",
+                                status.to_string()
+                            );
+                            extra["error"] = json!(err);
+                        }
+                    },
+                    Err(err) => {
+                        let err = format!("error during /status API call: {}", err);
+                        extra["error"] = json!(err);
+                    }
+                }
+
+                result.push(DriverStatus {
+                    id: name,
+                    data: extra,
+                });
+            }
+
+            Ok(result)
+        }
+        .boxed()
+    }
+
+    /// Ask a VM to stop and remove it from the persisted map.
+    fn stop(&self, id: String) -> Async<Result<(), Error>> {
+        async move {
+            let name = pbs_systemd::escape_unit(&id, false);
+            let mut map = VMStateMap::load()?;
+            let map_mod = cleanup_map(&mut map.map).await;
+            match map.map.get(&name) {
+                Some(state) => {
+                    let client =
+                        VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
+                    // ignore errors, this either fails because:
+                    // * the VM is unreachable/dead, in which case we don't want it in the map
+                    // * the call was successful and the connection reset when the VM stopped
+                    let _ = client.get("api2/json/stop", None).await;
+                    map.map.remove(&name);
+                    map.write()?;
+                }
+                None => {
+                    // still persist any cleanup the probe above performed
+                    if map_mod {
+                        map.write()?;
+                    }
+                    bail!("VM with name '{}' not found", name);
+                }
+            }
+            Ok(())
+        }
+        .boxed()
+    }
+
+    /// List all mapped snapshot names (unescaped); errors yield an empty list.
+    fn list(&self) -> Vec<String> {
+        match VMStateMap::load_read_only() {
+            Ok(state) => state
+                .iter()
+                .filter_map(|(name, _)| pbs_systemd::unescape_unit(&name).ok())
+                .collect(),
+            Err(_) => Vec::new(),
+        }
+    }
+}
--- /dev/null
+//! Provides a very basic "newc" format cpio encoder.
+//! See 'man 5 cpio' for format details, as well as:
+//! https://www.kernel.org/doc/html/latest/driver-api/early-userspace/buffer-format.html
+//! This does not provide full support for the format, only what is needed to include files in an
+//! initramfs intended for a linux kernel.
+use std::ffi::{CString, CStr};
+
+use anyhow::{bail, Error};
+use tokio::io::{copy, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+
+/// Write a cpio "newc" file entry to an AsyncWrite.
+///
+/// Emits the magic, the 13 fixed-width hex header fields, the NUL-terminated
+/// name (padded to 4-byte alignment), and exactly `size` bytes of `content`
+/// (also padded). Fails if `content` yields fewer than `size` bytes.
+pub async fn append_file<W: AsyncWrite + Unpin, R: AsyncRead + Unpin>(
+    mut target: W,
+    content: R,
+    name: &CStr,
+    inode: u16,
+    mode: u16,
+    uid: u16,
+    gid: u16,
+    // negative mtimes are generally valid, but cpio defines all fields as unsigned
+    mtime: u64,
+    // c_filesize has 8 bytes, but man page claims that 4 GB files are the maximum, let's be safe
+    size: u32,
+) -> Result<(), Error> {
+    let name = name.to_bytes_with_nul();
+
+    target.write_all(b"070701").await?; // c_magic
+    print_cpio_hex(&mut target, inode as u64).await?; // c_ino
+    print_cpio_hex(&mut target, mode as u64).await?; // c_mode
+    print_cpio_hex(&mut target, uid as u64).await?; // c_uid
+    print_cpio_hex(&mut target, gid as u64).await?; // c_gid
+    print_cpio_hex(&mut target, 0).await?; // c_nlink
+    print_cpio_hex(&mut target, mtime).await?; // c_mtime (already u64, cast removed)
+    print_cpio_hex(&mut target, size as u64).await?; // c_filesize
+    print_cpio_hex(&mut target, 0).await?; // c_devmajor
+    print_cpio_hex(&mut target, 0).await?; // c_devminor
+    print_cpio_hex(&mut target, 0).await?; // c_rdevmajor
+    print_cpio_hex(&mut target, 0).await?; // c_rdevminor
+    print_cpio_hex(&mut target, name.len() as u64).await?; // c_namesize
+    print_cpio_hex(&mut target, 0).await?; // c_check (ignored for newc)
+
+    // name, then zero-pad so the data section starts 4-byte aligned
+    target.write_all(name).await?;
+    let header_size = 6 + 8 * 13 + name.len(); // magic + 13 hex fields + name
+    let mut name_pad = header_size;
+    while name_pad & 3 != 0 {
+        target.write_u8(0).await?;
+        name_pad += 1;
+    }
+
+    // copy exactly `size` bytes, then pad the data section to 4-byte alignment
+    let mut content = content.take(size as u64);
+    let copied = copy(&mut content, &mut target).await?;
+    if copied < size as u64 {
+        // fixed typo in the error message: "to big" -> "too big"
+        bail!("cpio: not enough data, or size too big - encoding invalid");
+    }
+    let mut data_pad = copied;
+    while data_pad & 3 != 0 {
+        target.write_u8(0).await?;
+        data_pad += 1;
+    }
+
+    Ok(())
+}
+
+/// Write the TRAILER!!! file to an AsyncWrite, signifying the end of a cpio archive. Note that you
+/// can immediately add more files after, to create a concatenated archive, the kernel for example
+/// will merge these upon loading an initramfs.
+/// Write the TRAILER!!! file to an AsyncWrite, signifying the end of a cpio archive. Note that you
+/// can immediately add more files after, to create a concatenated archive, the kernel for example
+/// will merge these upon loading an initramfs.
+pub async fn append_trailer<W: AsyncWrite + Unpin>(target: W) -> Result<(), Error> {
+    // trailer entry is a zero-sized file with all-zero metadata and this magic name
+    let name = CString::new("TRAILER!!!").unwrap();
+    append_file(target, tokio::io::empty(), &name, 0, 0, 0, 0, 0, 0).await
+}
+
+/// Write `value` as the 8-digit zero-padded lowercase hex used by all "newc"
+/// header fields.
+async fn print_cpio_hex<W: AsyncWrite + Unpin>(target: &mut W, value: u64) -> Result<(), Error> {
+    let field = format!("{:08x}", value);
+    target.write_all(field.as_bytes()).await.map_err(Error::from)
+}
--- /dev/null
+use std::ffi::OsStr;
+use std::os::unix::ffi::OsStrExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Error};
+use serde_json::{json, Value};
+
+use proxmox::api::{
+ api,
+ cli::{
+ default_table_format_options, format_and_print_result_full, get_output_format,
+ run_cli_command, CliCommand, CliCommandMap, CliEnvironment, ColumnConfig, OUTPUT_FORMAT,
+ },
+};
+use proxmox::tools::fs::{create_path, CreateOptions};
+use pxar::accessor::aio::Accessor;
+use pxar::decoder::aio::Decoder;
+
+use pbs_api_types::CryptMode;
+use pbs_datastore::{CryptConfig, CATALOG_NAME};
+use pbs_datastore::backup_info::BackupDir;
+use pbs_datastore::catalog::{ArchiveEntry, CatalogReader, DirEntryAttribute};
+use pbs_datastore::dynamic_index::{BufferedDynamicReader, LocalDynamicReadAt};
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::key_derivation::decrypt_key;
+use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_client::pxar::{create_zip, extract_sub_dir, extract_sub_dir_seq};
+use pbs_client::tools::{
+ complete_group_or_snapshot, complete_repository, connect, extract_repository_from_value,
+ key_source::{
+ crypto_parameters_keep_fd, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
+ KEYFILE_SCHEMA,
+ },
+ REPO_URL_SCHEMA,
+};
+
+pub mod block_driver;
+pub use block_driver::*;
+
+pub mod cpio;
+
+mod qemu_helper;
+mod block_driver_qemu;
+
+/// Parsed form of a user-supplied restore path.
+enum ExtractPath {
+    /// "/": list the archives contained in the snapshot.
+    ListArchives,
+    /// A ".pxar.didx" archive name plus the path inside it.
+    Pxar(String, Vec<u8>),
+    /// An ".img.fidx" block image name plus the path inside it.
+    VM(String, Vec<u8>),
+}
+
+/// Split a (possibly base64-encoded) restore path into the archive file name
+/// and the remaining path inside that archive.
+///
+/// Fails when base64 decoding fails, the file-name part is not valid UTF-8, or
+/// the archive type is neither ".pxar.didx" nor ".img.fidx".
+fn parse_path(path: String, base64: bool) -> Result<ExtractPath, Error> {
+    let mut bytes = if base64 {
+        base64::decode(&path)
+            .map_err(|err| format_err!("Failed base64-decoding path '{}' - {}", path, err))?
+    } else {
+        path.into_bytes()
+    };
+
+    if bytes == b"/" {
+        return Ok(ExtractPath::ListArchives);
+    }
+
+    // strip all leading slashes so the first component is the archive name
+    while !bytes.is_empty() && bytes[0] == b'/' {
+        bytes.remove(0);
+    }
+
+    // split at the first '/' (or take everything as the file name)
+    let (file, path) = {
+        let slash_pos = bytes.iter().position(|c| *c == b'/').unwrap_or(bytes.len());
+        let path = bytes.split_off(slash_pos);
+        let file = String::from_utf8(bytes)?;
+        (file, path)
+    };
+
+    if file.ends_with(".pxar.didx") {
+        Ok(ExtractPath::Pxar(file, path))
+    } else if file.ends_with(".img.fidx") {
+        Ok(ExtractPath::VM(file, path))
+    } else {
+        bail!("'{}' is not supported for file-restore", file);
+    }
+}
+
+/// Determine where the encryption key should be read from, based on the CLI
+/// parameters: an explicit "keyfile" path wins over a "keyfd" file descriptor
+/// (exposed as a /dev/fd pseudo-path). Returns `None` when neither is given.
+fn keyfile_path(param: &Value) -> Option<String> {
+    match param.get("keyfile") {
+        Some(Value::String(path)) => Some(path.to_owned()),
+        _ => match param.get("keyfd") {
+            Some(Value::Number(fd)) => Some(format!("/dev/fd/{}", fd)),
+            _ => None,
+        },
+    }
+}
+
+#[api(
+ input: {
+ properties: {
+ repository: {
+ schema: REPO_URL_SCHEMA,
+ optional: true,
+ },
+ snapshot: {
+ type: String,
+ description: "Group/Snapshot path.",
+ },
+ "path": {
+ description: "Path to restore. Directories will be restored as .zip files.",
+ type: String,
+ },
+ "base64": {
+ type: Boolean,
+ description: "If set, 'path' will be interpreted as base64 encoded.",
+ optional: true,
+ default: false,
+ },
+ keyfile: {
+ schema: KEYFILE_SCHEMA,
+ optional: true,
+ },
+ "keyfd": {
+ schema: KEYFD_SCHEMA,
+ optional: true,
+ },
+ "crypt-mode": {
+ type: CryptMode,
+ optional: true,
+ },
+ "driver": {
+ type: BlockDriverType,
+ optional: true,
+ },
+ "output-format": {
+ schema: OUTPUT_FORMAT,
+ optional: true,
+ },
+ }
+ },
+ returns: {
+ description: "A list of elements under the given path",
+ type: Array,
+ items: {
+ type: ArchiveEntry,
+ }
+ }
+)]
+/// List a directory from a backup snapshot.
+async fn list(
+ snapshot: String,
+ path: String,
+ base64: bool,
+ param: Value,
+) -> Result<(), Error> {
+ let repo = extract_repository_from_value(¶m)?;
+ let snapshot: BackupDir = snapshot.parse()?;
+ let path = parse_path(path, base64)?;
+
+ let keyfile = keyfile_path(¶m);
+ let crypto = crypto_parameters_keep_fd(¶m)?;
+ let crypt_config = match crypto.enc_key {
+ None => None,
+ Some(ref key) => {
+ let (key, _, _) =
+ decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
+ eprintln!("{}", format_key_source(&key.source, "encryption"));
+ err
+ })?;
+ Some(Arc::new(CryptConfig::new(key)?))
+ }
+ };
+
+ let client = connect(&repo)?;
+ let client = BackupReader::start(
+ client,
+ crypt_config.clone(),
+ repo.store(),
+ &snapshot.group().backup_type(),
+ &snapshot.group().backup_id(),
+ snapshot.backup_time(),
+ true,
+ )
+ .await?;
+
+ let (manifest, _) = client.download_manifest().await?;
+ manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
+
+ let result = match path {
+ ExtractPath::ListArchives => {
+ let mut entries = vec![];
+ for file in manifest.files() {
+ if !file.filename.ends_with(".pxar.didx") && !file.filename.ends_with(".img.fidx") {
+ continue;
+ }
+ let path = format!("/{}", file.filename);
+ let attr = if file.filename.ends_with(".pxar.didx") {
+                // a pxar file is a file archive, so its root is also a directory root
+ Some(&DirEntryAttribute::Directory { start: 0 })
+ } else {
+ None
+ };
+ entries.push(ArchiveEntry::new_with_size(path.as_bytes(), attr, Some(file.size)));
+ }
+
+ Ok(entries)
+ }
+ ExtractPath::Pxar(file, mut path) => {
+ let index = client
+ .download_dynamic_index(&manifest, CATALOG_NAME)
+ .await?;
+ let most_used = index.find_most_used_chunks(8);
+ let file_info = manifest.lookup_file_info(&CATALOG_NAME)?;
+ let chunk_reader = RemoteChunkReader::new(
+ client.clone(),
+ crypt_config,
+ file_info.chunk_crypt_mode(),
+ most_used,
+ );
+ let reader = BufferedDynamicReader::new(index, chunk_reader);
+ let mut catalog_reader = CatalogReader::new(reader);
+
+ let mut fullpath = file.into_bytes();
+ fullpath.append(&mut path);
+
+ catalog_reader.list_dir_contents(&fullpath)
+ }
+ ExtractPath::VM(file, path) => {
+ let details = SnapRestoreDetails {
+ manifest,
+ repo,
+ snapshot,
+ keyfile,
+ };
+ let driver: Option<BlockDriverType> = match param.get("driver") {
+ Some(drv) => Some(serde_json::from_value(drv.clone())?),
+ None => None,
+ };
+ data_list(driver, details, file, path).await
+ }
+ }?;
+
+ let options = default_table_format_options()
+ .sortby("type", false)
+ .sortby("text", false)
+ .column(ColumnConfig::new("type"))
+ .column(ColumnConfig::new("text").header("name"))
+ .column(ColumnConfig::new("mtime").header("last modified"))
+ .column(ColumnConfig::new("size"));
+
+ let output_format = get_output_format(¶m);
+ format_and_print_result_full(
+ &mut json!(result),
+ &API_METHOD_LIST.returns,
+ &output_format,
+ &options,
+ );
+
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ repository: {
+ schema: REPO_URL_SCHEMA,
+ optional: true,
+ },
+ snapshot: {
+ type: String,
+ description: "Group/Snapshot path.",
+ },
+ "path": {
+ description: "Path to restore. Directories will be restored as .zip files if extracted to stdout.",
+ type: String,
+ },
+ "base64": {
+ type: Boolean,
+ description: "If set, 'path' will be interpreted as base64 encoded.",
+ optional: true,
+ default: false,
+ },
+ target: {
+ type: String,
+ optional: true,
+ description: "Target directory path. Use '-' to write to standard output.",
+ },
+ keyfile: {
+ schema: KEYFILE_SCHEMA,
+ optional: true,
+ },
+ "keyfd": {
+ schema: KEYFD_SCHEMA,
+ optional: true,
+ },
+ "crypt-mode": {
+ type: CryptMode,
+ optional: true,
+ },
+ verbose: {
+ type: Boolean,
+ description: "Print verbose information",
+ optional: true,
+ default: false,
+ },
+ "driver": {
+ type: BlockDriverType,
+ optional: true,
+ },
+ }
+ }
+)]
+/// Restore files from a backup snapshot.
+async fn extract(
+ snapshot: String,
+ path: String,
+ base64: bool,
+ target: Option<String>,
+ verbose: bool,
+ param: Value,
+) -> Result<(), Error> {
+ let repo = extract_repository_from_value(¶m)?;
+ let snapshot: BackupDir = snapshot.parse()?;
+ let orig_path = path;
+ let path = parse_path(orig_path.clone(), base64)?;
+
+ let target = match target {
+ Some(target) if target == "-" => None,
+ Some(target) => Some(PathBuf::from(target)),
+ None => Some(std::env::current_dir()?),
+ };
+
+ let keyfile = keyfile_path(¶m);
+ let crypto = crypto_parameters_keep_fd(¶m)?;
+ let crypt_config = match crypto.enc_key {
+ None => None,
+ Some(ref key) => {
+ let (key, _, _) =
+ decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
+ eprintln!("{}", format_key_source(&key.source, "encryption"));
+ err
+ })?;
+ Some(Arc::new(CryptConfig::new(key)?))
+ }
+ };
+
+ let client = connect(&repo)?;
+ let client = BackupReader::start(
+ client,
+ crypt_config.clone(),
+ repo.store(),
+ &snapshot.group().backup_type(),
+ &snapshot.group().backup_id(),
+ snapshot.backup_time(),
+ true,
+ )
+ .await?;
+ let (manifest, _) = client.download_manifest().await?;
+
+ match path {
+ ExtractPath::Pxar(archive_name, path) => {
+ let file_info = manifest.lookup_file_info(&archive_name)?;
+ let index = client
+ .download_dynamic_index(&manifest, &archive_name)
+ .await?;
+ let most_used = index.find_most_used_chunks(8);
+ let chunk_reader = RemoteChunkReader::new(
+ client.clone(),
+ crypt_config,
+ file_info.chunk_crypt_mode(),
+ most_used,
+ );
+ let reader = BufferedDynamicReader::new(index, chunk_reader);
+
+ let archive_size = reader.archive_size();
+ let reader = LocalDynamicReadAt::new(reader);
+ let decoder = Accessor::new(reader, archive_size).await?;
+ extract_to_target(decoder, &path, target, verbose).await?;
+ }
+ ExtractPath::VM(file, path) => {
+ let details = SnapRestoreDetails {
+ manifest,
+ repo,
+ snapshot,
+ keyfile,
+ };
+ let driver: Option<BlockDriverType> = match param.get("driver") {
+ Some(drv) => Some(serde_json::from_value(drv.clone())?),
+ None => None,
+ };
+
+ if let Some(mut target) = target {
+ let reader = data_extract(driver, details, file, path.clone(), true).await?;
+ let decoder = Decoder::from_tokio(reader).await?;
+ extract_sub_dir_seq(&target, decoder, verbose).await?;
+
+ // we extracted a .pxarexclude-cli file auto-generated by the VM when encoding the
+ // archive, this file is of no use for the user, so try to remove it
+ target.push(".pxarexclude-cli");
+ std::fs::remove_file(target).map_err(|e| {
+ format_err!("unable to remove temporary .pxarexclude-cli file - {}", e)
+ })?;
+ } else {
+ let mut reader = data_extract(driver, details, file, path.clone(), false).await?;
+ tokio::io::copy(&mut reader, &mut tokio::io::stdout()).await?;
+ }
+ }
+ _ => {
+ bail!("cannot extract '{}'", orig_path);
+ }
+ }
+
+ Ok(())
+}
+
+async fn extract_to_target<T>(
+ decoder: Accessor<T>,
+ path: &[u8],
+ target: Option<PathBuf>,
+ verbose: bool,
+) -> Result<(), Error>
+where
+ T: pxar::accessor::ReadAt + Clone + Send + Sync + Unpin + 'static,
+{
+ let path = if path.is_empty() { b"/" } else { path };
+
+ let root = decoder.open_root().await?;
+ let file = root
+ .lookup(OsStr::from_bytes(path))
+ .await?
+ .ok_or_else(|| format_err!("error opening '{:?}'", path))?;
+
+ if let Some(target) = target {
+ extract_sub_dir(target, decoder, OsStr::from_bytes(path), verbose).await?;
+ } else {
+ match file.kind() {
+ pxar::EntryKind::File { .. } => {
+ tokio::io::copy(&mut file.contents().await?, &mut tokio::io::stdout()).await?;
+ }
+ _ => {
+ create_zip(
+ tokio::io::stdout(),
+ decoder,
+ OsStr::from_bytes(path),
+ verbose,
+ )
+ .await?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
+fn main() {
+ let list_cmd_def = CliCommand::new(&API_METHOD_LIST)
+ .arg_param(&["snapshot", "path"])
+ .completion_cb("repository", complete_repository)
+ .completion_cb("snapshot", complete_group_or_snapshot);
+
+ let restore_cmd_def = CliCommand::new(&API_METHOD_EXTRACT)
+ .arg_param(&["snapshot", "path", "target"])
+ .completion_cb("repository", complete_repository)
+ .completion_cb("snapshot", complete_group_or_snapshot)
+ .completion_cb("target", pbs_tools::fs::complete_file_name);
+
+ let status_cmd_def = CliCommand::new(&API_METHOD_STATUS);
+ let stop_cmd_def = CliCommand::new(&API_METHOD_STOP)
+ .arg_param(&["name"])
+ .completion_cb("name", complete_block_driver_ids);
+
+ let cmd_def = CliCommandMap::new()
+ .insert("list", list_cmd_def)
+ .insert("extract", restore_cmd_def)
+ .insert("status", status_cmd_def)
+ .insert("stop", stop_cmd_def);
+
+ let rpcenv = CliEnvironment::new();
+ run_cli_command(
+ cmd_def,
+ rpcenv,
+ Some(|future| pbs_runtime::main(future)),
+ );
+}
+
+/// Returns a runtime dir owned by the current user.
+/// Note that XDG_RUNTIME_DIR is not always available, especially for non-login users like
+/// "www-data", so we use a custom one in /run/proxmox-backup/<uid> instead.
+pub fn get_user_run_dir() -> Result<std::path::PathBuf, Error> {
+ let uid = nix::unistd::Uid::current();
+ let mut path: std::path::PathBuf = pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR.into();
+ path.push(uid.to_string());
+ create_run_dir()?;
+ std::fs::create_dir_all(&path)?;
+ Ok(path)
+}
+
+/// FIXME: proxmox-file-restore should not depend on this!
+fn create_run_dir() -> Result<(), Error> {
+ let backup_user = backup_user()?;
+ let opts = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+ let _: bool = create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), None, Some(opts))?;
+ Ok(())
+}
+
+/// Return User info for the 'backup' user (``getpwnam_r(3)``)
+pub fn backup_user() -> Result<nix::unistd::User, Error> {
+ pbs_tools::sys::query_user(pbs_buildcfg::BACKUP_USER_NAME)?
+ .ok_or_else(|| format_err!("Unable to lookup '{}' user.", pbs_buildcfg::BACKUP_USER_NAME))
+}
+
--- /dev/null
+//! Helper to start a QEMU VM for single file restore.
+use std::fs::{File, OpenOptions};
+use std::io::prelude::*;
+use std::os::unix::io::AsRawFd;
+use std::path::PathBuf;
+use std::time::Duration;
+
+use anyhow::{bail, format_err, Error};
+use tokio::time;
+
+use nix::sys::signal::{kill, Signal};
+use nix::unistd::Pid;
+
+use proxmox::tools::fs::{create_path, file_read_string, make_tmp_file, CreateOptions};
+
+use pbs_client::{VsockClient, DEFAULT_VSOCK_PORT};
+
+use crate::{cpio, backup_user};
+use super::SnapRestoreDetails;
+
+const PBS_VM_NAME: &str = "pbs-restore-vm";
+const MAX_CID_TRIES: u64 = 32;
+
+fn create_restore_log_dir() -> Result<String, Error> {
+ let logpath = format!("{}/file-restore", pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR);
+
+ proxmox::try_block!({
+ let backup_user = backup_user()?;
+ let opts = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ let opts_root = CreateOptions::new()
+ .owner(nix::unistd::ROOT)
+ .group(nix::unistd::Gid::from_raw(0));
+
+ create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts))?;
+ create_path(&logpath, None, Some(opts_root))?;
+ Ok(())
+ })
+ .map_err(|err: Error| format_err!("unable to create file-restore log dir - {}", err))?;
+
+ Ok(logpath)
+}
+
+fn validate_img_existance(debug: bool) -> Result<(), Error> {
+ let kernel = PathBuf::from(pbs_buildcfg::PROXMOX_BACKUP_KERNEL_FN);
+ let initramfs = PathBuf::from(if debug {
+ pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_DBG_FN
+ } else {
+ pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_FN
+ });
+ if !kernel.exists() || !initramfs.exists() {
+ bail!("cannot run file-restore VM: package 'proxmox-backup-restore-image' is not (correctly) installed");
+ }
+ Ok(())
+}
+
+pub fn try_kill_vm(pid: i32) -> Result<(), Error> {
+ let pid = Pid::from_raw(pid);
+ if let Ok(()) = kill(pid, None) {
+ // process is running (and we could kill it), check if it is actually ours
+ // (if it errors assume we raced with the process's death and ignore it)
+ if let Ok(cmdline) = file_read_string(format!("/proc/{}/cmdline", pid)) {
+ if cmdline.split('\0').any(|a| a == PBS_VM_NAME) {
+ // yes, it's ours, kill it brutally with SIGKILL, no reason to take
+ // any chances - in this state it's most likely broken anyway
+ if let Err(err) = kill(pid, Signal::SIGKILL) {
+ bail!(
+ "reaping broken VM (pid {}) with SIGKILL failed: {}",
+ pid,
+ err
+ );
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn create_temp_initramfs(ticket: &str, debug: bool) -> Result<(File, String), Error> {
+ use std::ffi::CString;
+ use tokio::fs::File;
+
+ let (tmp_file, tmp_path) =
+ make_tmp_file("/tmp/file-restore-qemu.initramfs.tmp", CreateOptions::new())?;
+ nix::unistd::unlink(&tmp_path)?;
+ pbs_tools::fd::fd_change_cloexec(tmp_file.as_raw_fd(), false)?;
+
+ let initramfs = if debug {
+ pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_DBG_FN
+ } else {
+ pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_FN
+ };
+
+ let mut f = File::from_std(tmp_file);
+ let mut base = File::open(initramfs).await?;
+
+ tokio::io::copy(&mut base, &mut f).await?;
+
+ let name = CString::new("ticket").unwrap();
+ cpio::append_file(
+ &mut f,
+ ticket.as_bytes(),
+ &name,
+ 0,
+ (libc::S_IFREG | 0o400) as u16,
+ 0,
+ 0,
+ 0,
+ ticket.len() as u32,
+ )
+ .await?;
+ cpio::append_trailer(&mut f).await?;
+
+ let tmp_file = f.into_std().await;
+ let path = format!("/dev/fd/{}", &tmp_file.as_raw_fd());
+
+ Ok((tmp_file, path))
+}
+
+pub async fn start_vm(
+ // u16 so we can do wrapping_add without going too high
+ mut cid: u16,
+ details: &SnapRestoreDetails,
+ files: impl Iterator<Item = String>,
+ ticket: &str,
+) -> Result<(i32, i32), Error> {
+ if let Err(_) = std::env::var("PBS_PASSWORD") {
+ bail!("environment variable PBS_PASSWORD has to be set for QEMU VM restore");
+ }
+
+ let debug = if let Ok(val) = std::env::var("PBS_QEMU_DEBUG") {
+ !val.is_empty()
+ } else {
+ false
+ };
+
+ validate_img_existance(debug)?;
+
+ let pid;
+ let (mut pid_file, pid_path) = make_tmp_file("/tmp/file-restore-qemu.pid.tmp", CreateOptions::new())?;
+ nix::unistd::unlink(&pid_path)?;
+ pbs_tools::fd::fd_change_cloexec(pid_file.as_raw_fd(), false)?;
+
+ let (_ramfs_pid, ramfs_path) = create_temp_initramfs(ticket, debug).await?;
+
+ let logpath = create_restore_log_dir()?;
+ let logfile = &format!("{}/qemu.log", logpath);
+ let mut logrotate = pbs_tools::logrotate::LogRotate::new(logfile, false)
+ .ok_or_else(|| format_err!("could not get QEMU log file names"))?;
+
+ if let Err(err) = logrotate.do_rotate(CreateOptions::default(), Some(16)) {
+ eprintln!("warning: logrotate for QEMU log file failed - {}", err);
+ }
+
+ let mut logfd = OpenOptions::new()
+ .append(true)
+ .create_new(true)
+ .open(logfile)?;
+ pbs_tools::fd::fd_change_cloexec(logfd.as_raw_fd(), false)?;
+
+ // preface log file with start timestamp so one can see how long QEMU took to start
+ writeln!(logfd, "[{}] PBS file restore VM log", {
+ let now = proxmox::tools::time::epoch_i64();
+ proxmox::tools::time::epoch_to_rfc3339(now)?
+ },)?;
+
+ let base_args = [
+ "-chardev",
+ &format!(
+ "file,id=log,path=/dev/null,logfile=/dev/fd/{},logappend=on",
+ logfd.as_raw_fd()
+ ),
+ "-serial",
+ "chardev:log",
+ "-vnc",
+ "none",
+ "-enable-kvm",
+ "-kernel",
+ pbs_buildcfg::PROXMOX_BACKUP_KERNEL_FN,
+ "-initrd",
+ &ramfs_path,
+ "-append",
+ &format!(
+ "{} panic=1 zfs_arc_min=0 zfs_arc_max=0",
+ if debug { "debug" } else { "quiet" }
+ ),
+ "-daemonize",
+ "-pidfile",
+ &format!("/dev/fd/{}", pid_file.as_raw_fd()),
+ "-name",
+ PBS_VM_NAME,
+ ];
+
+ // Generate drive arguments for all fidx files in backup snapshot
+ let mut drives = Vec::new();
+ let mut id = 0;
+ for file in files {
+ if !file.ends_with(".img.fidx") {
+ continue;
+ }
+ drives.push("-drive".to_owned());
+ let keyfile = if let Some(ref keyfile) = details.keyfile {
+ format!(",,keyfile={}", keyfile)
+ } else {
+ "".to_owned()
+ };
+ drives.push(format!(
+ "file=pbs:repository={},,snapshot={},,archive={}{},read-only=on,if=none,id=drive{}",
+ details.repo, details.snapshot, file, keyfile, id
+ ));
+
+ // a PCI bus can only support 32 devices, so add a new one every 32
+ let bus = (id / 32) + 2;
+ if id % 32 == 0 {
+ drives.push("-device".to_owned());
+ drives.push(format!("pci-bridge,id=bridge{},chassis_nr={}", bus, bus));
+ }
+
+ drives.push("-device".to_owned());
+ // drive serial is used by VM to map .fidx files to /dev paths
+ let serial = file.strip_suffix(".img.fidx").unwrap_or(&file);
+ drives.push(format!(
+ "virtio-blk-pci,drive=drive{},serial={},bus=bridge{}",
+ id, serial, bus
+ ));
+ id += 1;
+ }
+
+ let ram = if debug {
+ 1024
+ } else {
+ // add more RAM if many drives are given
+ match id {
+ f if f < 10 => 192,
+ f if f < 20 => 256,
+ _ => 384,
+ }
+ };
+
+ // Try starting QEMU in a loop to retry if we fail because of a bad 'cid' value
+ let mut attempts = 0;
+ loop {
+ let mut qemu_cmd = std::process::Command::new("qemu-system-x86_64");
+ qemu_cmd.args(base_args.iter());
+ qemu_cmd.arg("-m");
+ qemu_cmd.arg(ram.to_string());
+ qemu_cmd.args(&drives);
+ qemu_cmd.arg("-device");
+ qemu_cmd.arg(format!(
+ "vhost-vsock-pci,guest-cid={},disable-legacy=on",
+ cid
+ ));
+
+ if debug {
+ let debug_args = [
+ "-chardev",
+ &format!(
+ "socket,id=debugser,path=/run/proxmox-backup/file-restore-serial-{}.sock,server,nowait",
+ cid
+ ),
+ "-serial",
+ "chardev:debugser",
+ ];
+ qemu_cmd.args(debug_args.iter());
+ }
+
+ qemu_cmd.stdout(std::process::Stdio::null());
+ qemu_cmd.stderr(std::process::Stdio::piped());
+
+ let res = tokio::task::block_in_place(|| qemu_cmd.spawn()?.wait_with_output())?;
+
+ if res.status.success() {
+ // at this point QEMU is already daemonized and running, so if anything fails we
+ // technically leave behind a zombie-VM... this shouldn't matter, as it will stop
+ // itself soon enough (timer), and the following operations are unlikely to fail
+ let mut pidstr = String::new();
+ pid_file.read_to_string(&mut pidstr)?;
+ pid = pidstr.trim_end().parse().map_err(|err| {
+ format_err!("cannot parse PID returned by QEMU ('{}'): {}", &pidstr, err)
+ })?;
+ break;
+ } else {
+ let out = String::from_utf8_lossy(&res.stderr);
+ if out.contains("unable to set guest cid: Address already in use") {
+ attempts += 1;
+ if attempts >= MAX_CID_TRIES {
+ bail!("CID '{}' in use, but max attempts reached, aborting", cid);
+ }
+ // CID in use, try next higher one
+ eprintln!("CID '{}' in use by other VM, attempting next one", cid);
+ // skip special-meaning low values
+ cid = cid.wrapping_add(1).max(10);
+ } else {
+ eprint!("{}", out);
+ bail!("Starting VM failed. See output above for more information.");
+ }
+ }
+ }
+
+ // QEMU has started successfully, now wait for virtio socket to become ready
+ let pid_t = Pid::from_raw(pid);
+ for _ in 0..60 {
+ let client = VsockClient::new(cid as i32, DEFAULT_VSOCK_PORT, Some(ticket.to_owned()));
+ if let Ok(Ok(_)) =
+ time::timeout(Duration::from_secs(2), client.get("api2/json/status", None)).await
+ {
+ if debug {
+ eprintln!(
+ "Connect to '/run/proxmox-backup/file-restore-serial-{}.sock' for shell access",
+ cid
+ )
+ }
+ return Ok((pid, cid as i32));
+ }
+ if kill(pid_t, None).is_err() { // check if QEMU process exited in between
+ bail!("VM exited before connection could be established");
+ }
+ time::sleep(Duration::from_millis(200)).await;
+ }
+
+ // start failed
+ if let Err(err) = try_kill_vm(pid) {
+ eprintln!("killing failed VM failed: {}", err);
+ }
+ bail!("starting VM timed out");
+}
//! Server/client-specific parts for what's otherwise in pbs-datastore.
-use anyhow::{bail, Error};
+use anyhow::{format_err, Error};
// Note: .pcat1 => Proxmox Catalog Format version 1
pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
-/// Unix system user used by proxmox-backup-proxy
-pub const BACKUP_USER_NAME: &str = "backup";
-/// Unix system group used by proxmox-backup-proxy
-pub const BACKUP_GROUP_NAME: &str = "backup";
+pub use pbs_buildcfg::{BACKUP_USER_NAME, BACKUP_GROUP_NAME};
/// Return User info for the 'backup' user (``getpwnam_r(3)``)
pub fn backup_user() -> Result<nix::unistd::User, Error> {
- if cfg!(test) {
- // fix permission problems with regressions test (when run as non-root).
- Ok(nix::unistd::User::from_uid(nix::unistd::Uid::current())?.unwrap())
- } else {
- match nix::unistd::User::from_name(BACKUP_USER_NAME)? {
- Some(user) => Ok(user),
- None => bail!("Unable to lookup backup user."),
- }
- }
+ pbs_tools::sys::query_user(BACKUP_USER_NAME)?
+ .ok_or_else(|| format_err!("Unable to lookup '{}' user.", BACKUP_USER_NAME))
}
/// Return Group info for the 'backup' group (``getgrnam(3)``)
pub fn backup_group() -> Result<nix::unistd::Group, Error> {
- if cfg!(test) {
- // fix permission problems with regressions test (when run as non-root).
- Ok(nix::unistd::Group::from_gid(nix::unistd::Gid::current())?.unwrap())
- } else {
- match nix::unistd::Group::from_name(BACKUP_GROUP_NAME)? {
- Some(group) => Ok(group),
- None => bail!("Unable to lookup backup user."),
- }
- }
+ pbs_tools::sys::query_group(BACKUP_GROUP_NAME)?
+ .ok_or_else(|| format_err!("Unable to lookup '{}' group.", BACKUP_GROUP_NAME))
}
// Split
config::update_self_signed_cert(false)?;
- proxmox_backup::tools::create_run_dir()?;
+ proxmox_backup::server::create_run_dir()?;
proxmox_backup::rrd::create_rrdb_dir()?;
proxmox_backup::server::jobstate::create_jobstate_dir()?;
use pbs_buildcfg::configdir;
use pbs_systemd::time::{compute_next_event, parse_calendar_event};
+use pbs_tools::logrotate::LogRotate;
use proxmox_backup::api2::types::Authid;
use proxmox_backup::server;
zfs_pool_stats,
get_pool_from_dataset,
},
- logrotate::LogRotate,
};
use proxmox_backup::api2::pull::do_sync_job;
+++ /dev/null
-use std::ffi::OsStr;
-use std::os::unix::ffi::OsStrExt;
-use std::path::PathBuf;
-use std::sync::Arc;
-
-use anyhow::{bail, format_err, Error};
-use serde_json::{json, Value};
-
-use proxmox::api::{
- api,
- cli::{
- default_table_format_options, format_and_print_result_full, get_output_format,
- run_cli_command, CliCommand, CliCommandMap, CliEnvironment, ColumnConfig, OUTPUT_FORMAT,
- },
-};
-use pxar::accessor::aio::Accessor;
-use pxar::decoder::aio::Decoder;
-
-use pbs_api_types::CryptMode;
-use pbs_datastore::{CryptConfig, CATALOG_NAME};
-use pbs_datastore::backup_info::BackupDir;
-use pbs_datastore::catalog::{ArchiveEntry, CatalogReader, DirEntryAttribute};
-use pbs_datastore::dynamic_index::{BufferedDynamicReader, LocalDynamicReadAt};
-use pbs_datastore::index::IndexFile;
-use pbs_datastore::key_derivation::decrypt_key;
-use pbs_client::{BackupReader, RemoteChunkReader};
-use pbs_client::pxar::{create_zip, extract_sub_dir, extract_sub_dir_seq};
-use pbs_client::tools::{
- complete_group_or_snapshot, complete_repository, connect, extract_repository_from_value,
- key_source::{
- crypto_parameters_keep_fd, format_key_source, get_encryption_key_password, KEYFD_SCHEMA,
- KEYFILE_SCHEMA,
- },
- REPO_URL_SCHEMA,
-};
-
-use proxmox_backup::tools;
-
-mod proxmox_file_restore;
-use proxmox_file_restore::*;
-
-enum ExtractPath {
- ListArchives,
- Pxar(String, Vec<u8>),
- VM(String, Vec<u8>),
-}
-
-fn parse_path(path: String, base64: bool) -> Result<ExtractPath, Error> {
- let mut bytes = if base64 {
- base64::decode(&path)
- .map_err(|err| format_err!("Failed base64-decoding path '{}' - {}", path, err))?
- } else {
- path.into_bytes()
- };
-
- if bytes == b"/" {
- return Ok(ExtractPath::ListArchives);
- }
-
- while !bytes.is_empty() && bytes[0] == b'/' {
- bytes.remove(0);
- }
-
- let (file, path) = {
- let slash_pos = bytes.iter().position(|c| *c == b'/').unwrap_or(bytes.len());
- let path = bytes.split_off(slash_pos);
- let file = String::from_utf8(bytes)?;
- (file, path)
- };
-
- if file.ends_with(".pxar.didx") {
- Ok(ExtractPath::Pxar(file, path))
- } else if file.ends_with(".img.fidx") {
- Ok(ExtractPath::VM(file, path))
- } else {
- bail!("'{}' is not supported for file-restore", file);
- }
-}
-
-fn keyfile_path(param: &Value) -> Option<String> {
- if let Some(Value::String(keyfile)) = param.get("keyfile") {
- return Some(keyfile.to_owned());
- }
-
- if let Some(Value::Number(keyfd)) = param.get("keyfd") {
- return Some(format!("/dev/fd/{}", keyfd));
- }
-
- None
-}
-
-#[api(
- input: {
- properties: {
- repository: {
- schema: REPO_URL_SCHEMA,
- optional: true,
- },
- snapshot: {
- type: String,
- description: "Group/Snapshot path.",
- },
- "path": {
- description: "Path to restore. Directories will be restored as .zip files.",
- type: String,
- },
- "base64": {
- type: Boolean,
- description: "If set, 'path' will be interpreted as base64 encoded.",
- optional: true,
- default: false,
- },
- keyfile: {
- schema: KEYFILE_SCHEMA,
- optional: true,
- },
- "keyfd": {
- schema: KEYFD_SCHEMA,
- optional: true,
- },
- "crypt-mode": {
- type: CryptMode,
- optional: true,
- },
- "driver": {
- type: BlockDriverType,
- optional: true,
- },
- "output-format": {
- schema: OUTPUT_FORMAT,
- optional: true,
- },
- }
- },
- returns: {
- description: "A list of elements under the given path",
- type: Array,
- items: {
- type: ArchiveEntry,
- }
- }
-)]
-/// List a directory from a backup snapshot.
-async fn list(
- snapshot: String,
- path: String,
- base64: bool,
- param: Value,
-) -> Result<(), Error> {
- let repo = extract_repository_from_value(¶m)?;
- let snapshot: BackupDir = snapshot.parse()?;
- let path = parse_path(path, base64)?;
-
- let keyfile = keyfile_path(¶m);
- let crypto = crypto_parameters_keep_fd(¶m)?;
- let crypt_config = match crypto.enc_key {
- None => None,
- Some(ref key) => {
- let (key, _, _) =
- decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
- eprintln!("{}", format_key_source(&key.source, "encryption"));
- err
- })?;
- Some(Arc::new(CryptConfig::new(key)?))
- }
- };
-
- let client = connect(&repo)?;
- let client = BackupReader::start(
- client,
- crypt_config.clone(),
- repo.store(),
- &snapshot.group().backup_type(),
- &snapshot.group().backup_id(),
- snapshot.backup_time(),
- true,
- )
- .await?;
-
- let (manifest, _) = client.download_manifest().await?;
- manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
-
- let result = match path {
- ExtractPath::ListArchives => {
- let mut entries = vec![];
- for file in manifest.files() {
- if !file.filename.ends_with(".pxar.didx") && !file.filename.ends_with(".img.fidx") {
- continue;
- }
- let path = format!("/{}", file.filename);
- let attr = if file.filename.ends_with(".pxar.didx") {
- // a pxar file is a file archive, so it's root is also a directory root
- Some(&DirEntryAttribute::Directory { start: 0 })
- } else {
- None
- };
- entries.push(ArchiveEntry::new_with_size(path.as_bytes(), attr, Some(file.size)));
- }
-
- Ok(entries)
- }
- ExtractPath::Pxar(file, mut path) => {
- let index = client
- .download_dynamic_index(&manifest, CATALOG_NAME)
- .await?;
- let most_used = index.find_most_used_chunks(8);
- let file_info = manifest.lookup_file_info(&CATALOG_NAME)?;
- let chunk_reader = RemoteChunkReader::new(
- client.clone(),
- crypt_config,
- file_info.chunk_crypt_mode(),
- most_used,
- );
- let reader = BufferedDynamicReader::new(index, chunk_reader);
- let mut catalog_reader = CatalogReader::new(reader);
-
- let mut fullpath = file.into_bytes();
- fullpath.append(&mut path);
-
- catalog_reader.list_dir_contents(&fullpath)
- }
- ExtractPath::VM(file, path) => {
- let details = SnapRestoreDetails {
- manifest,
- repo,
- snapshot,
- keyfile,
- };
- let driver: Option<BlockDriverType> = match param.get("driver") {
- Some(drv) => Some(serde_json::from_value(drv.clone())?),
- None => None,
- };
- data_list(driver, details, file, path).await
- }
- }?;
-
- let options = default_table_format_options()
- .sortby("type", false)
- .sortby("text", false)
- .column(ColumnConfig::new("type"))
- .column(ColumnConfig::new("text").header("name"))
- .column(ColumnConfig::new("mtime").header("last modified"))
- .column(ColumnConfig::new("size"));
-
- let output_format = get_output_format(¶m);
- format_and_print_result_full(
- &mut json!(result),
- &API_METHOD_LIST.returns,
- &output_format,
- &options,
- );
-
- Ok(())
-}
-
-#[api(
- input: {
- properties: {
- repository: {
- schema: REPO_URL_SCHEMA,
- optional: true,
- },
- snapshot: {
- type: String,
- description: "Group/Snapshot path.",
- },
- "path": {
- description: "Path to restore. Directories will be restored as .zip files if extracted to stdout.",
- type: String,
- },
- "base64": {
- type: Boolean,
- description: "If set, 'path' will be interpreted as base64 encoded.",
- optional: true,
- default: false,
- },
- target: {
- type: String,
- optional: true,
- description: "Target directory path. Use '-' to write to standard output.",
- },
- keyfile: {
- schema: KEYFILE_SCHEMA,
- optional: true,
- },
- "keyfd": {
- schema: KEYFD_SCHEMA,
- optional: true,
- },
- "crypt-mode": {
- type: CryptMode,
- optional: true,
- },
- verbose: {
- type: Boolean,
- description: "Print verbose information",
- optional: true,
- default: false,
- },
- "driver": {
- type: BlockDriverType,
- optional: true,
- },
- }
- }
-)]
-/// Restore files from a backup snapshot.
-async fn extract(
- snapshot: String,
- path: String,
- base64: bool,
- target: Option<String>,
- verbose: bool,
- param: Value,
-) -> Result<(), Error> {
- let repo = extract_repository_from_value(¶m)?;
- let snapshot: BackupDir = snapshot.parse()?;
- let orig_path = path;
- let path = parse_path(orig_path.clone(), base64)?;
-
- let target = match target {
- Some(target) if target == "-" => None,
- Some(target) => Some(PathBuf::from(target)),
- None => Some(std::env::current_dir()?),
- };
-
- let keyfile = keyfile_path(¶m);
- let crypto = crypto_parameters_keep_fd(¶m)?;
- let crypt_config = match crypto.enc_key {
- None => None,
- Some(ref key) => {
- let (key, _, _) =
- decrypt_key(&key.key, &get_encryption_key_password).map_err(|err| {
- eprintln!("{}", format_key_source(&key.source, "encryption"));
- err
- })?;
- Some(Arc::new(CryptConfig::new(key)?))
- }
- };
-
- let client = connect(&repo)?;
- let client = BackupReader::start(
- client,
- crypt_config.clone(),
- repo.store(),
- &snapshot.group().backup_type(),
- &snapshot.group().backup_id(),
- snapshot.backup_time(),
- true,
- )
- .await?;
- let (manifest, _) = client.download_manifest().await?;
-
- match path {
- ExtractPath::Pxar(archive_name, path) => {
- let file_info = manifest.lookup_file_info(&archive_name)?;
- let index = client
- .download_dynamic_index(&manifest, &archive_name)
- .await?;
- let most_used = index.find_most_used_chunks(8);
- let chunk_reader = RemoteChunkReader::new(
- client.clone(),
- crypt_config,
- file_info.chunk_crypt_mode(),
- most_used,
- );
- let reader = BufferedDynamicReader::new(index, chunk_reader);
-
- let archive_size = reader.archive_size();
- let reader = LocalDynamicReadAt::new(reader);
- let decoder = Accessor::new(reader, archive_size).await?;
- extract_to_target(decoder, &path, target, verbose).await?;
- }
- ExtractPath::VM(file, path) => {
- let details = SnapRestoreDetails {
- manifest,
- repo,
- snapshot,
- keyfile,
- };
- let driver: Option<BlockDriverType> = match param.get("driver") {
- Some(drv) => Some(serde_json::from_value(drv.clone())?),
- None => None,
- };
-
- if let Some(mut target) = target {
- let reader = data_extract(driver, details, file, path.clone(), true).await?;
- let decoder = Decoder::from_tokio(reader).await?;
- extract_sub_dir_seq(&target, decoder, verbose).await?;
-
- // we extracted a .pxarexclude-cli file auto-generated by the VM when encoding the
- // archive, this file is of no use for the user, so try to remove it
- target.push(".pxarexclude-cli");
- std::fs::remove_file(target).map_err(|e| {
- format_err!("unable to remove temporary .pxarexclude-cli file - {}", e)
- })?;
- } else {
- let mut reader = data_extract(driver, details, file, path.clone(), false).await?;
- tokio::io::copy(&mut reader, &mut tokio::io::stdout()).await?;
- }
- }
- _ => {
- bail!("cannot extract '{}'", orig_path);
- }
- }
-
- Ok(())
-}
-
-async fn extract_to_target<T>(
- decoder: Accessor<T>,
- path: &[u8],
- target: Option<PathBuf>,
- verbose: bool,
-) -> Result<(), Error>
-where
- T: pxar::accessor::ReadAt + Clone + Send + Sync + Unpin + 'static,
-{
- let path = if path.is_empty() { b"/" } else { path };
-
- let root = decoder.open_root().await?;
- let file = root
- .lookup(OsStr::from_bytes(path))
- .await?
- .ok_or_else(|| format_err!("error opening '{:?}'", path))?;
-
- if let Some(target) = target {
- extract_sub_dir(target, decoder, OsStr::from_bytes(path), verbose).await?;
- } else {
- match file.kind() {
- pxar::EntryKind::File { .. } => {
- tokio::io::copy(&mut file.contents().await?, &mut tokio::io::stdout()).await?;
- }
- _ => {
- create_zip(
- tokio::io::stdout(),
- decoder,
- OsStr::from_bytes(path),
- verbose,
- )
- .await?;
- }
- }
- }
-
- Ok(())
-}
-
-fn main() {
- let list_cmd_def = CliCommand::new(&API_METHOD_LIST)
- .arg_param(&["snapshot", "path"])
- .completion_cb("repository", complete_repository)
- .completion_cb("snapshot", complete_group_or_snapshot);
-
- let restore_cmd_def = CliCommand::new(&API_METHOD_EXTRACT)
- .arg_param(&["snapshot", "path", "target"])
- .completion_cb("repository", complete_repository)
- .completion_cb("snapshot", complete_group_or_snapshot)
- .completion_cb("target", pbs_tools::fs::complete_file_name);
-
- let status_cmd_def = CliCommand::new(&API_METHOD_STATUS);
- let stop_cmd_def = CliCommand::new(&API_METHOD_STOP)
- .arg_param(&["name"])
- .completion_cb("name", complete_block_driver_ids);
-
- let cmd_def = CliCommandMap::new()
- .insert("list", list_cmd_def)
- .insert("extract", restore_cmd_def)
- .insert("status", status_cmd_def)
- .insert("stop", stop_cmd_def);
-
- let rpcenv = CliEnvironment::new();
- run_cli_command(
- cmd_def,
- rpcenv,
- Some(|future| pbs_runtime::main(future)),
- );
-}
-
-/// Returns a runtime dir owned by the current user.
-/// Note that XDG_RUNTIME_DIR is not always available, especially for non-login users like
-/// "www-data", so we use a custom one in /run/proxmox-backup/<uid> instead.
-pub fn get_user_run_dir() -> Result<std::path::PathBuf, Error> {
- let uid = nix::unistd::Uid::current();
- let mut path: std::path::PathBuf = pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR.into();
- path.push(uid.to_string());
- tools::create_run_dir()?;
- std::fs::create_dir_all(&path)?;
- Ok(path)
-}
+++ /dev/null
-//! Abstraction layer over different methods of accessing a block backup
-use std::collections::HashMap;
-use std::future::Future;
-use std::hash::BuildHasher;
-use std::pin::Pin;
-
-use anyhow::{bail, Error};
-use serde::{Deserialize, Serialize};
-use serde_json::{json, Value};
-
-use proxmox::api::{api, cli::*};
-
-use pbs_client::BackupRepository;
-use pbs_datastore::backup_info::BackupDir;
-use pbs_datastore::catalog::ArchiveEntry;
-use pbs_datastore::manifest::BackupManifest;
-
-use super::block_driver_qemu::QemuBlockDriver;
-
-/// Contains details about a snapshot that is to be accessed by block file restore
-pub struct SnapRestoreDetails {
- pub repo: BackupRepository,
- pub snapshot: BackupDir,
- pub manifest: BackupManifest,
- pub keyfile: Option<String>,
-}
-
-/// Return value of a BlockRestoreDriver.status() call, 'id' must be valid for .stop(id)
-pub struct DriverStatus {
- pub id: String,
- pub data: Value,
-}
-
-pub type Async<R> = Pin<Box<dyn Future<Output = R> + Send>>;
-
-/// An abstract implementation for retrieving data out of a block file backup
-pub trait BlockRestoreDriver {
- /// List ArchiveEntrys for the given image file and path
- fn data_list(
- &self,
- details: SnapRestoreDetails,
- img_file: String,
- path: Vec<u8>,
- ) -> Async<Result<Vec<ArchiveEntry>, Error>>;
-
- /// pxar=true:
- /// Attempt to create a pxar archive of the given file path and return a reader instance for it
- /// pxar=false:
- /// Attempt to read the file or folder at the given path and return the file content or a zip
- /// file as a stream
- fn data_extract(
- &self,
- details: SnapRestoreDetails,
- img_file: String,
- path: Vec<u8>,
- pxar: bool,
- ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>>;
-
- /// Return status of all running/mapped images, result value is (id, extra data), where id must
- /// match with the ones returned from list()
- fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>>;
- /// Stop/Close a running restore method
- fn stop(&self, id: String) -> Async<Result<(), Error>>;
- /// Returned ids must be prefixed with driver type so that they cannot collide between drivers,
- /// the returned values must be passable to stop()
- fn list(&self) -> Vec<String>;
-}
-
-#[api()]
-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
-pub enum BlockDriverType {
- /// Uses a small QEMU/KVM virtual machine to map images securely. Requires PVE-patched QEMU.
- Qemu,
-}
-
-impl BlockDriverType {
- fn resolve(&self) -> impl BlockRestoreDriver {
- match self {
- BlockDriverType::Qemu => QemuBlockDriver {},
- }
- }
-}
-
-const DEFAULT_DRIVER: BlockDriverType = BlockDriverType::Qemu;
-const ALL_DRIVERS: &[BlockDriverType] = &[BlockDriverType::Qemu];
-
-pub async fn data_list(
- driver: Option<BlockDriverType>,
- details: SnapRestoreDetails,
- img_file: String,
- path: Vec<u8>,
-) -> Result<Vec<ArchiveEntry>, Error> {
- let driver = driver.unwrap_or(DEFAULT_DRIVER).resolve();
- driver.data_list(details, img_file, path).await
-}
-
-pub async fn data_extract(
- driver: Option<BlockDriverType>,
- details: SnapRestoreDetails,
- img_file: String,
- path: Vec<u8>,
- pxar: bool,
-) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, Error> {
- let driver = driver.unwrap_or(DEFAULT_DRIVER).resolve();
- driver.data_extract(details, img_file, path, pxar).await
-}
-
-#[api(
- input: {
- properties: {
- "driver": {
- type: BlockDriverType,
- optional: true,
- },
- "output-format": {
- schema: OUTPUT_FORMAT,
- optional: true,
- },
- },
- },
-)]
-/// Retrieve status information about currently running/mapped restore images
-pub async fn status(driver: Option<BlockDriverType>, param: Value) -> Result<(), Error> {
- let output_format = get_output_format(¶m);
- let text = output_format == "text";
-
- let mut ret = json!({});
-
- for dt in ALL_DRIVERS {
- if driver.is_some() && &driver.unwrap() != dt {
- continue;
- }
-
- let drv_name = format!("{:?}", dt);
- let drv = dt.resolve();
- match drv.status().await {
- Ok(data) if data.is_empty() => {
- if text {
- println!("{}: no mappings", drv_name);
- } else {
- ret[drv_name] = json!({});
- }
- }
- Ok(data) => {
- if text {
- println!("{}:", &drv_name);
- }
-
- ret[&drv_name]["ids"] = json!({});
- for status in data {
- if text {
- println!("{} \t({})", status.id, status.data);
- } else {
- ret[&drv_name]["ids"][status.id] = status.data;
- }
- }
- }
- Err(err) => {
- if text {
- eprintln!("error getting status from driver '{}' - {}", drv_name, err);
- } else {
- ret[drv_name] = json!({ "error": format!("{}", err) });
- }
- }
- }
- }
-
- if !text {
- format_and_print_result(&ret, &output_format);
- }
-
- Ok(())
-}
-
-#[api(
- input: {
- properties: {
- "name": {
- type: String,
- description: "The name of the VM to stop.",
- },
- },
- },
-)]
-/// Immediately stop/unmap a given image. Not typically necessary, as VMs will stop themselves
-/// after a timer anyway.
-pub async fn stop(name: String) -> Result<(), Error> {
- for drv in ALL_DRIVERS.iter().map(BlockDriverType::resolve) {
- if drv.list().contains(&name) {
- return drv.stop(name).await;
- }
- }
-
- bail!("no mapping with name '{}' found", name);
-}
-
-/// Autocompletion handler for block mappings
-pub fn complete_block_driver_ids<S: BuildHasher>(
- _arg: &str,
- _param: &HashMap<String, String, S>,
-) -> Vec<String> {
- ALL_DRIVERS
- .iter()
- .map(BlockDriverType::resolve)
- .map(|d| d.list())
- .flatten()
- .collect()
-}
+++ /dev/null
-//! Block file access via a small QEMU restore VM using the PBS block driver in QEMU
-use std::collections::HashMap;
-use std::fs::{File, OpenOptions};
-use std::io::{prelude::*, SeekFrom};
-
-use anyhow::{bail, Error};
-use futures::FutureExt;
-use serde::{Deserialize, Serialize};
-use serde_json::json;
-
-use proxmox::tools::fs::lock_file;
-
-use pbs_client::{DEFAULT_VSOCK_PORT, BackupRepository, VsockClient};
-use pbs_datastore::backup_info::BackupDir;
-use pbs_datastore::catalog::ArchiveEntry;
-
-use super::block_driver::*;
-use crate::get_user_run_dir;
-
-const RESTORE_VM_MAP: &str = "restore-vm-map.json";
-
-pub struct QemuBlockDriver {}
-
-#[derive(Clone, Hash, Serialize, Deserialize)]
-struct VMState {
- pid: i32,
- cid: i32,
- ticket: String,
-}
-
-struct VMStateMap {
- map: HashMap<String, VMState>,
- file: File,
-}
-
-impl VMStateMap {
- fn open_file_raw(write: bool) -> Result<File, Error> {
- use std::os::unix::fs::OpenOptionsExt;
- let mut path = get_user_run_dir()?;
- path.push(RESTORE_VM_MAP);
- OpenOptions::new()
- .read(true)
- .write(write)
- .create(write)
- .mode(0o600)
- .open(path)
- .map_err(Error::from)
- }
-
- /// Acquire a lock on the state map and retrieve a deserialized version
- fn load() -> Result<Self, Error> {
- let mut file = Self::open_file_raw(true)?;
- lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?;
- let map = serde_json::from_reader(&file).unwrap_or_default();
- Ok(Self { map, file })
- }
-
- /// Load a read-only copy of the current VM map. Only use for informational purposes, like
- /// shell auto-completion, for anything requiring consistency use load() !
- fn load_read_only() -> Result<HashMap<String, VMState>, Error> {
- let file = Self::open_file_raw(false)?;
- Ok(serde_json::from_reader(&file).unwrap_or_default())
- }
-
- /// Write back a potentially modified state map, consuming the held lock
- fn write(mut self) -> Result<(), Error> {
- self.file.seek(SeekFrom::Start(0))?;
- self.file.set_len(0)?;
- serde_json::to_writer(self.file, &self.map)?;
-
- // drop ourselves including file lock
- Ok(())
- }
-
- /// Return the map, but drop the lock immediately
- fn read_only(self) -> HashMap<String, VMState> {
- self.map
- }
-}
-
-fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String {
- let full = format!("qemu_{}/{}", repo, snap);
- pbs_systemd::escape_unit(&full, false)
-}
-
-/// remove non-responsive VMs from given map, returns 'true' if map was modified
-async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool {
- let mut to_remove = Vec::new();
- for (name, state) in map.iter() {
- let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
- let res = client
- .get("api2/json/status", Some(json!({"keep-timeout": true})))
- .await;
- if res.is_err() {
- // VM is not reachable, remove from map and inform user
- to_remove.push(name.clone());
- eprintln!(
- "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map",
- name, state.pid, state.cid
- );
- let _ = super::qemu_helper::try_kill_vm(state.pid);
- }
- }
-
- for tr in &to_remove {
- map.remove(tr);
- }
-
- !to_remove.is_empty()
-}
-
-fn new_ticket() -> String {
- proxmox::tools::Uuid::generate().to_string()
-}
-
-async fn ensure_running(details: &SnapRestoreDetails) -> Result<VsockClient, Error> {
- let name = make_name(&details.repo, &details.snapshot);
- let mut state = VMStateMap::load()?;
-
- cleanup_map(&mut state.map).await;
-
- let new_cid;
- let vms = match state.map.get(&name) {
- Some(vm) => {
- let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone()));
- let res = client.get("api2/json/status", None).await;
- match res {
- Ok(_) => {
- // VM is running and we just reset its timeout, nothing to do
- return Ok(client);
- }
- Err(err) => {
- eprintln!("stale VM detected, restarting ({})", err);
- // VM is dead, restart
- let _ = super::qemu_helper::try_kill_vm(vm.pid);
- let vms = start_vm(vm.cid, details).await?;
- new_cid = vms.cid;
- state.map.insert(name, vms.clone());
- vms
- }
- }
- }
- None => {
- let mut cid = state
- .map
- .iter()
- .map(|v| v.1.cid)
- .max()
- .unwrap_or(0)
- .wrapping_add(1);
-
- // offset cid by user id, to avoid unneccessary retries
- let running_uid = nix::unistd::Uid::current();
- cid = cid.wrapping_add(running_uid.as_raw() as i32);
-
- // some low CIDs have special meaning, start at 10 to avoid them
- cid = cid.max(10);
-
- let vms = start_vm(cid, details).await?;
- new_cid = vms.cid;
- state.map.insert(name, vms.clone());
- vms
- }
- };
-
- state.write()?;
- Ok(VsockClient::new(
- new_cid,
- DEFAULT_VSOCK_PORT,
- Some(vms.ticket.clone()),
- ))
-}
-
-async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> {
- let ticket = new_ticket();
- let files = details
- .manifest
- .files()
- .iter()
- .map(|file| file.filename.clone())
- .filter(|name| name.ends_with(".img.fidx"));
- let (pid, cid) =
- super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket)
- .await?;
- Ok(VMState { pid, cid, ticket })
-}
-
-impl BlockRestoreDriver for QemuBlockDriver {
- fn data_list(
- &self,
- details: SnapRestoreDetails,
- img_file: String,
- mut path: Vec<u8>,
- ) -> Async<Result<Vec<ArchiveEntry>, Error>> {
- async move {
- let client = ensure_running(&details).await?;
- if !path.is_empty() && path[0] != b'/' {
- path.insert(0, b'/');
- }
- let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
- let mut result = client
- .get("api2/json/list", Some(json!({ "path": path })))
- .await?;
- serde_json::from_value(result["data"].take()).map_err(|err| err.into())
- }
- .boxed()
- }
-
- fn data_extract(
- &self,
- details: SnapRestoreDetails,
- img_file: String,
- mut path: Vec<u8>,
- pxar: bool,
- ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
- async move {
- let client = ensure_running(&details).await?;
- if !path.is_empty() && path[0] != b'/' {
- path.insert(0, b'/');
- }
- let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
- let (mut tx, rx) = tokio::io::duplex(1024 * 4096);
- tokio::spawn(async move {
- if let Err(err) = client
- .download(
- "api2/json/extract",
- Some(json!({ "path": path, "pxar": pxar })),
- &mut tx,
- )
- .await
- {
- eprintln!("reading file extraction stream failed - {}", err);
- std::process::exit(1);
- }
- });
-
- Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
- }
- .boxed()
- }
-
- fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> {
- async move {
- let mut state_map = VMStateMap::load()?;
- let modified = cleanup_map(&mut state_map.map).await;
- let map = if modified {
- let m = state_map.map.clone();
- state_map.write()?;
- m
- } else {
- state_map.read_only()
- };
- let mut result = Vec::new();
-
- for (n, s) in map.iter() {
- let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone()));
- let resp = client
- .get("api2/json/status", Some(json!({"keep-timeout": true})))
- .await;
- let name = pbs_systemd::unescape_unit(n)
- .unwrap_or_else(|_| "<invalid name>".to_owned());
- let mut extra = json!({"pid": s.pid, "cid": s.cid});
-
- match resp {
- Ok(status) => match status["data"].as_object() {
- Some(map) => {
- for (k, v) in map.iter() {
- extra[k] = v.clone();
- }
- }
- None => {
- let err = format!(
- "invalid JSON received from /status call: {}",
- status.to_string()
- );
- extra["error"] = json!(err);
- }
- },
- Err(err) => {
- let err = format!("error during /status API call: {}", err);
- extra["error"] = json!(err);
- }
- }
-
- result.push(DriverStatus {
- id: name,
- data: extra,
- });
- }
-
- Ok(result)
- }
- .boxed()
- }
-
- fn stop(&self, id: String) -> Async<Result<(), Error>> {
- async move {
- let name = pbs_systemd::escape_unit(&id, false);
- let mut map = VMStateMap::load()?;
- let map_mod = cleanup_map(&mut map.map).await;
- match map.map.get(&name) {
- Some(state) => {
- let client =
- VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
- // ignore errors, this either fails because:
- // * the VM is unreachable/dead, in which case we don't want it in the map
- // * the call was successful and the connection reset when the VM stopped
- let _ = client.get("api2/json/stop", None).await;
- map.map.remove(&name);
- map.write()?;
- }
- None => {
- if map_mod {
- map.write()?;
- }
- bail!("VM with name '{}' not found", name);
- }
- }
- Ok(())
- }
- .boxed()
- }
-
- fn list(&self) -> Vec<String> {
- match VMStateMap::load_read_only() {
- Ok(state) => state
- .iter()
- .filter_map(|(name, _)| pbs_systemd::unescape_unit(&name).ok())
- .collect(),
- Err(_) => Vec::new(),
- }
- }
-}
+++ /dev/null
-//! Block device drivers and tools for single file restore
-pub mod block_driver;
-pub use block_driver::*;
-
-mod qemu_helper;
-mod block_driver_qemu;
+++ /dev/null
-//! Helper to start a QEMU VM for single file restore.
-use std::fs::{File, OpenOptions};
-use std::io::prelude::*;
-use std::os::unix::io::AsRawFd;
-use std::path::PathBuf;
-use std::time::Duration;
-
-use anyhow::{bail, format_err, Error};
-use tokio::time;
-
-use nix::sys::signal::{kill, Signal};
-use nix::unistd::Pid;
-
-use proxmox::tools::fs::{create_path, file_read_string, make_tmp_file, CreateOptions};
-
-use pbs_client::{VsockClient, DEFAULT_VSOCK_PORT};
-
-use proxmox_backup::backup::backup_user;
-use proxmox_backup::tools;
-
-use super::SnapRestoreDetails;
-
-const PBS_VM_NAME: &str = "pbs-restore-vm";
-const MAX_CID_TRIES: u64 = 32;
-
-fn create_restore_log_dir() -> Result<String, Error> {
- let logpath = format!("{}/file-restore", pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR);
-
- proxmox::try_block!({
- let backup_user = backup_user()?;
- let opts = CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid);
-
- let opts_root = CreateOptions::new()
- .owner(nix::unistd::ROOT)
- .group(nix::unistd::Gid::from_raw(0));
-
- create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts))?;
- create_path(&logpath, None, Some(opts_root))?;
- Ok(())
- })
- .map_err(|err: Error| format_err!("unable to create file-restore log dir - {}", err))?;
-
- Ok(logpath)
-}
-
-fn validate_img_existance(debug: bool) -> Result<(), Error> {
- let kernel = PathBuf::from(pbs_buildcfg::PROXMOX_BACKUP_KERNEL_FN);
- let initramfs = PathBuf::from(if debug {
- pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_DBG_FN
- } else {
- pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_FN
- });
- if !kernel.exists() || !initramfs.exists() {
- bail!("cannot run file-restore VM: package 'proxmox-backup-restore-image' is not (correctly) installed");
- }
- Ok(())
-}
-
-pub fn try_kill_vm(pid: i32) -> Result<(), Error> {
- let pid = Pid::from_raw(pid);
- if let Ok(()) = kill(pid, None) {
- // process is running (and we could kill it), check if it is actually ours
- // (if it errors assume we raced with the process's death and ignore it)
- if let Ok(cmdline) = file_read_string(format!("/proc/{}/cmdline", pid)) {
- if cmdline.split('\0').any(|a| a == PBS_VM_NAME) {
- // yes, it's ours, kill it brutally with SIGKILL, no reason to take
- // any chances - in this state it's most likely broken anyway
- if let Err(err) = kill(pid, Signal::SIGKILL) {
- bail!(
- "reaping broken VM (pid {}) with SIGKILL failed: {}",
- pid,
- err
- );
- }
- }
- }
- }
-
- Ok(())
-}
-
-async fn create_temp_initramfs(ticket: &str, debug: bool) -> Result<(File, String), Error> {
- use std::ffi::CString;
- use tokio::fs::File;
-
- let (tmp_file, tmp_path) =
- make_tmp_file("/tmp/file-restore-qemu.initramfs.tmp", CreateOptions::new())?;
- nix::unistd::unlink(&tmp_path)?;
- tools::fd_change_cloexec(tmp_file.as_raw_fd(), false)?;
-
- let initramfs = if debug {
- pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_DBG_FN
- } else {
- pbs_buildcfg::PROXMOX_BACKUP_INITRAMFS_FN
- };
-
- let mut f = File::from_std(tmp_file);
- let mut base = File::open(initramfs).await?;
-
- tokio::io::copy(&mut base, &mut f).await?;
-
- let name = CString::new("ticket").unwrap();
- tools::cpio::append_file(
- &mut f,
- ticket.as_bytes(),
- &name,
- 0,
- (libc::S_IFREG | 0o400) as u16,
- 0,
- 0,
- 0,
- ticket.len() as u32,
- )
- .await?;
- tools::cpio::append_trailer(&mut f).await?;
-
- let tmp_file = f.into_std().await;
- let path = format!("/dev/fd/{}", &tmp_file.as_raw_fd());
-
- Ok((tmp_file, path))
-}
-
-pub async fn start_vm(
- // u16 so we can do wrapping_add without going too high
- mut cid: u16,
- details: &SnapRestoreDetails,
- files: impl Iterator<Item = String>,
- ticket: &str,
-) -> Result<(i32, i32), Error> {
- if let Err(_) = std::env::var("PBS_PASSWORD") {
- bail!("environment variable PBS_PASSWORD has to be set for QEMU VM restore");
- }
-
- let debug = if let Ok(val) = std::env::var("PBS_QEMU_DEBUG") {
- !val.is_empty()
- } else {
- false
- };
-
- validate_img_existance(debug)?;
-
- let pid;
- let (mut pid_file, pid_path) = make_tmp_file("/tmp/file-restore-qemu.pid.tmp", CreateOptions::new())?;
- nix::unistd::unlink(&pid_path)?;
- tools::fd_change_cloexec(pid_file.as_raw_fd(), false)?;
-
- let (_ramfs_pid, ramfs_path) = create_temp_initramfs(ticket, debug).await?;
-
- let logpath = create_restore_log_dir()?;
- let logfile = &format!("{}/qemu.log", logpath);
- let mut logrotate = tools::logrotate::LogRotate::new(logfile, false)
- .ok_or_else(|| format_err!("could not get QEMU log file names"))?;
-
- if let Err(err) = logrotate.do_rotate(CreateOptions::default(), Some(16)) {
- eprintln!("warning: logrotate for QEMU log file failed - {}", err);
- }
-
- let mut logfd = OpenOptions::new()
- .append(true)
- .create_new(true)
- .open(logfile)?;
- tools::fd_change_cloexec(logfd.as_raw_fd(), false)?;
-
- // preface log file with start timestamp so one can see how long QEMU took to start
- writeln!(logfd, "[{}] PBS file restore VM log", {
- let now = proxmox::tools::time::epoch_i64();
- proxmox::tools::time::epoch_to_rfc3339(now)?
- },)?;
-
- let base_args = [
- "-chardev",
- &format!(
- "file,id=log,path=/dev/null,logfile=/dev/fd/{},logappend=on",
- logfd.as_raw_fd()
- ),
- "-serial",
- "chardev:log",
- "-vnc",
- "none",
- "-enable-kvm",
- "-kernel",
- pbs_buildcfg::PROXMOX_BACKUP_KERNEL_FN,
- "-initrd",
- &ramfs_path,
- "-append",
- &format!(
- "{} panic=1 zfs_arc_min=0 zfs_arc_max=0",
- if debug { "debug" } else { "quiet" }
- ),
- "-daemonize",
- "-pidfile",
- &format!("/dev/fd/{}", pid_file.as_raw_fd()),
- "-name",
- PBS_VM_NAME,
- ];
-
- // Generate drive arguments for all fidx files in backup snapshot
- let mut drives = Vec::new();
- let mut id = 0;
- for file in files {
- if !file.ends_with(".img.fidx") {
- continue;
- }
- drives.push("-drive".to_owned());
- let keyfile = if let Some(ref keyfile) = details.keyfile {
- format!(",,keyfile={}", keyfile)
- } else {
- "".to_owned()
- };
- drives.push(format!(
- "file=pbs:repository={},,snapshot={},,archive={}{},read-only=on,if=none,id=drive{}",
- details.repo, details.snapshot, file, keyfile, id
- ));
-
- // a PCI bus can only support 32 devices, so add a new one every 32
- let bus = (id / 32) + 2;
- if id % 32 == 0 {
- drives.push("-device".to_owned());
- drives.push(format!("pci-bridge,id=bridge{},chassis_nr={}", bus, bus));
- }
-
- drives.push("-device".to_owned());
- // drive serial is used by VM to map .fidx files to /dev paths
- let serial = file.strip_suffix(".img.fidx").unwrap_or(&file);
- drives.push(format!(
- "virtio-blk-pci,drive=drive{},serial={},bus=bridge{}",
- id, serial, bus
- ));
- id += 1;
- }
-
- let ram = if debug {
- 1024
- } else {
- // add more RAM if many drives are given
- match id {
- f if f < 10 => 192,
- f if f < 20 => 256,
- _ => 384,
- }
- };
-
- // Try starting QEMU in a loop to retry if we fail because of a bad 'cid' value
- let mut attempts = 0;
- loop {
- let mut qemu_cmd = std::process::Command::new("qemu-system-x86_64");
- qemu_cmd.args(base_args.iter());
- qemu_cmd.arg("-m");
- qemu_cmd.arg(ram.to_string());
- qemu_cmd.args(&drives);
- qemu_cmd.arg("-device");
- qemu_cmd.arg(format!(
- "vhost-vsock-pci,guest-cid={},disable-legacy=on",
- cid
- ));
-
- if debug {
- let debug_args = [
- "-chardev",
- &format!(
- "socket,id=debugser,path=/run/proxmox-backup/file-restore-serial-{}.sock,server,nowait",
- cid
- ),
- "-serial",
- "chardev:debugser",
- ];
- qemu_cmd.args(debug_args.iter());
- }
-
- qemu_cmd.stdout(std::process::Stdio::null());
- qemu_cmd.stderr(std::process::Stdio::piped());
-
- let res = tokio::task::block_in_place(|| qemu_cmd.spawn()?.wait_with_output())?;
-
- if res.status.success() {
- // at this point QEMU is already daemonized and running, so if anything fails we
- // technically leave behind a zombie-VM... this shouldn't matter, as it will stop
- // itself soon enough (timer), and the following operations are unlikely to fail
- let mut pidstr = String::new();
- pid_file.read_to_string(&mut pidstr)?;
- pid = pidstr.trim_end().parse().map_err(|err| {
- format_err!("cannot parse PID returned by QEMU ('{}'): {}", &pidstr, err)
- })?;
- break;
- } else {
- let out = String::from_utf8_lossy(&res.stderr);
- if out.contains("unable to set guest cid: Address already in use") {
- attempts += 1;
- if attempts >= MAX_CID_TRIES {
- bail!("CID '{}' in use, but max attempts reached, aborting", cid);
- }
- // CID in use, try next higher one
- eprintln!("CID '{}' in use by other VM, attempting next one", cid);
- // skip special-meaning low values
- cid = cid.wrapping_add(1).max(10);
- } else {
- eprint!("{}", out);
- bail!("Starting VM failed. See output above for more information.");
- }
- }
- }
-
- // QEMU has started successfully, now wait for virtio socket to become ready
- let pid_t = Pid::from_raw(pid);
- for _ in 0..60 {
- let client = VsockClient::new(cid as i32, DEFAULT_VSOCK_PORT, Some(ticket.to_owned()));
- if let Ok(Ok(_)) =
- time::timeout(Duration::from_secs(2), client.get("api2/json/status", None)).await
- {
- if debug {
- eprintln!(
- "Connect to '/run/proxmox-backup/file-restore-serial-{}.sock' for shell access",
- cid
- )
- }
- return Ok((pid, cid as i32));
- }
- if kill(pid_t, None).is_err() { // check if QEMU process exited in between
- bail!("VM exited before connection could be established");
- }
- time::sleep(Duration::from_millis(200)).await;
- }
-
- // start failed
- if let Err(err) = try_kill_vm(pid) {
- eprintln!("killing failed VM failed: {}", err);
- }
- bail!("starting VM timed out");
-}
/// Load the user's current challenges with the intent to create a challenge (create the file
/// if it does not exist), and keep a lock on the file.
fn open(userid: &Userid) -> Result<Self, Error> {
- crate::tools::create_run_dir()?;
+ crate::server::create_run_dir()?;
let options = CreateOptions::new().perm(Mode::from_bits_truncate(0o0600));
proxmox::tools::fs::create_path(CHALLENGE_DATA_PATH, Some(options.clone()), Some(options))
.map_err(|err| {
use serde_json::Value;
use proxmox::sys::linux::procfs::PidStat;
+use proxmox::tools::fs::{create_path, CreateOptions};
use pbs_buildcfg;
pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
let pid_str = format!("{}\n", *PID);
- let opts = proxmox::tools::fs::CreateOptions::new();
- proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), opts)
+ proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
}
pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
.await?;
Ok(())
}
+
+/// Create the base run-directory.
+///
+/// This exists to fixate the permissions for the run *base* directory while allowing intermediate
+/// directories after it to have different permissions.
+pub fn create_run_dir() -> Result<(), Error> {
+ let backup_user = crate::backup::backup_user()?;
+ let opts = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+ let _: bool = create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), None, Some(opts))?;
+ Ok(())
+}
use proxmox::try_block;
use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
-use super::{UPID, UPIDExt};
-
use pbs_buildcfg;
+use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
+
+use super::{UPID, UPIDExt};
use crate::server;
-use crate::tools::logrotate::{LogRotate, LogRotateFiles};
use crate::tools::{FileLogger, FileLogOptions};
use crate::api2::types::{Authid, TaskStateType};
use crate::backup::{open_backup_lockfile, BackupLockGuard};
+++ /dev/null
-//! Provides a very basic "newc" format cpio encoder.
-//! See 'man 5 cpio' for format details, as well as:
-//! https://www.kernel.org/doc/html/latest/driver-api/early-userspace/buffer-format.html
-//! This does not provide full support for the format, only what is needed to include files in an
-//! initramfs intended for a linux kernel.
-use anyhow::{bail, Error};
-use std::ffi::{CString, CStr};
-use tokio::io::{copy, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-
-/// Write a cpio file entry to an AsyncWrite.
-pub async fn append_file<W: AsyncWrite + Unpin, R: AsyncRead + Unpin>(
- mut target: W,
- content: R,
- name: &CStr,
- inode: u16,
- mode: u16,
- uid: u16,
- gid: u16,
- // negative mtimes are generally valid, but cpio defines all fields as unsigned
- mtime: u64,
- // c_filesize has 8 bytes, but man page claims that 4 GB files are the maximum, let's be safe
- size: u32,
-) -> Result<(), Error> {
- let name = name.to_bytes_with_nul();
-
- target.write_all(b"070701").await?; // c_magic
- print_cpio_hex(&mut target, inode as u64).await?; // c_ino
- print_cpio_hex(&mut target, mode as u64).await?; // c_mode
- print_cpio_hex(&mut target, uid as u64).await?; // c_uid
- print_cpio_hex(&mut target, gid as u64).await?; // c_gid
- print_cpio_hex(&mut target, 0).await?; // c_nlink
- print_cpio_hex(&mut target, mtime as u64).await?; // c_mtime
- print_cpio_hex(&mut target, size as u64).await?; // c_filesize
- print_cpio_hex(&mut target, 0).await?; // c_devmajor
- print_cpio_hex(&mut target, 0).await?; // c_devminor
- print_cpio_hex(&mut target, 0).await?; // c_rdevmajor
- print_cpio_hex(&mut target, 0).await?; // c_rdevminor
- print_cpio_hex(&mut target, name.len() as u64).await?; // c_namesize
- print_cpio_hex(&mut target, 0).await?; // c_check (ignored for newc)
-
- target.write_all(name).await?;
- let header_size = 6 + 8*13 + name.len();
- let mut name_pad = header_size;
- while name_pad & 3 != 0 {
- target.write_u8(0).await?;
- name_pad += 1;
- }
-
- let mut content = content.take(size as u64);
- let copied = copy(&mut content, &mut target).await?;
- if copied < size as u64 {
- bail!("cpio: not enough data, or size to big - encoding invalid");
- }
- let mut data_pad = copied;
- while data_pad & 3 != 0 {
- target.write_u8(0).await?;
- data_pad += 1;
- }
-
- Ok(())
-}
-
-/// Write the TRAILER!!! file to an AsyncWrite, signifying the end of a cpio archive. Note that you
-/// can immediately add more files after, to create a concatenated archive, the kernel for example
-/// will merge these upon loading an initramfs.
-pub async fn append_trailer<W: AsyncWrite + Unpin>(target: W) -> Result<(), Error> {
- let name = CString::new("TRAILER!!!").unwrap();
- append_file(target, tokio::io::empty(), &name, 0, 0, 0, 0, 0, 0).await
-}
-
-async fn print_cpio_hex<W: AsyncWrite + Unpin>(target: &mut W, value: u64) -> Result<(), Error> {
- target.write_all(format!("{:08x}", value).as_bytes()).await.map_err(|e| e.into())
-}
+++ /dev/null
-use std::path::{Path, PathBuf};
-use std::fs::{File, rename};
-use std::os::unix::io::{FromRawFd, IntoRawFd};
-use std::io::Read;
-
-use anyhow::{bail, Error};
-use nix::unistd;
-
-use proxmox::tools::fs::{CreateOptions, make_tmp_file};
-
-/// Used for rotating log files and iterating over them
-pub struct LogRotate {
- base_path: PathBuf,
- compress: bool,
-}
-
-impl LogRotate {
- /// Creates a new instance if the path given is a valid file name
- /// (iow. does not end with ..)
- /// 'compress' decides if compresses files will be created on
- /// rotation, and if it will search '.zst' files when iterating
- pub fn new<P: AsRef<Path>>(path: P, compress: bool) -> Option<Self> {
- if path.as_ref().file_name().is_some() {
- Some(Self {
- base_path: path.as_ref().to_path_buf(),
- compress,
- })
- } else {
- None
- }
- }
-
- /// Returns an iterator over the logrotated file names that exist
- pub fn file_names(&self) -> LogRotateFileNames {
- LogRotateFileNames {
- base_path: self.base_path.clone(),
- count: 0,
- compress: self.compress
- }
- }
-
- /// Returns an iterator over the logrotated file handles
- pub fn files(&self) -> LogRotateFiles {
- LogRotateFiles {
- file_names: self.file_names(),
- }
- }
-
- fn compress(source_path: &PathBuf, target_path: &PathBuf, options: &CreateOptions) -> Result<(), Error> {
- let mut source = File::open(source_path)?;
- let (fd, tmp_path) = make_tmp_file(target_path, options.clone())?;
- let target = unsafe { File::from_raw_fd(fd.into_raw_fd()) };
- let mut encoder = match zstd::stream::write::Encoder::new(target, 0) {
- Ok(encoder) => encoder,
- Err(err) => {
- let _ = unistd::unlink(&tmp_path);
- bail!("creating zstd encoder failed - {}", err);
- }
- };
-
- if let Err(err) = std::io::copy(&mut source, &mut encoder) {
- let _ = unistd::unlink(&tmp_path);
- bail!("zstd encoding failed for file {:?} - {}", target_path, err);
- }
-
- if let Err(err) = encoder.finish() {
- let _ = unistd::unlink(&tmp_path);
- bail!("zstd finish failed for file {:?} - {}", target_path, err);
- }
-
- if let Err(err) = rename(&tmp_path, target_path) {
- let _ = unistd::unlink(&tmp_path);
- bail!("rename failed for file {:?} - {}", target_path, err);
- }
-
- if let Err(err) = unistd::unlink(source_path) {
- bail!("unlink failed for file {:?} - {}", source_path, err);
- }
-
- Ok(())
- }
-
- /// Rotates the files up to 'max_files'
- /// if the 'compress' option was given it will compress the newest file
- ///
- /// e.g. rotates
- /// foo.2.zst => foo.3.zst
- /// foo.1 => foo.2.zst
- /// foo => foo.1
- pub fn do_rotate(&mut self, options: CreateOptions, max_files: Option<usize>) -> Result<(), Error> {
- let mut filenames: Vec<PathBuf> = self.file_names().collect();
- if filenames.is_empty() {
- return Ok(()); // no file means nothing to rotate
- }
- let count = filenames.len() + 1;
-
- let mut next_filename = self.base_path.clone().canonicalize()?.into_os_string();
- next_filename.push(format!(".{}", filenames.len()));
- if self.compress && count > 2 {
- next_filename.push(".zst");
- }
-
- filenames.push(PathBuf::from(next_filename));
-
- for i in (0..count-1).rev() {
- if self.compress
- && filenames[i].extension() != Some(std::ffi::OsStr::new("zst"))
- && filenames[i+1].extension() == Some(std::ffi::OsStr::new("zst"))
- {
- Self::compress(&filenames[i], &filenames[i+1], &options)?;
- } else {
- rename(&filenames[i], &filenames[i+1])?;
- }
- }
-
- if let Some(max_files) = max_files {
- for file in filenames.iter().skip(max_files) {
- if let Err(err) = unistd::unlink(file) {
- eprintln!("could not remove {:?}: {}", &file, err);
- }
- }
- }
-
- Ok(())
- }
-
- pub fn rotate(
- &mut self,
- max_size: u64,
- options: Option<CreateOptions>,
- max_files: Option<usize>
- ) -> Result<bool, Error> {
-
- let options = match options {
- Some(options) => options,
- None => {
- let backup_user = crate::backup::backup_user()?;
- CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)
- },
- };
-
- let metadata = match self.base_path.metadata() {
- Ok(metadata) => metadata,
- Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
- Err(err) => bail!("unable to open task archive - {}", err),
- };
-
- if metadata.len() > max_size {
- self.do_rotate(options, max_files)?;
- Ok(true)
- } else {
- Ok(false)
- }
- }
-}
-
-/// Iterator over logrotated file names
-pub struct LogRotateFileNames {
- base_path: PathBuf,
- count: usize,
- compress: bool,
-}
-
-impl Iterator for LogRotateFileNames {
- type Item = PathBuf;
-
- fn next(&mut self) -> Option<Self::Item> {
- if self.count > 0 {
- let mut path: std::ffi::OsString = self.base_path.clone().into();
-
- path.push(format!(".{}", self.count));
- self.count += 1;
-
- if Path::new(&path).is_file() {
- Some(path.into())
- } else if self.compress {
- path.push(".zst");
- if Path::new(&path).is_file() {
- Some(path.into())
- } else {
- None
- }
- } else {
- None
- }
- } else if self.base_path.is_file() {
- self.count += 1;
- Some(self.base_path.to_path_buf())
- } else {
- None
- }
- }
-}
-
-/// Iterator over logrotated files by returning a boxed reader
-pub struct LogRotateFiles {
- file_names: LogRotateFileNames,
-}
-
-impl Iterator for LogRotateFiles {
- type Item = Box<dyn Read + Send>;
-
- fn next(&mut self) -> Option<Self::Item> {
- let filename = self.file_names.next()?;
- let file = File::open(&filename).ok()?;
-
- if filename.extension() == Some(std::ffi::OsStr::new("zst")) {
- let encoder = zstd::stream::read::Decoder::new(file).ok()?;
- return Some(Box::new(encoder));
- }
-
- Some(Box::new(file))
- }
-}
use openssl::hash::{hash, DigestBytes, MessageDigest};
pub use proxmox::tools::fd::Fd;
-use proxmox::tools::fs::{create_path, CreateOptions};
use proxmox_http::{
client::SimpleHttp,
pub mod async_io;
pub mod compression;
pub mod config;
-pub mod cpio;
pub mod daemon;
pub mod disks;
mod memcom;
pub use memcom::Memcom;
-pub mod logrotate;
pub mod serde_filter;
pub mod statistics;
pub mod subscription;
std::env::remove_var(name);
}
}
-
-/// Create the base run-directory.
-///
-/// This exists to fixate the permissions for the run *base* directory while allowing intermediate
-/// directories after it to have different permissions.
-pub fn create_run_dir() -> Result<(), Error> {
- let backup_user = crate::backup::backup_user()?;
- let opts = CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid);
- let _: bool = create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), None, Some(opts))?;
- Ok(())
-}