-use anyhow::{bail, format_err, Error};
-use nix::unistd::{fork, ForkResult, pipe};
-use std::os::unix::io::RawFd;
-use chrono::{Local, DateTime, Utc, TimeZone};
-use std::path::{Path, PathBuf};
use std::collections::{HashSet, HashMap};
use std::ffi::OsStr;
-use std::io::{Write, Seek, SeekFrom};
+use std::io::{self, Write, Seek, SeekFrom};
use std::os::unix::fs::OpenOptionsExt;
+use std::os::unix::io::RawFd;
+use std::path::{Path, PathBuf};
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{bail, format_err, Error};
+use chrono::{Local, DateTime, Utc, TimeZone};
+use futures::future::FutureExt;
+use futures::select;
+use futures::stream::{StreamExt, TryStreamExt};
+use nix::unistd::{fork, ForkResult, pipe};
+use serde_json::{json, Value};
+use tokio::signal::unix::{signal, SignalKind};
+use tokio::sync::mpsc;
+use xdg::BaseDirectories;
+use pathpatterns::{MatchEntry, MatchType, PatternFlag};
use proxmox::{sortable, identity};
use proxmox::tools::fs::{file_get_contents, file_get_json, replace_file, CreateOptions, image_size};
use proxmox::sys::linux::tty;
use proxmox_backup::api2::types::*;
use proxmox_backup::client::*;
use proxmox_backup::backup::*;
-use proxmox_backup::pxar::{ self, catalog::* };
-
-//use proxmox_backup::backup::image_index::*;
-//use proxmox_backup::config::datastore;
-//use proxmox_backup::pxar::encoder::*;
-//use proxmox_backup::backup::datastore::*;
-
-use serde_json::{json, Value};
-//use hyper::Body;
-use std::sync::{Arc, Mutex};
-//use regex::Regex;
-use xdg::BaseDirectories;
-
-use futures::*;
-use tokio::sync::mpsc;
+use proxmox_backup::pxar::catalog::*;
const ENV_VAR_PBS_FINGERPRINT: &str = "PBS_FINGERPRINT";
const ENV_VAR_PBS_PASSWORD: &str = "PBS_PASSWORD";
-proxmox::const_regex! {
- BACKUPSPEC_REGEX = r"^([a-zA-Z0-9_-]+\.(?:pxar|img|conf|log)):(.+)$";
-}
const REPO_URL_SCHEMA: Schema = StringSchema::new("Repository URL.")
.format(&BACKUP_REPO_URL)
.max_length(256)
.schema();
-const BACKUP_SOURCE_SCHEMA: Schema = StringSchema::new(
- "Backup source specification ([<label>:<path>]).")
- .format(&ApiStringFormat::Pattern(&BACKUPSPEC_REGEX))
- .schema();
-
const KEYFILE_SCHEMA: Schema = StringSchema::new(
"Path to encryption key. All data will be encrypted using this key.")
.schema();
skip_lost_and_found: bool,
crypt_config: Option<Arc<CryptConfig>>,
catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
- exclude_pattern: Vec<pxar::MatchPattern>,
+ exclude_pattern: Vec<MatchEntry>,
entries_max: usize,
) -> Result<BackupStats, Error> {
Ok(Value::Null)
}
-fn parse_backupspec(value: &str) -> Result<(&str, &str), Error> {
-
- if let Some(caps) = (BACKUPSPEC_REGEX.regex_obj)().captures(value) {
- return Ok((caps.get(1).unwrap().as_str(), caps.get(2).unwrap().as_str()));
- }
- bail!("unable to parse directory specification '{}'", value);
-}
-
fn spawn_catalog_upload(
client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>,
type: Integer,
description: "Max number of entries to hold in memory.",
optional: true,
- default: pxar::ENCODER_MAX_ENTRIES as isize,
+ default: proxmox_backup::pxar::ENCODER_MAX_ENTRIES as isize,
},
"verbose": {
type: Boolean,
let include_dev = param["include-dev"].as_array();
- let entries_max = param["entries-max"].as_u64().unwrap_or(pxar::ENCODER_MAX_ENTRIES as u64);
+ let entries_max = param["entries-max"].as_u64()
+ .unwrap_or(proxmox_backup::pxar::ENCODER_MAX_ENTRIES as u64);
let empty = Vec::new();
- let arg_pattern = param["exclude"].as_array().unwrap_or(&empty);
-
- let mut pattern_list = Vec::with_capacity(arg_pattern.len());
- for s in arg_pattern {
- let l = s.as_str().ok_or_else(|| format_err!("Invalid pattern string slice"))?;
- let p = pxar::MatchPattern::from_line(l.as_bytes())?
- .ok_or_else(|| format_err!("Invalid match pattern in arguments"))?;
- pattern_list.push(p);
+ let exclude_args = param["exclude"].as_array().unwrap_or(&empty);
+
+ let mut pattern_list = Vec::with_capacity(exclude_args.len());
+ for entry in exclude_args {
+ let entry = entry.as_str().ok_or_else(|| format_err!("Invalid pattern string slice"))?;
+ pattern_list.push(
+ MatchEntry::parse_pattern(entry, PatternFlag::PATH_NAME, MatchType::Exclude)
+ .map_err(|err| format_err!("invalid exclude pattern entry: {}", err))?
+ );
}
let mut devices = if all_file_systems { None } else { Some(HashSet::new()) };
let mut upload_list = vec![];
- enum BackupType { PXAR, IMAGE, CONFIG, LOGFILE };
-
let mut upload_catalog = false;
for backupspec in backupspec_list {
- let (target, filename) = parse_backupspec(backupspec.as_str().unwrap())?;
+ let spec = parse_backup_specification(backupspec.as_str().unwrap())?;
+ let filename = &spec.config_string;
+ let target = &spec.archive_name;
use std::os::unix::fs::FileTypeExt;
.map_err(|err| format_err!("unable to access '{}' - {}", filename, err))?;
let file_type = metadata.file_type();
- let extension = target.rsplit('.').next()
- .ok_or_else(|| format_err!("missing target file extenion '{}'", target))?;
-
- match extension {
- "pxar" => {
+ match spec.spec_type {
+ BackupSpecificationType::PXAR => {
if !file_type.is_dir() {
bail!("got unexpected file type (expected directory)");
}
- upload_list.push((BackupType::PXAR, filename.to_owned(), format!("{}.didx", target), 0));
+ upload_list.push((BackupSpecificationType::PXAR, filename.to_owned(), format!("{}.didx", target), 0));
upload_catalog = true;
}
- "img" => {
-
+ BackupSpecificationType::IMAGE => {
if !(file_type.is_file() || file_type.is_block_device()) {
bail!("got unexpected file type (expected file or block device)");
}
if size == 0 { bail!("got zero-sized file '{}'", filename); }
- upload_list.push((BackupType::IMAGE, filename.to_owned(), format!("{}.fidx", target), size));
+ upload_list.push((BackupSpecificationType::IMAGE, filename.to_owned(), format!("{}.fidx", target), size));
}
- "conf" => {
+ BackupSpecificationType::CONFIG => {
if !file_type.is_file() {
bail!("got unexpected file type (expected regular file)");
}
- upload_list.push((BackupType::CONFIG, filename.to_owned(), format!("{}.blob", target), metadata.len()));
+ upload_list.push((BackupSpecificationType::CONFIG, filename.to_owned(), format!("{}.blob", target), metadata.len()));
}
- "log" => {
+ BackupSpecificationType::LOGFILE => {
if !file_type.is_file() {
bail!("got unexpected file type (expected regular file)");
}
- upload_list.push((BackupType::LOGFILE, filename.to_owned(), format!("{}.blob", target), metadata.len()));
- }
- _ => {
- bail!("got unknown archive extension '{}'", extension);
+ upload_list.push((BackupSpecificationType::LOGFILE, filename.to_owned(), format!("{}.blob", target), metadata.len()));
}
}
}
for (backup_type, filename, target, size) in upload_list {
match backup_type {
- BackupType::CONFIG => {
+ BackupSpecificationType::CONFIG => {
println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target);
let stats = client
.upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
.await?;
manifest.add_file(target, stats.size, stats.csum)?;
}
- BackupType::LOGFILE => { // fixme: remove - not needed anymore ?
+ BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ?
println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target);
let stats = client
.upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
.await?;
manifest.add_file(target, stats.size, stats.csum)?;
}
- BackupType::PXAR => {
+ BackupSpecificationType::PXAR => {
println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
let stats = backup_directory(
manifest.add_file(target, stats.size, stats.csum)?;
catalog.lock().unwrap().end_directory()?;
}
- BackupType::IMAGE => {
+ BackupSpecificationType::IMAGE => {
println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
let stats = backup_image(
&client,
Ok(())
}
+/// Map a user-supplied archive name to its server-side file name and [`ArchiveType`].
+///
+/// Names that already carry a known server extension (`.didx`, `.fidx`, `.blob`)
+/// are passed through unchanged; bare `.pxar` / `.img` names get the matching
+/// index extension appended, and anything else is treated as a blob.
+fn parse_archive_type(name: &str) -> (String, ArchiveType) {
+    if name.ends_with(".didx") || name.ends_with(".fidx") || name.ends_with(".blob") {
+        // Full server-side name given; archive_type() recognizes exactly these
+        // three suffixes, so the unwrap cannot fail on this branch.
+        (name.into(), archive_type(name).unwrap())
+    } else if name.ends_with(".pxar") {
+        (format!("{}.didx", name), ArchiveType::DynamicIndex)
+    } else if name.ends_with(".img") {
+        (format!("{}.fidx", name), ArchiveType::FixedIndex)
+    } else {
+        // Everything else (configs, logs, the manifest, ...) is stored as a blob.
+        (format!("{}.blob", name), ArchiveType::Blob)
+    }
+}
+
#[api(
input: {
properties: {
}
};
- let server_archive_name = if archive_name.ends_with(".pxar") {
- format!("{}.didx", archive_name)
- } else if archive_name.ends_with(".img") {
- format!("{}.fidx", archive_name)
- } else {
- format!("{}.blob", archive_name)
- };
-
let client = BackupReader::start(
client,
crypt_config.clone(),
let manifest = client.download_manifest().await?;
- if server_archive_name == MANIFEST_BLOB_NAME {
+ let (archive_name, archive_type) = parse_archive_type(archive_name);
+
+ if archive_name == MANIFEST_BLOB_NAME {
let backup_index_data = manifest.into_json().to_string();
if let Some(target) = target {
replace_file(target, backup_index_data.as_bytes(), CreateOptions::new())?;
.map_err(|err| format_err!("unable to pipe data - {}", err))?;
}
- } else if server_archive_name.ends_with(".blob") {
+ } else if archive_type == ArchiveType::Blob {
- let mut reader = client.download_blob(&manifest, &server_archive_name).await?;
+ let mut reader = client.download_blob(&manifest, &archive_name).await?;
if let Some(target) = target {
let mut writer = std::fs::OpenOptions::new()
.map_err(|err| format_err!("unable to pipe data - {}", err))?;
}
- } else if server_archive_name.ends_with(".didx") {
+ } else if archive_type == ArchiveType::DynamicIndex {
- let index = client.download_dynamic_index(&manifest, &server_archive_name).await?;
+ let index = client.download_dynamic_index(&manifest, &archive_name).await?;
let most_used = index.find_most_used_chunks(8);
let mut reader = BufferedDynamicReader::new(index, chunk_reader);
if let Some(target) = target {
-
- let feature_flags = pxar::flags::DEFAULT;
- let mut decoder = pxar::SequentialDecoder::new(&mut reader, feature_flags);
- decoder.set_callback(move |path| {
- if verbose {
- eprintln!("{:?}", path);
- }
- Ok(())
- });
- decoder.set_allow_existing_dirs(allow_existing_dirs);
-
- decoder.restore(Path::new(target), &Vec::new())?;
+ proxmox_backup::pxar::extract_archive(
+ pxar::decoder::Decoder::from_std(reader)?,
+ Path::new(target),
+ &[],
+ proxmox_backup::pxar::Flags::DEFAULT,
+ allow_existing_dirs,
+ |path| {
+ if verbose {
+ println!("{:?}", path);
+ }
+ },
+ )
+ .map_err(|err| format_err!("error extracting archive - {}", err))?;
} else {
let mut writer = std::fs::OpenOptions::new()
.write(true)
std::io::copy(&mut reader, &mut writer)
.map_err(|err| format_err!("unable to pipe data - {}", err))?;
}
- } else if server_archive_name.ends_with(".fidx") {
+ } else if archive_type == ArchiveType::FixedIndex {
- let index = client.download_fixed_index(&manifest, &server_archive_name).await?;
+ let index = client.download_fixed_index(&manifest, &archive_name).await?;
let mut writer = if let Some(target) = target {
std::fs::OpenOptions::new()
};
dump_image(client.clone(), crypt_config.clone(), index, &mut writer, verbose)?;
-
- } else {
- bail!("unknown archive file extension (expected .pxar of .img)");
}
Ok(Value::Null)
("group", false, &StringSchema::new("Backup group.").schema()),
], [
("output-format", true, &OUTPUT_FORMAT),
+ (
+ "quiet",
+ true,
+ &BooleanSchema::new("Minimal output - only show removals.")
+ .schema()
+ ),
("repository", true, &REPO_URL_SCHEMA),
])
)
let output_format = get_output_format(¶m);
+ let quiet = param["quiet"].as_bool().unwrap_or(false);
+
param.as_object_mut().unwrap().remove("repository");
param.as_object_mut().unwrap().remove("group");
param.as_object_mut().unwrap().remove("output-format");
+ param.as_object_mut().unwrap().remove("quiet");
param["backup-type"] = group.backup_type().into();
param["backup-id"] = group.backup_id().into();
- let result = client.post(&path, Some(param)).await?;
+ let mut result = client.post(&path, Some(param)).await?;
record_repository(&repo);
- view_task_result(client, result, &output_format).await?;
+ let render_snapshot_path = |_v: &Value, record: &Value| -> Result<String, Error> {
+ let item: PruneListItem = serde_json::from_value(record.to_owned())?;
+ let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
+ Ok(snapshot.relative_path().to_str().unwrap().to_owned())
+ };
+
+ let render_prune_action = |v: &Value, _record: &Value| -> Result<String, Error> {
+ Ok(match v.as_bool() {
+ Some(true) => "keep",
+ Some(false) => "remove",
+ None => "unknown",
+ }.to_string())
+ };
+
+ let options = default_table_format_options()
+ .sortby("backup-type", false)
+ .sortby("backup-id", false)
+ .sortby("backup-time", false)
+ .column(ColumnConfig::new("backup-id").renderer(render_snapshot_path).header("snapshot"))
+ .column(ColumnConfig::new("backup-time").renderer(tools::format::render_epoch).header("date"))
+ .column(ColumnConfig::new("keep").renderer(render_prune_action).header("action"))
+ ;
+
+ let info = &proxmox_backup::api2::admin::datastore::API_RETURN_SCHEMA_PRUNE;
+
+ let mut data = result["data"].take();
+
+ if quiet {
+ let list: Vec<Value> = data.as_array().unwrap().iter().filter(|item| {
+ item["keep"].as_bool() == Some(false)
+ }).map(|v| v.clone()).collect();
+ data = list.into();
+ }
+
+ format_and_print_result_full(&mut data, info, &output_format, &options);
Ok(Value::Null)
}
}
}
+use proxmox_backup::client::RemoteChunkReader;
+/// Adapter exposing a `BufferedDynamicReader` through the `pxar::accessor::ReadAt`
+/// interface (positioned, poll-based reads) needed by the pxar fuse/accessor code.
+///
+/// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
+/// async use!
+///
+/// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
+/// so that we can properly access it from multiple threads simultaneously while not issuing
+/// duplicate simultaneous reads over http.
+struct BufferedDynamicReadAt {
+    // The mutex serializes the seek+read pair below: the wrapped reader keeps a
+    // single cursor, so concurrent callers must not interleave their accesses.
+    inner: Mutex<BufferedDynamicReader<RemoteChunkReader>>,
+}
+
+impl BufferedDynamicReadAt {
+    /// Wrap `inner` so it can be shared (e.g. behind an `Arc`) and read at
+    /// arbitrary offsets via the `ReadAt` impl below.
+    fn new(inner: BufferedDynamicReader<RemoteChunkReader>) -> Self {
+        Self {
+            inner: Mutex::new(inner),
+        }
+    }
+}
+
+impl pxar::accessor::ReadAt for BufferedDynamicReadAt {
+    fn poll_read_at(
+        self: Pin<&Self>,
+        _cx: &mut Context,
+        buf: &mut [u8],
+        offset: u64,
+    ) -> Poll<io::Result<usize>> {
+        use std::io::Read;
+        // block_in_place tells the tokio runtime this thread may block (mutex
+        // lock plus synchronous seek/read through the chunk reader). The result
+        // is produced inline, so this poll is always immediately Ready and the
+        // context/waker is never used.
+        tokio::task::block_in_place(move || {
+            let mut reader = self.inner.lock().unwrap();
+            reader.seek(SeekFrom::Start(offset))?;
+            Poll::Ready(Ok(reader.read(buf)?))
+        })
+    }
+}
+
async fn mount_do(param: Value, pipe: Option<RawFd>) -> Result<Value, Error> {
let repo = extract_repository_from_value(¶m)?;
let archive_name = tools::required_string_param(¶m, "archive-name")?;
let most_used = index.find_most_used_chunks(8);
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
let reader = BufferedDynamicReader::new(index, chunk_reader);
- let decoder = pxar::Decoder::new(reader)?;
+ let archive_size = reader.archive_size();
+ let reader: proxmox_backup::pxar::fuse::Reader =
+ Arc::new(BufferedDynamicReadAt::new(reader));
+ let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
let options = OsStr::new("ro,default_permissions");
- let mut session = pxar::fuse::Session::new(decoder, &options, pipe.is_none())
- .map_err(|err| format_err!("pxar mount failed: {}", err))?;
- // Mount the session but not call fuse deamonize as this will cause
- // issues with the runtime after the fork
- let deamonize = false;
- session.mount(&Path::new(target), deamonize)?;
+ let session = proxmox_backup::pxar::fuse::Session::mount(
+ decoder,
+ &options,
+ false,
+ Path::new(target),
+ )
+ .map_err(|err| format_err!("pxar mount failed: {}", err))?;
if let Some(pipe) = pipe {
nix::unistd::chdir(Path::new("/")).unwrap();
- // Finish creation of deamon by redirecting filedescriptors.
+ // Finish creation of daemon by redirecting filedescriptors.
let nullfd = nix::fcntl::open(
"/dev/null",
nix::fcntl::OFlag::O_RDWR,
nix::unistd::close(pipe).unwrap();
}
- let multithreaded = true;
- session.run_loop(multithreaded)?;
+ let mut interrupt = signal(SignalKind::interrupt())?;
+ select! {
+ res = session.fuse() => res?,
+ _ = interrupt.recv().fuse() => {
+            // exit cleanly on interrupt (SIGINT)
+ }
+ }
} else {
bail!("unknown archive file extension (expected .pxar)");
}
let most_used = index.find_most_used_chunks(8);
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config.clone(), most_used);
let reader = BufferedDynamicReader::new(index, chunk_reader);
- let mut decoder = pxar::Decoder::new(reader)?;
- decoder.set_callback(|path| {
- println!("{:?}", path);
- Ok(())
- });
+ let archive_size = reader.archive_size();
+ let reader: proxmox_backup::pxar::fuse::Reader =
+ Arc::new(BufferedDynamicReadAt::new(reader));
+ let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
let tmpfile = client.download(CATALOG_NAME, tmpfile).await?;
let index = DynamicIndexReader::new(tmpfile)
catalog_reader,
&server_archive_name,
decoder,
- )?;
+ ).await?;
println!("Starting interactive shell");
- state.shell()?;
+ state.shell().await?;
record_repository(&repo);
.insert("catalog", catalog_mgmt_cli())
.insert("task", task_mgmt_cli());
- run_cli_command(cmd_def, Some(|future| {
+ let rpcenv = CliEnvironment::new();
+ run_cli_command(cmd_def, rpcenv, Some(|future| {
proxmox_backup::tools::runtime::main(future)
}));
}