X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=src%2Fbin%2Fproxmox-backup-client.rs;h=30dba8cdbe05d641ba4b96e763f2d4a0a4ba4ced;hb=3bad3e6e52d8b4b88c7b1fc79a794b0b45a72d42;hp=37921e38383dc98680146a3449d042fbac2f0a38;hpb=163e9bbe91715a91d77e38a8f288857d1602fdd5;p=proxmox-backup.git diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 37921e38..30dba8cd 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -1,53 +1,83 @@ -use failure::*; -use nix::unistd::{fork, ForkResult, pipe}; -use std::os::unix::io::RawFd; -use chrono::{Local, Utc, TimeZone}; -use std::path::{Path, PathBuf}; use std::collections::{HashSet, HashMap}; -use std::ffi::OsStr; -use std::io::{Write, Seek, SeekFrom}; -use std::os::unix::fs::OpenOptionsExt; +use std::io::{self, Write, Seek, SeekFrom}; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::Context; + +use anyhow::{bail, format_err, Error}; +use chrono::{Local, DateTime, Utc, TimeZone}; +use futures::future::FutureExt; +use futures::stream::{StreamExt, TryStreamExt}; +use serde_json::{json, Value}; +use tokio::sync::mpsc; +use xdg::BaseDirectories; -use proxmox::{sortable, identity}; -use proxmox::tools::fs::{file_get_contents, file_get_json, file_set_contents, image_size}; +use pathpatterns::{MatchEntry, MatchType, PatternFlag}; +use proxmox::tools::fs::{file_get_contents, file_get_json, replace_file, CreateOptions, image_size}; use proxmox::api::{ApiHandler, ApiMethod, RpcEnvironment}; use proxmox::api::schema::*; use proxmox::api::cli::*; +use proxmox::api::api; +use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation}; use proxmox_backup::tools; use proxmox_backup::api2::types::*; use proxmox_backup::client::*; -use proxmox_backup::backup::*; -use proxmox_backup::pxar::{ self, catalog::* }; - -//use proxmox_backup::backup::image_index::*; -//use proxmox_backup::config::datastore; -//use proxmox_backup::pxar::encoder::*; -//use proxmox_backup::backup::datastore::*; - -use serde_json::{json, Value}; -//use hyper::Body; -use std::sync::{Arc, Mutex}; -//use regex::Regex; -use xdg::BaseDirectories; - -use futures::*; -use tokio::sync::mpsc; - -proxmox::api::const_regex! { - BACKUPSPEC_REGEX = r"^([a-zA-Z0-9_-]+\.(?:pxar|img|conf|log)):(.+)$"; -} - -const REPO_URL_SCHEMA: Schema = StringSchema::new("Repository URL.") +use proxmox_backup::pxar::catalog::*; +use proxmox_backup::backup::{ + archive_type, + load_and_decrypt_key, + verify_chunk_size, + ArchiveType, + AsyncReadChunk, + BackupDir, + BackupGroup, + BackupManifest, + BufferedDynamicReader, + CATALOG_NAME, + CatalogReader, + CatalogWriter, + ChunkStream, + CryptConfig, + CryptMode, + DataBlob, + DynamicIndexReader, + FixedChunkStream, + FixedIndexReader, + IndexFile, + MANIFEST_BLOB_NAME, + Shell, +}; + +mod proxmox_backup_client; +use proxmox_backup_client::*; + +const ENV_VAR_PBS_FINGERPRINT: &str = "PBS_FINGERPRINT"; +const ENV_VAR_PBS_PASSWORD: &str = "PBS_PASSWORD"; + + +pub const REPO_URL_SCHEMA: Schema = StringSchema::new("Repository URL.") .format(&BACKUP_REPO_URL) .max_length(256) .schema(); +pub const KEYFILE_SCHEMA: Schema = StringSchema::new( + "Path to encryption key. All data will be encrypted using this key.") + .schema(); + +const CHUNK_SIZE_SCHEMA: Schema = IntegerSchema::new( + "Chunk size in KB. 
Must be a power of 2.") + .minimum(64) + .maximum(4096) + .default(4096) + .schema(); + fn get_default_repository() -> Option { std::env::var("PBS_REPOSITORY").ok() } -fn extract_repository_from_value( +pub fn extract_repository_from_value( param: &Value, ) -> Result { @@ -117,10 +147,10 @@ fn record_repository(repo: &BackupRepository) { let new_data = json!(map); - let _ = file_set_contents(path, new_data.to_string().as_bytes(), None); + let _ = replace_file(path, new_data.to_string().as_bytes(), CreateOptions::new()); } -fn complete_repository(_arg: &str, _param: &HashMap) -> Vec { +pub fn complete_repository(_arg: &str, _param: &HashMap) -> Vec { let mut result = vec![]; @@ -146,19 +176,108 @@ fn complete_repository(_arg: &str, _param: &HashMap) -> Vec Result { + + let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok(); + + use std::env::VarError::*; + let password = match std::env::var(ENV_VAR_PBS_PASSWORD) { + Ok(p) => Some(p), + Err(NotUnicode(_)) => bail!(format!("{} contains bad characters", ENV_VAR_PBS_PASSWORD)), + Err(NotPresent) => None, + }; + + let options = HttpClientOptions::new() + .prefix(Some("proxmox-backup".to_string())) + .password(password) + .interactive(true) + .fingerprint(fingerprint) + .fingerprint_cache(true) + .ticket_cache(true); + + HttpClient::new(server, userid, options) +} + +async fn view_task_result( + client: HttpClient, + result: Value, + output_format: &str, +) -> Result<(), Error> { + let data = &result["data"]; + if output_format == "text" { + if let Some(upid) = data.as_str() { + display_task_log(client, upid, true).await?; + } + } else { + format_and_print_result(&data, &output_format); + } + + Ok(()) +} + +async fn api_datastore_list_snapshots( + client: &HttpClient, + store: &str, + group: Option, +) -> Result { + + let path = format!("api2/json/admin/datastore/{}/snapshots", store); + + let mut args = json!({}); + if let Some(group) = group { + args["backup-type"] = group.backup_type().into(); + args["backup-id"] = group.backup_id().into(); + } + + let mut result = client.get(&path, Some(args)).await?; + + Ok(result["data"].take()) +} + +pub async fn api_datastore_latest_snapshot( + client: &HttpClient, + store: &str, + group: BackupGroup, +) -> Result<(String, String, DateTime), Error> { + + let list = api_datastore_list_snapshots(client, store, Some(group.clone())).await?; + let mut list: Vec = serde_json::from_value(list)?; + + if list.is_empty() { + bail!("backup group {:?} does not contain any snapshots.", group.group_path()); + } + + list.sort_unstable_by(|a, b| b.backup_time.cmp(&a.backup_time)); + + let backup_time = Utc.timestamp(list[0].backup_time, 0); + + Ok((group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time)) +} + async fn backup_directory>( client: &BackupWriter, + crypt_mode: CryptMode, + previous_manifest: Option>, dir_path: P, archive_name: &str, chunk_size: Option, device_set: Option>, verbose: bool, skip_lost_and_found: bool, - crypt_config: Option>, - catalog: Arc>>, + catalog: Arc>>, + exclude_pattern: Vec, + entries_max: usize, ) -> Result { - let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found, catalog)?; + let pxar_stream = PxarBackupStream::open( + dir_path.as_ref(), + device_set, + verbose, + skip_lost_and_found, + catalog, + exclude_pattern, + entries_max, + )?; let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); let (mut tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks @@ -168,11 +287,13 @@ async fn 
backup_directory>( // spawn chunker inside a separate task so that it can run parallel tokio::spawn(async move { - let _ = tx.send_all(&mut chunk_stream).await; + while let Some(v) = chunk_stream.next().await { + let _ = tx.send(v).await; + } }); let stats = client - .upload_stream(archive_name, stream, "dynamic", None, crypt_config) + .upload_stream(crypt_mode, previous_manifest, archive_name, stream, "dynamic", None) .await?; Ok(stats) @@ -180,403 +301,355 @@ async fn backup_directory>( async fn backup_image>( client: &BackupWriter, + crypt_mode: CryptMode, + previous_manifest: Option>, image_path: P, archive_name: &str, image_size: u64, chunk_size: Option, _verbose: bool, - crypt_config: Option>, ) -> Result { let path = image_path.as_ref().to_owned(); let file = tokio::fs::File::open(path).await?; - let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) + let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) .map_err(Error::from); let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024)); let stats = client - .upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config) + .upload_stream(crypt_mode, previous_manifest, archive_name, stream, "fixed", Some(image_size)) .await?; Ok(stats) } -fn strip_server_file_expenstion(name: &str) -> String { - - if name.ends_with(".didx") || name.ends_with(".fidx") || name.ends_with(".blob") { - name[..name.len()-5].to_owned() - } else { - name.to_owned() // should not happen - } -} +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + "output-format": { + schema: OUTPUT_FORMAT, + optional: true, + }, + } + } +)] +/// List backup groups. +async fn list_backup_groups(param: Value) -> Result { -fn list_backup_groups( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + let output_format = get_output_format(¶m); let repo = extract_repository_from_value(¶m)?; - let client = HttpClient::new(repo.host(), repo.user(), None)?; + let client = connect(repo.host(), repo.user())?; let path = format!("api2/json/admin/datastore/{}/groups", repo.store()); - let mut result = async_main(async move { - client.get(&path, None).await - })?; + let mut result = client.get(&path, None).await?; record_repository(&repo); - // fixme: implement and use output formatter instead .. 
- let list = result["data"].as_array_mut().unwrap(); - - list.sort_unstable_by(|a, b| { - let a_id = a["backup-id"].as_str().unwrap(); - let a_backup_type = a["backup-type"].as_str().unwrap(); - let b_id = b["backup-id"].as_str().unwrap(); - let b_backup_type = b["backup-type"].as_str().unwrap(); - - let type_order = a_backup_type.cmp(b_backup_type); - if type_order == std::cmp::Ordering::Equal { - a_id.cmp(b_id) - } else { - type_order - } - }); - - let output_format = param["output-format"].as_str().unwrap_or("text").to_owned(); - - let mut result = vec![]; - - for item in list { + let render_group_path = |_v: &Value, record: &Value| -> Result { + let item: GroupListItem = serde_json::from_value(record.to_owned())?; + let group = BackupGroup::new(item.backup_type, item.backup_id); + Ok(group.group_path().to_str().unwrap().to_owned()) + }; - let id = item["backup-id"].as_str().unwrap(); - let btype = item["backup-type"].as_str().unwrap(); - let epoch = item["last-backup"].as_i64().unwrap(); - let last_backup = Utc.timestamp(epoch, 0); - let backup_count = item["backup-count"].as_u64().unwrap(); + let render_last_backup = |_v: &Value, record: &Value| -> Result { + let item: GroupListItem = serde_json::from_value(record.to_owned())?; + let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.last_backup); + Ok(snapshot.relative_path().to_str().unwrap().to_owned()) + }; - let group = BackupGroup::new(btype, id); + let render_files = |_v: &Value, record: &Value| -> Result { + let item: GroupListItem = serde_json::from_value(record.to_owned())?; + Ok(tools::format::render_backup_file_list(&item.files)) + }; - let path = group.group_path().to_str().unwrap().to_owned(); + let options = default_table_format_options() + .sortby("backup-type", false) + .sortby("backup-id", false) + .column(ColumnConfig::new("backup-id").renderer(render_group_path).header("group")) + .column( + ColumnConfig::new("last-backup") + .renderer(render_last_backup) + .header("last snapshot") + .right_align(false) + ) + .column(ColumnConfig::new("backup-count")) + .column(ColumnConfig::new("files").renderer(render_files)); - let files = item["files"].as_array().unwrap().iter() - .map(|v| strip_server_file_expenstion(v.as_str().unwrap())).collect(); + let mut data: Value = result["data"].take(); - if output_format == "text" { - println!( - "{:20} | {} | {:5} | {}", - path, - BackupDir::backup_time_to_string(last_backup), - backup_count, - tools::join(&files, ' '), - ); - } else { - result.push(json!({ - "backup-type": btype, - "backup-id": id, - "last-backup": epoch, - "backup-count": backup_count, - "files": files, - })); - } - } + let info = &proxmox_backup::api2::admin::datastore::API_RETURN_SCHEMA_LIST_GROUPS; - if output_format != "text" { format_and_print_result(&result.into(), &output_format); } + format_and_print_result_full(&mut data, info, &output_format, &options); Ok(Value::Null) } -fn list_snapshots( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + group: { + type: String, + description: "Backup group.", + optional: true, + }, + "output-format": { + schema: OUTPUT_FORMAT, + optional: true, + }, + } + } +)] +/// List backup snapshots. 
+async fn list_snapshots(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; - let output_format = param["output-format"].as_str().unwrap_or("text").to_owned(); + let output_format = get_output_format(¶m); - let client = HttpClient::new(repo.host(), repo.user(), None)?; - - let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store()); + let client = connect(repo.host(), repo.user())?; - let mut args = json!({}); - if let Some(path) = param["group"].as_str() { - let group = BackupGroup::parse(path)?; - args["backup-type"] = group.backup_type().into(); - args["backup-id"] = group.backup_id().into(); - } + let group: Option = if let Some(path) = param["group"].as_str() { + Some(path.parse()?) + } else { + None + }; - let result = async_main(async move { - client.get(&path, Some(args)).await - })?; + let mut data = api_datastore_list_snapshots(&client, repo.store(), group).await?; record_repository(&repo); - let list = result["data"].as_array().unwrap(); - - let mut result = vec![]; - - for item in list { - - let id = item["backup-id"].as_str().unwrap(); - let btype = item["backup-type"].as_str().unwrap(); - let epoch = item["backup-time"].as_i64().unwrap(); - - let snapshot = BackupDir::new(btype, id, epoch); + let render_snapshot_path = |_v: &Value, record: &Value| -> Result { + let item: SnapshotListItem = serde_json::from_value(record.to_owned())?; + let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time); + Ok(snapshot.relative_path().to_str().unwrap().to_owned()) + }; - let path = snapshot.relative_path().to_str().unwrap().to_owned(); + let render_files = |_v: &Value, record: &Value| -> Result { + let item: SnapshotListItem = serde_json::from_value(record.to_owned())?; + let mut filenames = Vec::new(); + for file in &item.files { + filenames.push(file.filename.to_string()); + } + Ok(tools::format::render_backup_file_list(&filenames[..])) + }; - let files = item["files"].as_array().unwrap().iter() - .map(|v| strip_server_file_expenstion(v.as_str().unwrap())).collect(); + let options = default_table_format_options() + .sortby("backup-type", false) + .sortby("backup-id", false) + .sortby("backup-time", false) + .column(ColumnConfig::new("backup-id").renderer(render_snapshot_path).header("snapshot")) + .column(ColumnConfig::new("size")) + .column(ColumnConfig::new("files").renderer(render_files)) + ; - if output_format == "text" { - let size_str = if let Some(size) = item["size"].as_u64() { - size.to_string() - } else { - String::from("-") - }; - println!("{} | {} | {}", path, size_str, tools::join(&files, ' ')); - } else { - let mut data = json!({ - "backup-type": btype, - "backup-id": id, - "backup-time": epoch, - "files": files, - }); - if let Some(size) = item["size"].as_u64() { - data["size"] = size.into(); - } - result.push(data); - } - } + let info = &proxmox_backup::api2::admin::datastore::API_RETURN_SCHEMA_LIST_SNAPSHOTS; - if output_format != "text" { format_and_print_result(&result.into(), &output_format); } + format_and_print_result_full(&mut data, info, &output_format, &options); Ok(Value::Null) } -fn forget_snapshots( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + snapshot: { + type: String, + description: "Snapshot path.", + }, + } + } +)] +/// Forget (remove) backup snapshots. 
+async fn forget_snapshots(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; let path = tools::required_string_param(¶m, "snapshot")?; - let snapshot = BackupDir::parse(path)?; + let snapshot: BackupDir = path.parse()?; - let mut client = HttpClient::new(repo.host(), repo.user(), None)?; + let mut client = connect(repo.host(), repo.user())?; let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store()); - let result = async_main(async move { - client.delete(&path, Some(json!({ - "backup-type": snapshot.group().backup_type(), - "backup-id": snapshot.group().backup_id(), - "backup-time": snapshot.backup_time().timestamp(), - }))).await - })?; + let result = client.delete(&path, Some(json!({ + "backup-type": snapshot.group().backup_type(), + "backup-id": snapshot.group().backup_id(), + "backup-time": snapshot.backup_time().timestamp(), + }))).await?; record_repository(&repo); Ok(result) } -fn api_login( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + } + } +)] +/// Try to login. If successful, store ticket. +async fn api_login(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; - let client = HttpClient::new(repo.host(), repo.user(), None)?; - async_main(async move { client.login().await })?; + let client = connect(repo.host(), repo.user())?; + client.login().await?; record_repository(&repo); Ok(Value::Null) } -fn api_logout( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + } + } +)] +/// Logout (delete stored ticket). 
+fn api_logout(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; - delete_ticket_info(repo.host(), repo.user())?; + delete_ticket_info("proxmox-backup", repo.host(), repo.user())?; Ok(Value::Null) } -fn dump_catalog( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { - - let repo = extract_repository_from_value(¶m)?; - - let path = tools::required_string_param(¶m, "snapshot")?; - let snapshot = BackupDir::parse(path)?; - - let keyfile = param["keyfile"].as_str().map(PathBuf::from); - let crypt_config = match keyfile { - None => None, - Some(path) => { - let (key, _) = load_and_decrtypt_key(&path, &get_encryption_key_password)?; - Some(Arc::new(CryptConfig::new(key)?)) +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + snapshot: { + type: String, + description: "Snapshot path.", + }, + "output-format": { + schema: OUTPUT_FORMAT, + optional: true, + }, } - }; - - let client = HttpClient::new(repo.host(), repo.user(), None)?; - - async_main(async move { - let client = BackupReader::start( - client, - crypt_config.clone(), - repo.store(), - &snapshot.group().backup_type(), - &snapshot.group().backup_id(), - snapshot.backup_time(), - true, - ).await?; - - let manifest = client.download_manifest().await?; - - let index = client.download_dynamic_index(&manifest, CATALOG_NAME).await?; - - let most_used = index.find_most_used_chunks(8); - - let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used); - - let mut reader = BufferedDynamicReader::new(index, chunk_reader); - - let mut catalogfile = std::fs::OpenOptions::new() - .write(true) - .read(true) - .custom_flags(libc::O_TMPFILE) - .open("/tmp")?; - - std::io::copy(&mut reader, &mut catalogfile) - .map_err(|err| format_err!("unable to download catalog - {}", err))?; - - catalogfile.seek(SeekFrom::Start(0))?; - - let mut catalog_reader = CatalogReader::new(catalogfile); - - catalog_reader.dump()?; - - record_repository(&repo); - - Ok::<(), Error>(()) - })?; - - Ok(Value::Null) -} - -fn list_snapshot_files( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + } +)] +/// List snapshot files. 
+async fn list_snapshot_files(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; let path = tools::required_string_param(¶m, "snapshot")?; - let snapshot = BackupDir::parse(path)?; + let snapshot: BackupDir = path.parse()?; - let output_format = param["output-format"].as_str().unwrap_or("text").to_owned(); + let output_format = get_output_format(¶m); - let client = HttpClient::new(repo.host(), repo.user(), None)?; + let client = connect(repo.host(), repo.user())?; let path = format!("api2/json/admin/datastore/{}/files", repo.store()); - let mut result = async_main(async move { - client.get(&path, Some(json!({ - "backup-type": snapshot.group().backup_type(), - "backup-id": snapshot.group().backup_id(), - "backup-time": snapshot.backup_time().timestamp(), - }))).await - })?; + let mut result = client.get(&path, Some(json!({ + "backup-type": snapshot.group().backup_type(), + "backup-id": snapshot.group().backup_id(), + "backup-time": snapshot.backup_time().timestamp(), + }))).await?; record_repository(&repo); - let list: Value = result["data"].take(); + let info = &proxmox_backup::api2::admin::datastore::API_RETURN_SCHEMA_LIST_SNAPSHOT_FILES; - if output_format == "text" { - for item in list.as_array().unwrap().iter() { - println!( - "{} {}", - strip_server_file_expenstion(item["filename"].as_str().unwrap()), - item["size"].as_u64().unwrap_or(0), - ); - } - } else { - format_and_print_result(&list, &output_format); - } + let mut data: Value = result["data"].take(); + + let options = default_table_format_options(); + + format_and_print_result_full(&mut data, info, &output_format, &options); Ok(Value::Null) } -fn start_garbage_collection( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + "output-format": { + schema: OUTPUT_FORMAT, + optional: true, + }, + }, + }, +)] +/// Start garbage collection for a specific repository. 
+async fn start_garbage_collection(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; - let mut client = HttpClient::new(repo.host(), repo.user(), None)?; + let output_format = get_output_format(¶m); + + let mut client = connect(repo.host(), repo.user())?; let path = format!("api2/json/admin/datastore/{}/gc", repo.store()); - let result = async_main(async move { client.post(&path, None).await })?; + let result = client.post(&path, None).await?; record_repository(&repo); - Ok(result) -} - -fn parse_backupspec(value: &str) -> Result<(&str, &str), Error> { + view_task_result(client, result, &output_format).await?; - if let Some(caps) = (BACKUPSPEC_REGEX.regex_obj)().captures(value) { - return Ok((caps.get(1).unwrap().as_str(), caps.get(2).unwrap().as_str())); - } - bail!("unable to parse directory specification '{}'", value); + Ok(Value::Null) } fn spawn_catalog_upload( client: Arc, - crypt_config: Option>, + crypt_mode: CryptMode, ) -> Result< ( - Arc>>, + Arc>>, tokio::sync::oneshot::Receiver> ), Error> { - let (catalog_tx, catalog_rx) = mpsc::channel(10); // allow to buffer 10 writes - let catalog_stream = catalog_rx.map_err(Error::from); + let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes + let catalog_stream = crate::tools::StdChannelStream(catalog_rx); let catalog_chunk_size = 512*1024; let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); - let catalog = Arc::new(Mutex::new(CatalogWriter::new(SenderWriter::new(catalog_tx))?)); + let catalog = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?)); let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel(); tokio::spawn(async move { let catalog_upload_result = client - .upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, crypt_config) + .upload_stream(crypt_mode, None, CATALOG_NAME, catalog_chunk_stream, "dynamic", None) .await; if let Err(ref err) = catalog_upload_result { @@ -590,7 +663,120 @@ fn spawn_catalog_upload( Ok((catalog, catalog_result_rx)) } -fn create_backup( +fn keyfile_parameters(param: &Value) -> Result<(Option, CryptMode), Error> { + let keyfile = match param.get("keyfile") { + Some(Value::String(keyfile)) => Some(keyfile), + Some(_) => bail!("bad --keyfile parameter type"), + None => None, + }; + + let crypt_mode: Option = match param.get("crypt-mode") { + Some(mode) => Some(serde_json::from_value(mode.clone())?), + None => None, + }; + + Ok(match (keyfile, crypt_mode) { + // no parameters: + (None, None) => (key::optional_default_key_path()?, CryptMode::Encrypt), + + // just --crypt-mode=none + (None, Some(CryptMode::None)) => (None, CryptMode::None), + + // just --crypt-mode other than none + (None, Some(crypt_mode)) => match key::optional_default_key_path()? 
{ + None => bail!("--crypt-mode without --keyfile and no default key file available"), + Some(path) => (Some(path), crypt_mode), + } + + // just --keyfile + (Some(keyfile), None) => (Some(PathBuf::from(keyfile)), CryptMode::Encrypt), + + // --keyfile and --crypt-mode=none + (Some(_), Some(CryptMode::None)) => { + bail!("--keyfile and --crypt-mode=none are mutually exclusive"); + } + + // --keyfile and --crypt-mode other than none + (Some(keyfile), Some(crypt_mode)) => (Some(PathBuf::from(keyfile)), crypt_mode), + }) +} + +#[api( + input: { + properties: { + backupspec: { + type: Array, + description: "List of backup source specifications ([:] ...)", + items: { + schema: BACKUP_SOURCE_SCHEMA, + } + }, + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + "include-dev": { + description: "Include mountpoints with same st_dev number (see ``man fstat``) as specified files.", + optional: true, + items: { + type: String, + description: "Path to file.", + } + }, + keyfile: { + schema: KEYFILE_SCHEMA, + optional: true, + }, + "crypt-mode": { + type: CryptMode, + optional: true, + }, + "skip-lost-and-found": { + type: Boolean, + description: "Skip lost+found directory.", + optional: true, + }, + "backup-type": { + schema: BACKUP_TYPE_SCHEMA, + optional: true, + }, + "backup-id": { + schema: BACKUP_ID_SCHEMA, + optional: true, + }, + "backup-time": { + schema: BACKUP_TIME_SCHEMA, + optional: true, + }, + "chunk-size": { + schema: CHUNK_SIZE_SCHEMA, + optional: true, + }, + "exclude": { + type: Array, + description: "List of paths or patterns for matching files to exclude.", + optional: true, + items: { + type: String, + description: "Path or match pattern.", + } + }, + "entries-max": { + type: Integer, + description: "Max number of entries to hold in memory.", + optional: true, + default: proxmox_backup::pxar::ENCODER_MAX_ENTRIES as isize, + }, + "verbose": { + type: Boolean, + description: "Verbose output.", + optional: true, + }, + } + } +)] +/// Create (host) backup. +async fn create_backup( param: Value, _info: &ApiMethod, _rpcenv: &mut dyn RpcEnvironment, @@ -614,7 +800,7 @@ fn create_backup( verify_chunk_size(size)?; } - let keyfile = param["keyfile"].as_str().map(PathBuf::from); + let (keyfile, crypt_mode) = keyfile_parameters(¶m)?; let backup_id = param["backup-id"].as_str().unwrap_or(&proxmox::tools::nodename()); @@ -622,6 +808,21 @@ fn create_backup( let include_dev = param["include-dev"].as_array(); + let entries_max = param["entries-max"].as_u64() + .unwrap_or(proxmox_backup::pxar::ENCODER_MAX_ENTRIES as u64); + + let empty = Vec::new(); + let exclude_args = param["exclude"].as_array().unwrap_or(&empty); + + let mut pattern_list = Vec::with_capacity(exclude_args.len()); + for entry in exclude_args { + let entry = entry.as_str().ok_or_else(|| format_err!("Invalid pattern string slice"))?; + pattern_list.push( + MatchEntry::parse_pattern(entry, PatternFlag::PATH_NAME, MatchType::Exclude) + .map_err(|err| format_err!("invalid exclude pattern entry: {}", err))? 
+ ); + } + let mut devices = if all_file_systems { None } else { Some(HashSet::new()) }; if let Some(include_dev) = include_dev { @@ -641,12 +842,10 @@ fn create_backup( let mut upload_list = vec![]; - enum BackupType { PXAR, IMAGE, CONFIG, LOGFILE }; - - let mut upload_catalog = false; - for backupspec in backupspec_list { - let (target, filename) = parse_backupspec(backupspec.as_str().unwrap())?; + let spec = parse_backup_specification(backupspec.as_str().unwrap())?; + let filename = &spec.config_string; + let target = &spec.archive_name; use std::os::unix::fs::FileTypeExt; @@ -654,19 +853,14 @@ fn create_backup( .map_err(|err| format_err!("unable to access '{}' - {}", filename, err))?; let file_type = metadata.file_type(); - let extension = target.rsplit('.').next() - .ok_or_else(|| format_err!("missing target file extenion '{}'", target))?; - - match extension { - "pxar" => { + match spec.spec_type { + BackupSpecificationType::PXAR => { if !file_type.is_dir() { bail!("got unexpected file type (expected directory)"); } - upload_list.push((BackupType::PXAR, filename.to_owned(), format!("{}.didx", target), 0)); - upload_catalog = true; + upload_list.push((BackupSpecificationType::PXAR, filename.to_owned(), format!("{}.didx", target), 0)); } - "img" => { - + BackupSpecificationType::IMAGE => { if !(file_type.is_file() || file_type.is_block_device()) { bail!("got unexpected file type (expected file or block device)"); } @@ -675,29 +869,26 @@ fn create_backup( if size == 0 { bail!("got zero-sized file '{}'", filename); } - upload_list.push((BackupType::IMAGE, filename.to_owned(), format!("{}.fidx", target), size)); + upload_list.push((BackupSpecificationType::IMAGE, filename.to_owned(), format!("{}.fidx", target), size)); } - "conf" => { + BackupSpecificationType::CONFIG => { if !file_type.is_file() { bail!("got unexpected file type (expected regular file)"); } - upload_list.push((BackupType::CONFIG, filename.to_owned(), format!("{}.blob", target), metadata.len())); + upload_list.push((BackupSpecificationType::CONFIG, filename.to_owned(), format!("{}.blob", target), metadata.len())); } - "log" => { + BackupSpecificationType::LOGFILE => { if !file_type.is_file() { bail!("got unexpected file type (expected regular file)"); } - upload_list.push((BackupType::LOGFILE, filename.to_owned(), format!("{}.blob", target), metadata.len())); - } - _ => { - bail!("got unknown archive extension '{}'", extension); + upload_list.push((BackupSpecificationType::LOGFILE, filename.to_owned(), format!("{}.blob", target), metadata.len())); } } } let backup_time = Utc.timestamp(backup_time_opt.unwrap_or_else(|| Utc::now().timestamp()), 0); - let client = HttpClient::new(repo.host(), repo.user(), None)?; + let client = connect(repo.host(), repo.user())?; record_repository(&repo); println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)); @@ -711,7 +902,7 @@ fn create_backup( let (crypt_config, rsa_encrypted_key) = match keyfile { None => (None, None), Some(path) => { - let (key, created) = load_and_decrtypt_key(&path, &get_encryption_key_password)?; + let (key, created) = load_and_decrypt_key(&path, &key::get_encryption_key_password)?; let crypt_config = CryptConfig::new(key)?; @@ -727,122 +918,146 @@ fn create_backup( } }; - async_main(async move { - let client = BackupWriter::start( - client, - repo.store(), - backup_type, - &backup_id, - backup_time, - verbose, - ).await?; - - let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp()); - let 
mut manifest = BackupManifest::new(snapshot); - - let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?; - - for (backup_type, filename, target, size) in upload_list { - match backup_type { - BackupType::CONFIG => { - println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target); - let stats = client - .upload_blob_from_file(&filename, &target, crypt_config.clone(), true) - .await?; - manifest.add_file(target, stats.size, stats.csum); - } - BackupType::LOGFILE => { // fixme: remove - not needed anymore ? - println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target); - let stats = client - .upload_blob_from_file(&filename, &target, crypt_config.clone(), true) - .await?; - manifest.add_file(target, stats.size, stats.csum); - } - BackupType::PXAR => { - println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target); - catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?; - let stats = backup_directory( - &client, - &filename, - &target, - chunk_size_opt, - devices.clone(), - verbose, - skip_lost_and_found, - crypt_config.clone(), - catalog.clone(), - ).await?; - manifest.add_file(target, stats.size, stats.csum); - catalog.lock().unwrap().end_directory()?; - } - BackupType::IMAGE => { - println!("Upload image '{}' to '{:?}' as {}", filename, repo, target); - let stats = backup_image( - &client, - &filename, - &target, - size, - chunk_size_opt, - verbose, - crypt_config.clone(), - ).await?; - manifest.add_file(target, stats.size, stats.csum); + let client = BackupWriter::start( + client, + crypt_config.clone(), + repo.store(), + backup_type, + &backup_id, + backup_time, + verbose, + ).await?; + + let previous_manifest = if let Ok(previous_manifest) = client.download_previous_manifest().await { + Some(Arc::new(previous_manifest)) + } else { + None + }; + + let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp()); + let mut manifest = BackupManifest::new(snapshot); + + let mut catalog = None; + let mut catalog_result_tx = None; + + for (backup_type, filename, target, size) in upload_list { + match backup_type { + BackupSpecificationType::CONFIG => { + println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target); + let stats = client + .upload_blob_from_file(&filename, &target, true, crypt_mode) + .await?; + manifest.add_file(target, stats.size, stats.csum, crypt_mode)?; + } + BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ? 
+ println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target); + let stats = client + .upload_blob_from_file(&filename, &target, true, crypt_mode) + .await?; + manifest.add_file(target, stats.size, stats.csum, crypt_mode)?; + } + BackupSpecificationType::PXAR => { + // start catalog upload on first use + if catalog.is_none() { + let (cat, res) = spawn_catalog_upload(client.clone(), crypt_mode)?; + catalog = Some(cat); + catalog_result_tx = Some(res); } + let catalog = catalog.as_ref().unwrap(); + + println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target); + catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?; + let stats = backup_directory( + &client, + crypt_mode, + previous_manifest.clone(), + &filename, + &target, + chunk_size_opt, + devices.clone(), + verbose, + skip_lost_and_found, + catalog.clone(), + pattern_list.clone(), + entries_max as usize, + ).await?; + manifest.add_file(target, stats.size, stats.csum, crypt_mode)?; + catalog.lock().unwrap().end_directory()?; + } + BackupSpecificationType::IMAGE => { + println!("Upload image '{}' to '{:?}' as {}", filename, repo, target); + let stats = backup_image( + &client, + crypt_mode, + previous_manifest.clone(), + &filename, + &target, + size, + chunk_size_opt, + verbose, + ).await?; + manifest.add_file(target, stats.size, stats.csum, crypt_mode)?; } } + } - // finalize and upload catalog - if upload_catalog { - let mutex = Arc::try_unwrap(catalog) - .map_err(|_| format_err!("unable to get catalog (still used)"))?; - let mut catalog = mutex.into_inner().unwrap(); + // finalize and upload catalog + if let Some(catalog) = catalog { + let mutex = Arc::try_unwrap(catalog) + .map_err(|_| format_err!("unable to get catalog (still used)"))?; + let mut catalog = mutex.into_inner().unwrap(); - catalog.finish()?; + catalog.finish()?; - drop(catalog); // close upload stream + drop(catalog); // close upload stream + if let Some(catalog_result_rx) = catalog_result_tx { let stats = catalog_result_rx.await??; - - manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum); + manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum, crypt_mode)?; } + } - if let Some(rsa_encrypted_key) = rsa_encrypted_key { - let target = "rsa-encrypted.key"; - println!("Upload RSA encoded key to '{:?}' as {}", repo, target); - let stats = client - .upload_blob_from_data(rsa_encrypted_key, target, None, false, false) - .await?; - manifest.add_file(format!("{}.blob", target), stats.size, stats.csum); - - // openssl rsautl -decrypt -inkey master-private.pem -in rsa-encrypted.key -out t - /* - let mut buffer2 = vec![0u8; rsa.size() as usize]; - let pem_data = file_get_contents("master-private.pem")?; - let rsa = openssl::rsa::Rsa::private_key_from_pem(&pem_data)?; - let len = rsa.private_decrypt(&buffer, &mut buffer2, openssl::rsa::Padding::PKCS1)?; - println!("TEST {} {:?}", len, buffer2); - */ - } + if let Some(rsa_encrypted_key) = rsa_encrypted_key { + let target = "rsa-encrypted.key"; + println!("Upload RSA encoded key to '{:?}' as {}", repo, target); + let stats = client + .upload_blob_from_data(rsa_encrypted_key, target, false, CryptMode::None) + .await?; + manifest.add_file(format!("{}.blob", target), stats.size, stats.csum, crypt_mode)?; + + // openssl rsautl -decrypt -inkey master-private.pem -in rsa-encrypted.key -out t + /* + let mut buffer2 = vec![0u8; rsa.size() as usize]; + let pem_data = file_get_contents("master-private.pem")?; + let rsa = 
openssl::rsa::Rsa::private_key_from_pem(&pem_data)?; + let len = rsa.private_decrypt(&buffer, &mut buffer2, openssl::rsa::Padding::PKCS1)?; + println!("TEST {} {:?}", len, buffer2); + */ + } - // create manifest (index.json) - let manifest = manifest.into_json(); + // create manifest (index.json) + let manifest = manifest.into_json(); - println!("Upload index.json to '{:?}'", repo); - let manifest = serde_json::to_string_pretty(&manifest)?.into(); - client - .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true) - .await?; + println!("Upload index.json to '{:?}'", repo); + let manifest = serde_json::to_string_pretty(&manifest)?.into(); + // manifests are never encrypted + let manifest_crypt_mode = match crypt_mode { + CryptMode::None => CryptMode::None, + _ => CryptMode::SignOnly, + }; + client + .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, true, manifest_crypt_mode) + .await?; - client.finish().await?; + client.finish().await?; - let end_time = Local::now(); - let elapsed = end_time.signed_duration_since(start_time); - println!("Duration: {}", elapsed); + let end_time = Local::now(); + let elapsed = end_time.signed_duration_since(start_time); + println!("Duration: {}", elapsed); - println!("End Time: {}", end_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, false)); + println!("End Time: {}", end_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, false)); - Ok(Value::Null) - }) + Ok(Value::Null) } fn complete_backup_source(arg: &str, param: &HashMap) -> Vec { @@ -866,15 +1081,7 @@ fn complete_backup_source(arg: &str, param: &HashMap) -> Vec Result { - async_main(restore_do(param)) -} - -fn dump_image( +async fn dump_image( client: Arc, crypt_config: Option>, index: FixedIndexReader, @@ -884,7 +1091,7 @@ fn dump_image( let most_used = index.find_most_used_chunks(8); - let mut chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used); + let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used); // Note: we avoid using BufferedFixedReader, because that add an additional buffer/copy // and thus slows down reading. Instead, directly use RemoteChunkReader @@ -894,7 +1101,7 @@ fn dump_image( for pos in 0..index.index_count() { let digest = index.index_digest(pos).unwrap(); - let raw_data = chunk_reader.read_chunk(&digest)?; + let raw_data = chunk_reader.read_chunk(&digest).await?; writer.write_all(&raw_data)?; bytes += raw_data.len(); if verbose { @@ -919,7 +1126,59 @@ fn dump_image( Ok(()) } -async fn restore_do(param: Value) -> Result { +fn parse_archive_type(name: &str) -> (String, ArchiveType) { + if name.ends_with(".didx") || name.ends_with(".fidx") || name.ends_with(".blob") { + (name.into(), archive_type(name).unwrap()) + } else if name.ends_with(".pxar") { + (format!("{}.didx", name), ArchiveType::DynamicIndex) + } else if name.ends_with(".img") { + (format!("{}.fidx", name), ArchiveType::FixedIndex) + } else { + (format!("{}.blob", name), ArchiveType::Blob) + } +} + +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + snapshot: { + type: String, + description: "Group/Snapshot path.", + }, + "archive-name": { + description: "Backup archive name.", + type: String, + }, + target: { + type: String, + description: r###"Target directory path. Use '-' to write to standard output. + +We do not extraxt '.pxar' archives when writing to standard output. 
+ +"### + }, + "allow-existing-dirs": { + type: Boolean, + description: "Do not fail if directories already exists.", + optional: true, + }, + keyfile: { + schema: KEYFILE_SCHEMA, + optional: true, + }, + "crypt-mode": { + type: CryptMode, + optional: true, + }, + } + } +)] +/// Restore backup repository. +async fn restore(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; let verbose = param["verbose"].as_bool().unwrap_or(false); @@ -928,55 +1187,33 @@ async fn restore_do(param: Value) -> Result { let archive_name = tools::required_string_param(¶m, "archive-name")?; - let client = HttpClient::new(repo.host(), repo.user(), None)?; + let client = connect(repo.host(), repo.user())?; record_repository(&repo); let path = tools::required_string_param(¶m, "snapshot")?; let (backup_type, backup_id, backup_time) = if path.matches('/').count() == 1 { - let group = BackupGroup::parse(path)?; - - let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store()); - let result = client.get(&path, Some(json!({ - "backup-type": group.backup_type(), - "backup-id": group.backup_id(), - }))).await?; - - let list = result["data"].as_array().unwrap(); - if list.is_empty() { - bail!("backup group '{}' does not contain any snapshots:", path); - } - - let epoch = list[0]["backup-time"].as_i64().unwrap(); - let backup_time = Utc.timestamp(epoch, 0); - (group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time) + let group: BackupGroup = path.parse()?; + api_datastore_latest_snapshot(&client, repo.store(), group).await? } else { - let snapshot = BackupDir::parse(path)?; + let snapshot: BackupDir = path.parse()?; (snapshot.group().backup_type().to_owned(), snapshot.group().backup_id().to_owned(), snapshot.backup_time()) }; let target = tools::required_string_param(¶m, "target")?; let target = if target == "-" { None } else { Some(target) }; - let keyfile = param["keyfile"].as_str().map(PathBuf::from); + let (keyfile, _crypt_mode) = keyfile_parameters(¶m)?; let crypt_config = match keyfile { None => None, Some(path) => { - let (key, _) = load_and_decrtypt_key(&path, &get_encryption_key_password)?; + let (key, _) = load_and_decrypt_key(&path, &key::get_encryption_key_password)?; Some(Arc::new(CryptConfig::new(key)?)) } }; - let server_archive_name = if archive_name.ends_with(".pxar") { - format!("{}.didx", archive_name) - } else if archive_name.ends_with(".img") { - format!("{}.fidx", archive_name) - } else { - format!("{}.blob", archive_name) - }; - let client = BackupReader::start( client, crypt_config.clone(), @@ -989,10 +1226,12 @@ async fn restore_do(param: Value) -> Result { let manifest = client.download_manifest().await?; - if server_archive_name == MANIFEST_BLOB_NAME { + let (archive_name, archive_type) = parse_archive_type(archive_name); + + if archive_name == MANIFEST_BLOB_NAME { let backup_index_data = manifest.into_json().to_string(); if let Some(target) = target { - file_set_contents(target, backup_index_data.as_bytes(), None)?; + replace_file(target, backup_index_data.as_bytes(), CreateOptions::new())?; } else { let stdout = std::io::stdout(); let mut writer = stdout.lock(); @@ -1000,9 +1239,9 @@ async fn restore_do(param: Value) -> Result { .map_err(|err| format_err!("unable to pipe data - {}", err))?; } - } else if server_archive_name.ends_with(".blob") { + } else if archive_type == ArchiveType::Blob { - let mut reader = client.download_blob(&manifest, &server_archive_name).await?; + let mut reader = client.download_blob(&manifest, &archive_name).await?; 
if let Some(target) = target { let mut writer = std::fs::OpenOptions::new() @@ -1019,9 +1258,9 @@ async fn restore_do(param: Value) -> Result { .map_err(|err| format_err!("unable to pipe data - {}", err))?; } - } else if server_archive_name.ends_with(".didx") { + } else if archive_type == ArchiveType::DynamicIndex { - let index = client.download_dynamic_index(&manifest, &server_archive_name).await?; + let index = client.download_dynamic_index(&manifest, &archive_name).await?; let most_used = index.find_most_used_chunks(8); @@ -1030,18 +1269,19 @@ async fn restore_do(param: Value) -> Result { let mut reader = BufferedDynamicReader::new(index, chunk_reader); if let Some(target) = target { - - let feature_flags = pxar::flags::DEFAULT; - let mut decoder = pxar::SequentialDecoder::new(&mut reader, feature_flags); - decoder.set_callback(move |path| { - if verbose { - eprintln!("{:?}", path); - } - Ok(()) - }); - decoder.set_allow_existing_dirs(allow_existing_dirs); - - decoder.restore(Path::new(target), &Vec::new())?; + proxmox_backup::pxar::extract_archive( + pxar::decoder::Decoder::from_std(reader)?, + Path::new(target), + &[], + proxmox_backup::pxar::Flags::DEFAULT, + allow_existing_dirs, + |path| { + if verbose { + println!("{:?}", path); + } + }, + ) + .map_err(|err| format_err!("error extracting archive - {}", err))?; } else { let mut writer = std::fs::OpenOptions::new() .write(true) @@ -1051,9 +1291,9 @@ async fn restore_do(param: Value) -> Result { std::io::copy(&mut reader, &mut writer) .map_err(|err| format_err!("unable to pipe data - {}", err))?; } - } else if server_archive_name.ends_with(".fidx") { + } else if archive_type == ArchiveType::FixedIndex { - let index = client.download_fixed_index(&manifest, &server_archive_name).await?; + let index = client.download_fixed_index(&manifest, &archive_name).await?; let mut writer = if let Some(target) = target { std::fs::OpenOptions::new() @@ -1069,35 +1309,55 @@ async fn restore_do(param: Value) -> Result { .map_err(|err| format_err!("unable to open /dev/stdout - {}", err))? }; - dump_image(client.clone(), crypt_config.clone(), index, &mut writer, verbose)?; - - } else { - bail!("unknown archive file extension (expected .pxar of .img)"); + dump_image(client.clone(), crypt_config.clone(), index, &mut writer, verbose).await?; } Ok(Value::Null) } -fn upload_log( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { +#[api( + input: { + properties: { + repository: { + schema: REPO_URL_SCHEMA, + optional: true, + }, + snapshot: { + type: String, + description: "Group/Snapshot path.", + }, + logfile: { + type: String, + description: "The path to the log file you want to upload.", + }, + keyfile: { + schema: KEYFILE_SCHEMA, + optional: true, + }, + "crypt-mode": { + type: CryptMode, + optional: true, + }, + } + } +)] +/// Upload backup log file. 
+async fn upload_log(param: Value) -> Result { let logfile = tools::required_string_param(¶m, "logfile")?; let repo = extract_repository_from_value(¶m)?; let snapshot = tools::required_string_param(¶m, "snapshot")?; - let snapshot = BackupDir::parse(snapshot)?; + let snapshot: BackupDir = snapshot.parse()?; - let mut client = HttpClient::new(repo.host(), repo.user(), None)?; + let mut client = connect(repo.host(), repo.user())?; - let keyfile = param["keyfile"].as_str().map(PathBuf::from); + let (keyfile, crypt_mode) = keyfile_parameters(¶m)?; let crypt_config = match keyfile { None => None, Some(path) => { - let (key, _created) = load_and_decrtypt_key(&path, &get_encryption_key_password)?; + let (key, _created) = load_and_decrypt_key(&path, &key::get_encryption_key_password)?; let crypt_config = CryptConfig::new(key)?; Some(Arc::new(crypt_config)) } @@ -1105,7 +1365,19 @@ fn upload_log( let data = file_get_contents(logfile)?; - let blob = DataBlob::encode(&data, crypt_config.as_ref().map(Arc::as_ref), true)?; + let blob = match crypt_mode { + CryptMode::None => DataBlob::encode(&data, None, true)?, + CryptMode::Encrypt => { + DataBlob::encode(&data, crypt_config.as_ref().map(Arc::as_ref), true)? + } + CryptMode::SignOnly => DataBlob::create_signed( + &data, + crypt_config + .ok_or_else(|| format_err!("cannot sign without crypt config"))? + .as_ref(), + true, + )?, + }; let raw_data = blob.into_inner(); @@ -1119,151 +1391,154 @@ fn upload_log( let body = hyper::Body::from(raw_data); - async_main(async move { - client.upload("application/octet-stream", body, &path, Some(args)).await - }) + client.upload("application/octet-stream", body, &path, Some(args)).await } -fn display_task_log( - client: HttpClient, - upid_str: &str, -) -> Result<(), Error> { - println!("TESTLOG {}", upid_str); - - let path = format!("api2/json/nodes/localhost/tasks/{}/log", upid_str); - - let mut start = 1; - let limit = 500; - - loop { - let param = json!({ "start": start, "limit": limit, "test-status": true }); - let result = async_main(async { client.get(&path, Some(param)).await })?; - - let active = result["active"].as_bool().unwrap(); - let total = result["total"].as_u64().unwrap(); - let data = result["data"].as_array().unwrap(); - - let lines = data.len(); - - for item in data { - let n = item["n"].as_u64().unwrap(); - let t = item["t"].as_str().unwrap(); - if n != start { bail!("got wrong line number in response data ({} != {}", n, start); } - start += 1; - println!("{}", t); - } - - if start > total { - if active { - std::thread::sleep(std::time::Duration::from_millis(1000)); - } else { - break; - } - } else { - if lines != limit { bail!("got wrong number of lines from server ({} != {})", lines, limit); } - } - } - - Ok(()) -} - -fn prune( - mut param: Value, +const API_METHOD_PRUNE: ApiMethod = ApiMethod::new( + &ApiHandler::Async(&prune), + &ObjectSchema::new( + "Prune a backup repository.", + &proxmox_backup::add_common_prune_prameters!([ + ("dry-run", true, &BooleanSchema::new( + "Just show what prune would do, but do not delete anything.") + .schema()), + ("group", false, &StringSchema::new("Backup group.").schema()), + ], [ + ("output-format", true, &OUTPUT_FORMAT), + ( + "quiet", + true, + &BooleanSchema::new("Minimal output - only show removals.") + .schema() + ), + ("repository", true, &REPO_URL_SCHEMA), + ]) + ) +); + +fn prune<'a>( + param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + _rpcenv: &'a mut dyn RpcEnvironment, +) -> proxmox::api::ApiFuture<'a> { + 
async move { + prune_async(param).await + }.boxed() +} +async fn prune_async(mut param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; - let mut client = HttpClient::new(repo.host(), repo.user(), None)?; + let mut client = connect(repo.host(), repo.user())?; let path = format!("api2/json/admin/datastore/{}/prune", repo.store()); let group = tools::required_string_param(¶m, "group")?; - let group = BackupGroup::parse(group)?; - let output_format = param["output-format"].as_str().unwrap_or("text").to_owned(); + let group: BackupGroup = group.parse()?; + + let output_format = get_output_format(¶m); - let dry_run = param["dry-run"].as_bool().unwrap_or(false); + let quiet = param["quiet"].as_bool().unwrap_or(false); param.as_object_mut().unwrap().remove("repository"); param.as_object_mut().unwrap().remove("group"); - param.as_object_mut().unwrap().remove("dry-run"); param.as_object_mut().unwrap().remove("output-format"); + param.as_object_mut().unwrap().remove("quiet"); param["backup-type"] = group.backup_type().into(); param["backup-id"] = group.backup_id().into(); - if dry_run { - let result = async_main(async { client.get(&path, Some(param)).await })?; - let data = &result["data"]; - - if output_format == "text" { - for item in data.as_array().unwrap() { - let timestamp = item["backup-time"].as_i64().unwrap(); - let timestamp = BackupDir::backup_time_to_string(Utc.timestamp(timestamp, 0)); - let keep = item["keep"].as_bool().unwrap(); - println!("{}/{}/{} {}", - group.backup_type(), - group.backup_id(), - timestamp, - if keep { "keep" } else { "remove" }, - ); - } - } else { - format_and_print_result(&data, &output_format); - } - } else { - let result = async_main(async { client.post(&path, Some(param)).await })?; - let data = &result["data"]; - if output_format == "text" { - if let Some(upid) = data.as_str() { - println!("UPID {:?}", data); - display_task_log(client, upid)?; - } - } else { - format_and_print_result(&data, &output_format); - } - } + let mut result = client.post(&path, Some(param)).await?; + record_repository(&repo); + let render_snapshot_path = |_v: &Value, record: &Value| -> Result { + let item: PruneListItem = serde_json::from_value(record.to_owned())?; + let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time); + Ok(snapshot.relative_path().to_str().unwrap().to_owned()) + }; + + let render_prune_action = |v: &Value, _record: &Value| -> Result { + Ok(match v.as_bool() { + Some(true) => "keep", + Some(false) => "remove", + None => "unknown", + }.to_string()) + }; + + let options = default_table_format_options() + .sortby("backup-type", false) + .sortby("backup-id", false) + .sortby("backup-time", false) + .column(ColumnConfig::new("backup-id").renderer(render_snapshot_path).header("snapshot")) + .column(ColumnConfig::new("backup-time").renderer(tools::format::render_epoch).header("date")) + .column(ColumnConfig::new("keep").renderer(render_prune_action).header("action")) + ; + + let info = &proxmox_backup::api2::admin::datastore::API_RETURN_SCHEMA_PRUNE; + + let mut data = result["data"].take(); + + if quiet { + let list: Vec = data.as_array().unwrap().iter().filter(|item| { + item["keep"].as_bool() == Some(false) + }).map(|v| v.clone()).collect(); + data = list.into(); + } + + format_and_print_result_full(&mut data, info, &output_format, &options); + Ok(Value::Null) } -fn status( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { +#[api( + input: { + properties: { + repository: { + schema: 
REPO_URL_SCHEMA, + optional: true, + }, + "output-format": { + schema: OUTPUT_FORMAT, + optional: true, + }, + } + } +)] +/// Get repository status. +async fn status(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; - let output_format = param["output-format"].as_str().unwrap_or("text").to_owned(); + let output_format = get_output_format(¶m); - let client = HttpClient::new(repo.host(), repo.user(), None)?; + let client = connect(repo.host(), repo.user())?; let path = format!("api2/json/admin/datastore/{}/status", repo.store()); - let result = async_main(async move { client.get(&path, None).await })?; - let data = &result["data"]; + let mut result = client.get(&path, None).await?; + let mut data = result["data"].take(); record_repository(&repo); - if output_format == "text" { - let total = data["total"].as_u64().unwrap(); - let used = data["used"].as_u64().unwrap(); - let avail = data["avail"].as_u64().unwrap(); + let render_total_percentage = |v: &Value, record: &Value| -> Result { + let v = v.as_u64().unwrap(); + let total = record["total"].as_u64().unwrap(); let roundup = total/200; + let per = ((v+roundup)*100)/total; + let info = format!(" ({} %)", per); + Ok(format!("{} {:>8}", v, info)) + }; - println!( - "total: {} used: {} ({} %) available: {}", - total, - used, - ((used+roundup)*100)/total, - avail, - ); - } else { - format_and_print_result(data, &output_format); - } + let options = default_table_format_options() + .noheader(true) + .column(ColumnConfig::new("total").renderer(render_total_percentage)) + .column(ColumnConfig::new("used").renderer(render_total_percentage)) + .column(ColumnConfig::new("avail").renderer(render_total_percentage)); + + let schema = &proxmox_backup::api2::admin::datastore::API_RETURN_SCHEMA_STATUS; + + format_and_print_result_full(&mut data, schema, &output_format, &options); Ok(Value::Null) } @@ -1271,7 +1546,18 @@ fn status( // like get, but simply ignore errors and return Null instead async fn try_get(repo: &BackupRepository, url: &str) -> Value { - let client = match HttpClient::new(repo.host(), repo.user(), None) { + let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok(); + let password = std::env::var(ENV_VAR_PBS_PASSWORD).ok(); + + let options = HttpClientOptions::new() + .prefix(Some("proxmox-backup".to_string())) + .password(password) + .interactive(false) + .fingerprint(fingerprint) + .fingerprint_cache(true) + .ticket_cache(true); + + let client = match HttpClient::new(repo.host(), repo.user(), options) { Ok(v) => v, _ => return Value::Null, }; @@ -1290,7 +1576,7 @@ async fn try_get(repo: &BackupRepository, url: &str) -> Value { } fn complete_backup_group(_arg: &str, param: &HashMap) -> Vec { - async_main(async { complete_backup_group_do(param).await }) + proxmox_backup::tools::runtime::main(async { complete_backup_group_do(param).await }) } async fn complete_backup_group_do(param: &HashMap) -> Vec { @@ -1319,8 +1605,8 @@ async fn complete_backup_group_do(param: &HashMap) -> Vec) -> Vec { - async_main(async { complete_group_or_snapshot_do(arg, param).await }) +pub fn complete_group_or_snapshot(arg: &str, param: &HashMap) -> Vec { + proxmox_backup::tools::runtime::main(async { complete_group_or_snapshot_do(arg, param).await }) } async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap) -> Vec { @@ -1339,7 +1625,7 @@ async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap) -> Vec { - async_main(async { complete_backup_snapshot_do(param).await }) + proxmox_backup::tools::runtime::main(async { 
complete_backup_snapshot_do(param).await }) } async fn complete_backup_snapshot_do(param: &HashMap) -> Vec { @@ -1370,7 +1656,7 @@ async fn complete_backup_snapshot_do(param: &HashMap) -> Vec) -> Vec { - async_main(async { complete_server_file_name_do(param).await }) + proxmox_backup::tools::runtime::main(async { complete_server_file_name_do(param).await }) } async fn complete_server_file_name_do(param: &HashMap) -> Vec { @@ -1382,9 +1668,9 @@ async fn complete_server_file_name_do(param: &HashMap) -> Vec return result, }; - let snapshot = match param.get("snapshot") { + let snapshot: BackupDir = match param.get("snapshot") { Some(path) => { - match BackupDir::parse(path) { + match path.parse() { Ok(v) => v, _ => return result, } @@ -1416,7 +1702,21 @@ async fn complete_server_file_name_do(param: &HashMap) -> Vec) -> Vec { complete_server_file_name(arg, param) .iter() - .map(|v| strip_server_file_expenstion(&v)) + .map(|v| tools::format::strip_server_file_expenstion(&v)) + .collect() +} + +pub fn complete_pxar_archive_name(arg: &str, param: &HashMap) -> Vec { + complete_server_file_name(arg, param) + .iter() + .filter_map(|v| { + let name = tools::format::strip_server_file_expenstion(&v); + if name.ends_with(".pxar") { + Some(name) + } else { + None + } + }) .collect() } @@ -1434,69 +1734,6 @@ fn complete_chunk_size(_arg: &str, _param: &HashMap) -> Vec Result, Error> { - - // fixme: implement other input methods - - use std::env::VarError::*; - match std::env::var("PBS_ENCRYPTION_PASSWORD") { - Ok(p) => return Ok(p.as_bytes().to_vec()), - Err(NotUnicode(_)) => bail!("PBS_ENCRYPTION_PASSWORD contains bad characters"), - Err(NotPresent) => { - // Try another method - } - } - - // If we're on a TTY, query the user for a password - if crate::tools::tty::stdin_isatty() { - return Ok(crate::tools::tty::read_password("Encryption Key Password: ")?); - } - - bail!("no password input mechanism available"); -} - -fn key_create( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { - - let path = tools::required_string_param(¶m, "path")?; - let path = PathBuf::from(path); - - let kdf = param["kdf"].as_str().unwrap_or("scrypt"); - - let key = proxmox::sys::linux::random_data(32)?; - - if kdf == "scrypt" { - // always read passphrase from tty - if !crate::tools::tty::stdin_isatty() { - bail!("unable to read passphrase - no tty"); - } - - let password = crate::tools::tty::read_password("Encryption Key Password: ")?; - - let key_config = encrypt_key_with_passphrase(&key, &password)?; - - store_key_config(&path, false, key_config)?; - - Ok(Value::Null) - } else if kdf == "none" { - let created = Local.timestamp(Local::now().timestamp(), 0); - - store_key_config(&path, false, KeyConfig { - kdf: None, - created, - modified: created, - data: key, - })?; - - Ok(Value::Null) - } else { - unreachable!(); - } -} - fn master_pubkey_path() -> Result { let base = BaseDirectories::with_prefix("proxmox-backup")?; @@ -1506,518 +1743,50 @@ fn master_pubkey_path() -> Result { Ok(path) } -fn key_import_master_pubkey( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { - - let path = tools::required_string_param(¶m, "path")?; - let path = PathBuf::from(path); - - let pem_data = file_get_contents(&path)?; - - if let Err(err) = openssl::pkey::PKey::public_key_from_pem(&pem_data) { - bail!("Unable to decode PEM data - {}", err); - } - - let target_path = master_pubkey_path()?; - - file_set_contents(&target_path, &pem_data, None)?; - - println!("Imported 
public master key to {:?}", target_path); - - Ok(Value::Null) -} - -fn key_create_master_key( - _param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { - - // we need a TTY to query the new password - if !crate::tools::tty::stdin_isatty() { - bail!("unable to create master key - no tty"); - } - - let rsa = openssl::rsa::Rsa::generate(4096)?; - let pkey = openssl::pkey::PKey::from_rsa(rsa)?; - - let new_pw = String::from_utf8(crate::tools::tty::read_password("Master Key Password: ")?)?; - let verify_pw = String::from_utf8(crate::tools::tty::read_password("Verify Password: ")?)?; - - if new_pw != verify_pw { - bail!("Password verification fail!"); - } - - if new_pw.len() < 5 { - bail!("Password is too short!"); - } - - let pub_key: Vec = pkey.public_key_to_pem()?; - let filename_pub = "master-public.pem"; - println!("Writing public master key to {}", filename_pub); - file_set_contents(filename_pub, pub_key.as_slice(), None)?; - - let cipher = openssl::symm::Cipher::aes_256_cbc(); - let priv_key: Vec = pkey.private_key_to_pem_pkcs8_passphrase(cipher, new_pw.as_bytes())?; - - let filename_priv = "master-private.pem"; - println!("Writing private master key to {}", filename_priv); - file_set_contents(filename_priv, priv_key.as_slice(), None)?; - - Ok(Value::Null) +use proxmox_backup::client::RemoteChunkReader; +/// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better +/// async use! +/// +/// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture`, +/// so that we can properly access it from multiple threads simultaneously while not issuing +/// duplicate simultaneous reads over http. +pub struct BufferedDynamicReadAt { + inner: Mutex>, } -fn key_change_passphrase( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { - - let path = tools::required_string_param(¶m, "path")?; - let path = PathBuf::from(path); - - let kdf = param["kdf"].as_str().unwrap_or("scrypt"); - - // we need a TTY to query the new password - if !crate::tools::tty::stdin_isatty() { - bail!("unable to change passphrase - no tty"); - } - - let (key, created) = load_and_decrtypt_key(&path, &get_encryption_key_password)?; - - if kdf == "scrypt" { - - let new_pw = String::from_utf8(crate::tools::tty::read_password("New Password: ")?)?; - let verify_pw = String::from_utf8(crate::tools::tty::read_password("Verify Password: ")?)?; - - if new_pw != verify_pw { - bail!("Password verification fail!"); - } - - if new_pw.len() < 5 { - bail!("Password is too short!"); +impl BufferedDynamicReadAt { + fn new(inner: BufferedDynamicReader) -> Self { + Self { + inner: Mutex::new(inner), } - - let mut new_key_config = encrypt_key_with_passphrase(&key, new_pw.as_bytes())?; - new_key_config.created = created; // keep original value - - store_key_config(&path, true, new_key_config)?; - - Ok(Value::Null) - } else if kdf == "none" { - let modified = Local.timestamp(Local::now().timestamp(), 0); - - store_key_config(&path, true, KeyConfig { - kdf: None, - created, // keep original value - modified, - data: key.to_vec(), - })?; - - Ok(Value::Null) - } else { - unreachable!(); } } -fn key_mgmt_cli() -> CliCommandMap { - - const KDF_SCHEMA: Schema = - StringSchema::new("Key derivation function. 
Choose 'none' to store the key unecrypted.") - .format(&ApiStringFormat::Enum(&["scrypt", "none"])) - .default("scrypt") - .schema(); - - #[sortable] - const API_METHOD_KEY_CREATE: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&key_create), - &ObjectSchema::new( - "Create a new encryption key.", - &sorted!([ - ("path", false, &StringSchema::new("File system path.").schema()), - ("kdf", true, &KDF_SCHEMA), - ]), - ) - ); - - let key_create_cmd_def = CliCommand::new(&API_METHOD_KEY_CREATE) - .arg_param(&["path"]) - .completion_cb("path", tools::complete_file_name); - - #[sortable] - const API_METHOD_KEY_CHANGE_PASSPHRASE: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&key_change_passphrase), - &ObjectSchema::new( - "Change the passphrase required to decrypt the key.", - &sorted!([ - ("path", false, &StringSchema::new("File system path.").schema()), - ("kdf", true, &KDF_SCHEMA), - ]), - ) - ); - - let key_change_passphrase_cmd_def = CliCommand::new(&API_METHOD_KEY_CHANGE_PASSPHRASE) - .arg_param(&["path"]) - .completion_cb("path", tools::complete_file_name); - - const API_METHOD_KEY_CREATE_MASTER_KEY: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&key_create_master_key), - &ObjectSchema::new("Create a new 4096 bit RSA master pub/priv key pair.", &[]) - ); - - let key_create_master_key_cmd_def = CliCommand::new(&API_METHOD_KEY_CREATE_MASTER_KEY); - - #[sortable] - const API_METHOD_KEY_IMPORT_MASTER_PUBKEY: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&key_import_master_pubkey), - &ObjectSchema::new( - "Import a new RSA public key and use it as master key. The key is expected to be in '.pem' format.", - &sorted!([ ("path", false, &StringSchema::new("File system path.").schema()) ]), - ) - ); - - let key_import_master_pubkey_cmd_def = CliCommand::new(&API_METHOD_KEY_IMPORT_MASTER_PUBKEY) - .arg_param(&["path"]) - .completion_cb("path", tools::complete_file_name); - - CliCommandMap::new() - .insert("create".to_owned(), key_create_cmd_def.into()) - .insert("create-master-key".to_owned(), key_create_master_key_cmd_def.into()) - .insert("import-master-pubkey".to_owned(), key_import_master_pubkey_cmd_def.into()) - .insert("change-passphrase".to_owned(), key_change_passphrase_cmd_def.into()) -} - -fn mount( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { - let verbose = param["verbose"].as_bool().unwrap_or(false); - if verbose { - // This will stay in foreground with debug output enabled as None is - // passed for the RawFd. - return async_main(mount_do(param, None)); +impl ReadAt for BufferedDynamicReadAt { + fn start_read_at<'a>( + self: Pin<&'a Self>, + _cx: &mut Context, + buf: &'a mut [u8], + offset: u64, + ) -> MaybeReady, ReadAtOperation<'a>> { + use std::io::Read; + MaybeReady::Ready(tokio::task::block_in_place(move || { + let mut reader = self.inner.lock().unwrap(); + reader.seek(SeekFrom::Start(offset))?; + Ok(reader.read(buf)?) + })) } - // Process should be deamonized. - // Make sure to fork before the async runtime is instantiated to avoid troubles. - let pipe = pipe()?; - match fork() { - Ok(ForkResult::Parent { .. 
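// Daemonization handshake in the mount command shown here: the process forks
// before any tokio runtime exists; the parent blocks on one end of a pipe
// until the child reports that the FUSE session is mounted, then returns,
// while the child calls setsid() and keeps running in the background.
// Roughly:
//
//     parent: close(pipe.1); read(pipe.0, ...)   // wait for the child's ready byte
//     child:  close(pipe.0); setsid(); mount ...; write(pipe.1, &[0u8])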
}) => { - nix::unistd::close(pipe.1).unwrap(); - // Blocks the parent process until we are ready to go in the child - let _res = nix::unistd::read(pipe.0, &mut [0]).unwrap(); - Ok(Value::Null) - } - Ok(ForkResult::Child) => { - nix::unistd::close(pipe.0).unwrap(); - nix::unistd::setsid().unwrap(); - async_main(mount_do(param, Some(pipe.1))) - } - Err(_) => bail!("failed to daemonize process"), + fn poll_complete<'a>( + self: Pin<&'a Self>, + _op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>> { + panic!("LocalDynamicReadAt::start_read_at returned Pending"); } } -async fn mount_do(param: Value, pipe: Option) -> Result { - let repo = extract_repository_from_value(¶m)?; - let archive_name = tools::required_string_param(¶m, "archive-name")?; - let target = tools::required_string_param(¶m, "target")?; - let client = HttpClient::new(repo.host(), repo.user(), None)?; - - record_repository(&repo); - - let path = tools::required_string_param(¶m, "snapshot")?; - let (backup_type, backup_id, backup_time) = if path.matches('/').count() == 1 { - let group = BackupGroup::parse(path)?; - - let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store()); - let result = client.get(&path, Some(json!({ - "backup-type": group.backup_type(), - "backup-id": group.backup_id(), - }))).await?; - - let list = result["data"].as_array().unwrap(); - if list.is_empty() { - bail!("backup group '{}' does not contain any snapshots:", path); - } - - let epoch = list[0]["backup-time"].as_i64().unwrap(); - let backup_time = Utc.timestamp(epoch, 0); - (group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time) - } else { - let snapshot = BackupDir::parse(path)?; - (snapshot.group().backup_type().to_owned(), snapshot.group().backup_id().to_owned(), snapshot.backup_time()) - }; - - let keyfile = param["keyfile"].as_str().map(PathBuf::from); - let crypt_config = match keyfile { - None => None, - Some(path) => { - let (key, _) = load_and_decrtypt_key(&path, &get_encryption_key_password)?; - Some(Arc::new(CryptConfig::new(key)?)) - } - }; - - let server_archive_name = if archive_name.ends_with(".pxar") { - format!("{}.didx", archive_name) - } else { - bail!("Can only mount pxar archives."); - }; - - let client = BackupReader::start( - client, - crypt_config.clone(), - repo.store(), - &backup_type, - &backup_id, - backup_time, - true, - ).await?; - - let manifest = client.download_manifest().await?; - - if server_archive_name.ends_with(".didx") { - let index = client.download_dynamic_index(&manifest, &server_archive_name).await?; - let most_used = index.find_most_used_chunks(8); - let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used); - let reader = BufferedDynamicReader::new(index, chunk_reader); - let decoder = pxar::Decoder::new(reader)?; - let options = OsStr::new("ro,default_permissions"); - let mut session = pxar::fuse::Session::new(decoder, &options, pipe.is_none()) - .map_err(|err| format_err!("pxar mount failed: {}", err))?; - - // Mount the session but not call fuse deamonize as this will cause - // issues with the runtime after the fork - let deamonize = false; - session.mount(&Path::new(target), deamonize)?; - - if let Some(pipe) = pipe { - nix::unistd::chdir(Path::new("/")).unwrap(); - // Finish creation of deamon by redirecting filedescriptors. 
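// The block below finishes detaching the child: it opens /dev/null, dup2()s it
// onto file descriptors 0, 1 and 2 so the daemon no longer touches the
// original terminal, closes the temporary descriptor if it landed above 2,
// and finally writes one byte to the pipe so the waiting parent can exit.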
- let nullfd = nix::fcntl::open( - "/dev/null", - nix::fcntl::OFlag::O_RDWR, - nix::sys::stat::Mode::empty(), - ).unwrap(); - nix::unistd::dup2(nullfd, 0).unwrap(); - nix::unistd::dup2(nullfd, 1).unwrap(); - nix::unistd::dup2(nullfd, 2).unwrap(); - if nullfd > 2 { - nix::unistd::close(nullfd).unwrap(); - } - // Signal the parent process that we are done with the setup and it can - // terminate. - nix::unistd::write(pipe, &[0u8])?; - nix::unistd::close(pipe).unwrap(); - } - - let multithreaded = true; - session.run_loop(multithreaded)?; - } else { - bail!("unknown archive file extension (expected .pxar)"); - } - - Ok(Value::Null) -} - -fn shell( - param: Value, - _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { - async_main(catalog_shell(param)) -} - -async fn catalog_shell(param: Value) -> Result { - let repo = extract_repository_from_value(¶m)?; - let client = HttpClient::new(repo.host(), repo.user(), None)?; - let path = tools::required_string_param(¶m, "snapshot")?; - let archive_name = tools::required_string_param(¶m, "archive-name")?; - - let (backup_type, backup_id, backup_time) = if path.matches('/').count() == 1 { - let group = BackupGroup::parse(path)?; - - let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store()); - let result = client.get(&path, Some(json!({ - "backup-type": group.backup_type(), - "backup-id": group.backup_id(), - }))).await?; - - let list = result["data"].as_array().unwrap(); - if list.is_empty() { - bail!("backup group '{}' does not contain any snapshots:", path); - } - - let epoch = list[0]["backup-time"].as_i64().unwrap(); - let backup_time = Utc.timestamp(epoch, 0); - (group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time) - } else { - let snapshot = BackupDir::parse(path)?; - (snapshot.group().backup_type().to_owned(), snapshot.group().backup_id().to_owned(), snapshot.backup_time()) - }; - - let keyfile = param["keyfile"].as_str().map(|p| PathBuf::from(p)); - let crypt_config = match keyfile { - None => None, - Some(path) => { - let (key, _) = load_and_decrtypt_key(&path, &get_encryption_key_password)?; - Some(Arc::new(CryptConfig::new(key)?)) - } - }; - - let server_archive_name = if archive_name.ends_with(".pxar") { - format!("{}.didx", archive_name) - } else { - bail!("Can only mount pxar archives."); - }; - - let client = BackupReader::start( - client, - crypt_config.clone(), - repo.store(), - &backup_type, - &backup_id, - backup_time, - true, - ).await?; - - let tmpfile = std::fs::OpenOptions::new() - .write(true) - .read(true) - .custom_flags(libc::O_TMPFILE) - .open("/tmp")?; - - let manifest = client.download_manifest().await?; - - let index = client.download_dynamic_index(&manifest, &server_archive_name).await?; - let most_used = index.find_most_used_chunks(8); - let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config.clone(), most_used); - let reader = BufferedDynamicReader::new(index, chunk_reader); - let mut decoder = pxar::Decoder::new(reader)?; - decoder.set_callback(|path| { - println!("{:?}", path); - Ok(()) - }); - - let tmpfile = client.download(CATALOG_NAME, tmpfile).await?; - let index = DynamicIndexReader::new(tmpfile) - .map_err(|err| format_err!("unable to read catalog index - {}", err))?; - - // Note: do not use values stored in index (not trusted) - instead, computed them again - let (csum, size) = index.compute_csum(); - manifest.verify_file(CATALOG_NAME, &csum, size)?; - - let most_used = index.find_most_used_chunks(8); - let chunk_reader = 
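// Same reader chain as for the archive itself: the catalog's dynamic index is
// wrapped in a RemoteChunkReader (seeded with the eight most used chunks) and
// a BufferedDynamicReader, which yields a seekable reader over the remote
// data; the catalog is then copied into an O_TMPFILE-backed temporary file
// and handed to CatalogReader for the interactive shell.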
RemoteChunkReader::new(client.clone(), crypt_config, most_used); - let mut reader = BufferedDynamicReader::new(index, chunk_reader); - let mut catalogfile = std::fs::OpenOptions::new() - .write(true) - .read(true) - .custom_flags(libc::O_TMPFILE) - .open("/tmp")?; - - std::io::copy(&mut reader, &mut catalogfile) - .map_err(|err| format_err!("unable to download catalog - {}", err))?; - - catalogfile.seek(SeekFrom::Start(0))?; - let catalog_reader = CatalogReader::new(catalogfile); - let state = Shell::new( - catalog_reader, - &server_archive_name, - decoder, - )?; - - println!("Starting interactive shell"); - state.shell()?; - - record_repository(&repo); - - Ok(Value::Null) -} - fn main() { - const BACKUP_SOURCE_SCHEMA: Schema = StringSchema::new("Backup source specification ([