use proxmox_backup::backup::*;
//use proxmox_backup::backup::image_index::*;
//use proxmox_backup::config::datastore;
-//use proxmox_backup::catar::encoder::*;
+//use proxmox_backup::pxar::encoder::*;
//use proxmox_backup::backup::datastore::*;
use serde_json::{json, Value};
use xdg::BaseDirectories;
use lazy_static::lazy_static;
+use futures::*;
+use tokio::sync::mpsc;
lazy_static! {
- static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:catar|raw)):(.+)$").unwrap();
+ static ref BACKUPSPEC_REGEX: Regex = Regex::new(r"^([a-zA-Z0-9_-]+\.(?:pxar|img)):(.+)$").unwrap();
}
_ => return,
};
- let mut data = tools::file_get_json(&path).unwrap_or(json!({}));
+ let mut data = tools::file_get_json(&path, None).unwrap_or(json!({}));
let repo = repo.to_string();
_ => return result,
};
- let data = tools::file_get_json(&path).unwrap_or(json!({}));
+ let data = tools::file_get_json(&path, None).unwrap_or(json!({}));
if let Some(map) = data.as_object() {
for (repo, _count) in map {
}
fn backup_directory<P: AsRef<Path>>(
- client: &mut HttpClient,
- repo: &BackupRepository,
+ client: &BackupClient,
dir_path: P,
archive_name: &str,
- backup_id: &str,
- backup_time: DateTime<Local>,
chunk_size: Option<u64>,
all_file_systems: bool,
verbose: bool,
) -> Result<(), Error> {
- let mut param = json!({
- "archive-name": archive_name,
- "backup-type": "host",
- "backup-id": backup_id,
- "backup-time": backup_time.timestamp(),
- });
+ if let Some(_size) = chunk_size {
+ unimplemented!();
+ }
+
+ let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?;
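+ // ChunkStream cuts the pxar byte stream into variable-sized chunks,
+ // matching the "dynamic" index type passed to upload_stream() below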
+ let chunk_stream = ChunkStream::new(pxar_stream);
+
+ let (tx, rx) = mpsc::channel(10); // buffer up to 10 chunks between chunker and upload
+
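+ // the channel transports the chunker's Result items; and_then(|x| x)
+ // flattens them, so errors from the chunker task surface as stream errors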
+ let stream = rx
+ .map_err(Error::from)
+ .and_then(|x| x); // flatten
- if let Some(size) = chunk_size {
- param["chunk-size"] = size.into();
+ // spawn the chunker inside a separate task so that it can run in parallel
+ tokio::spawn(
+ tx.send_all(chunk_stream.then(|r| Ok(r)))
+ .map_err(|_| ()).map(|_| ())
+ );
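+ // when chunk_stream is exhausted, send_all flushes and closes the sink,
+ // so the receiver side sees a clean end-of-stream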
+
+ client.upload_stream(archive_name, stream, "dynamic", None).wait()?;
+
+ Ok(())
+}
+
+fn backup_image<P: AsRef<Path>>(
+ client: &BackupClient,
+ image_path: P,
+ archive_name: &str,
+ image_size: u64,
+ chunk_size: Option<u64>,
+ verbose: bool,
+) -> Result<(), Error> {
+
+ if let Some(_size) = chunk_size {
+ unimplemented!();
}
- let query = tools::json_object_to_query(param)?;
+ let path = image_path.as_ref().to_owned();
- let path = format!("api2/json/admin/datastore/{}/catar?{}", repo.store(), query);
+ let file = tokio::fs::File::open(path).wait()?;
- let stream = CaTarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?;
+ let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
+ .map_err(Error::from);
- let body = Body::wrap_stream(stream);
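+ // images are split at fixed 4 MiB boundaries - chunk offsets stay stable,
+ // matching the "fixed" index type passed to upload_stream() below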
+ let stream = FixedChunkStream::new(stream, 4*1024*1024);
- client.upload("application/x-proxmox-backup-catar", body, &path)?;
+ client.upload_stream(archive_name, stream, "fixed", Some(image_size)).wait()?;
Ok(())
}
result
}
+/* not used:
fn list_backups(
param: Value,
_info: &ApiMethod,
let repo_url = tools::required_string_param(&param, "repository")?;
let repo: BackupRepository = repo_url.parse()?;
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let mut client = HttpClient::new(repo.host(), repo.user())?;
let path = format!("api2/json/admin/datastore/{}/backups", repo.store());
- let result = client.get(&path)?;
+ let result = client.get(&path, None)?;
record_repository(&repo);
//Ok(result)
Ok(Value::Null)
}
+ */
fn list_backup_groups(
param: Value,
let repo_url = tools::required_string_param(&param, "repository")?;
let repo: BackupRepository = repo_url.parse()?;
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let client = HttpClient::new(repo.host(), repo.user())?;
let path = format!("api2/json/admin/datastore/{}/groups", repo.store());
- let mut result = client.get(&path)?;
+ let mut result = client.get(&path, None).wait()?;
record_repository(&repo);
let path = tools::required_string_param(&param, "group")?;
let group = BackupGroup::parse(path)?;
- let query = tools::json_object_to_query(json!({
- "backup-type": group.backup_type(),
- "backup-id": group.backup_id(),
- }))?;
-
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let client = HttpClient::new(repo.host(), repo.user())?;
- let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query);
+ let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store());
- // fixme: params
- let result = client.get(&path)?;
+ let result = client.get(&path, Some(json!({
+ "backup-type": group.backup_type(),
+ "backup-id": group.backup_id(),
+ }))).wait()?;
record_repository(&repo);
let path = tools::required_string_param(&param, "snapshot")?;
let snapshot = BackupDir::parse(path)?;
- let query = tools::json_object_to_query(json!({
+ let mut client = HttpClient::new(repo.host(), repo.user())?;
+
+ let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store());
+
+ let result = client.delete(&path, Some(json!({
"backup-type": snapshot.group().backup_type(),
"backup-id": snapshot.group().backup_id(),
"backup-time": snapshot.backup_time().timestamp(),
- }))?;
-
- let mut client = HttpClient::new(repo.host(), repo.user());
-
- let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), query);
-
- let result = client.delete(&path)?;
+ }))).wait()?;
record_repository(&repo);
let repo_url = tools::required_string_param(&param, "repository")?;
let repo: BackupRepository = repo_url.parse()?;
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let mut client = HttpClient::new(repo.host(), repo.user())?;
let path = format!("api2/json/admin/datastore/{}/gc", repo.store());
- let result = client.post(&path)?;
+ let result = client.post(&path, None).wait()?;
record_repository(&repo);
let mut upload_list = vec![];
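+ // directories become pxar archives (dynamically chunked), regular files
+ // and block devices become images (fixed-size chunks)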
+ enum BackupType { PXAR, IMAGE };
+
for backupspec in backupspec_list {
let (target, filename) = parse_backupspec(backupspec.as_str().unwrap())?;
- let stat = match nix::sys::stat::stat(filename) {
- Ok(s) => s,
+ use std::os::unix::fs::FileTypeExt;
+
+ let metadata = match std::fs::metadata(filename) {
+ Ok(m) => m,
Err(err) => bail!("unable to access '{}' - {}", filename, err),
};
+ let file_type = metadata.file_type();
- if (stat.st_mode & libc::S_IFDIR) != 0 {
+ if file_type.is_dir() {
- upload_list.push((filename.to_owned(), target.to_owned()));
+ upload_list.push((BackupType::PXAR, filename.to_owned(), target.to_owned(), 0));
- } else if (stat.st_mode & (libc::S_IFREG|libc::S_IFBLK)) != 0 {
- if stat.st_size <= 0 { bail!("got strange file size '{}'", stat.st_size); }
- let _size = stat.st_size as usize;
+ } else if file_type.is_file() || file_type.is_block_device() {
- panic!("implement me");
+ let size = tools::image_size(&PathBuf::from(filename))?;
- //backup_image(&datastore, &file, size, &target, chunk_size)?;
+ if size == 0 { bail!("got zero-sized file '{}'", filename); }
- // let idx = datastore.open_image_reader(target)?;
- // idx.print_info();
+ upload_list.push((BackupType::IMAGE, filename.to_owned(), target.to_owned(), size));
} else {
bail!("unsupported file type (expected a directory, file or block device)");
let backup_time = Local.timestamp(Local::now().timestamp(), 0);
- let mut client = HttpClient::new(repo.host(), repo.user());
-
- client.login()?; // login before starting backup
-
+ let client = HttpClient::new(repo.host(), repo.user())?;
record_repository(&repo);
println!("Starting backup");
println!("Client name: {}", tools::nodename());
println!("Start Time: {}", backup_time.to_rfc3339());
- for (filename, target) in upload_list {
- println!("Upload '{}' to '{:?}' as {}", filename, repo, target);
- backup_directory(&mut client, &repo, &filename, &target, backup_id, backup_time,
- chunk_size_opt, all_file_systems, verbose)?;
+ let client = client.start_backup(repo.store(), "host", &backup_id, verbose).wait()?;
+
+ for (backup_type, filename, target, size) in upload_list {
+ match backup_type {
+ BackupType::PXAR => {
+ println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
+ backup_directory(&client, &filename, &target, chunk_size_opt, all_file_systems, verbose)?;
+ }
+ BackupType::IMAGE => {
+ println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
+ backup_image(&client, &filename, &target, size, chunk_size_opt, verbose)?;
+ }
+ }
}
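+ // mark the backup as successfully finished on the server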
+ client.finish().wait()?;
+
let end_time = Local.timestamp(Local::now().timestamp(), 0);
let elapsed = end_time.signed_duration_since(backup_time);
println!("Duration: {}", elapsed);
let data: Vec<&str> = arg.splitn(2, ':').collect();
if data.len() != 2 {
- result.push(String::from("root.catar:/"));
- result.push(String::from("etc.catar:/etc"));
+ result.push(String::from("root.pxar:/"));
+ result.push(String::from("etc.pxar:/etc"));
return result;
}
let archive_name = tools::required_string_param(&param, "archive-name")?;
- let mut client = HttpClient::new(repo.host(), repo.user());
-
- client.login()?; // login before starting
+ let mut client = HttpClient::new(repo.host(), repo.user())?;
record_repository(&repo);
if path.matches('/').count() == 1 {
let group = BackupGroup::parse(path)?;
- let subquery = tools::json_object_to_query(json!({
+ let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store());
+ let result = client.get(&path, Some(json!({
"backup-type": group.backup_type(),
"backup-id": group.backup_id(),
- }))?;
-
- let path = format!("api2/json/admin/datastore/{}/snapshots?{}", repo.store(), subquery);
- let result = client.get(&path)?;
+ }))).wait()?;
let list = result["data"].as_array().unwrap();
if list.len() == 0 {
let target = tools::required_string_param(&param, "target")?;
- if archive_name.ends_with(".catar") {
- let path = format!("api2/json/admin/datastore/{}/catar?{}", repo.store(), query);
+ if archive_name.ends_with(".pxar") {
+ let path = format!("api2/json/admin/datastore/{}/pxar?{}", repo.store(), query);
println!("DOWNLOAD FILE {} to {}", path, target);
let target = PathBuf::from(target);
- let writer = CaTarBackupWriter::new(&target, true)?;
- client.download(&path, Box::new(writer))?;
+ let writer = PxarDecodeWriter::new(&target, true)?;
+ client.download(&path, Box::new(writer)).wait()?;
} else {
bail!("unknown file extensions - unable to download '{}'", archive_name);
}
let repo_url = tools::required_string_param(&param, "repository")?;
let repo: BackupRepository = repo_url.parse()?;
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let mut client = HttpClient::new(repo.host(), repo.user())?;
let path = format!("api2/json/admin/datastore/{}/prune", repo.store());
param.as_object_mut().unwrap().remove("repository");
- let result = client.post_json(&path, param)?;
+ let result = client.post(&path, Some(param)).wait()?;
record_repository(&repo);
Ok(result)
}
+// like get, but simply ignore errors and return Null instead
fn try_get(repo: &BackupRepository, url: &str) -> Value {
- let mut client = HttpClient::new(repo.host(), repo.user());
+ let client = match HttpClient::new(repo.host(), repo.user()) {
+ Ok(v) => v,
+ _ => return Value::Null,
+ };
- let mut resp = match client.try_get(url) {
+ let mut resp = match client.get(url, None).wait() {
Ok(v) => v,
_ => return Value::Null,
};
result
}
+fn complete_archive_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
+
+ let mut result = vec![];
+
+ let repo = match extract_repo(param) {
+ Some(v) => v,
+ _ => return result,
+ };
+
+ let snapshot = match param.get("snapshot") {
+ Some(path) => {
+ match BackupDir::parse(path) {
+ Ok(v) => v,
+ _ => return result,
+ }
+ }
+ _ => return result,
+ };
+
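+ // ask the server for the list of files stored in the selected snapshot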
+ let query = tools::json_object_to_query(json!({
+ "backup-type": snapshot.group().backup_type(),
+ "backup-id": snapshot.group().backup_id(),
+ "backup-time": snapshot.backup_time().timestamp(),
+ })).unwrap();
+
+ let path = format!("api2/json/admin/datastore/{}/files?{}", repo.store(), query);
+
+ let data = try_get(&repo, &path);
+
+ if let Some(list) = data.as_array() {
+ for item in list {
+ if let Some(filename) = item.as_str() {
+ result.push(filename.to_owned());
+ }
+ }
+ }
+
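+ // the server reports the stored file names; strip the chunk index
+ // extensions so the completion offers the plain archive names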
+ strip_chunked_file_expenstions(result)
+}
+
fn complete_chunk_size(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
let mut result = vec![];
))
.arg_param(vec!["repository", "snapshot", "archive-name", "target"])
.completion_cb("repository", complete_repository)
- .completion_cb("snapshot", complete_group_or_snapshot);
+ .completion_cb("snapshot", complete_group_or_snapshot)
+ .completion_cb("archive-name", complete_archive_name)
+ .completion_cb("target", tools::complete_file_name);
let prune_cmd_def = CliCommand::new(
ApiMethod::new(
.insert("restore".to_owned(), restore_cmd_def.into())
.insert("snapshots".to_owned(), snapshots_cmd_def.into());
- run_cli_command(cmd_def.into());
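+ // run the CLI inside a hyper/tokio event loop, so that command handlers
+ // can use tokio::spawn() and block on futures with .wait()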
+ hyper::rt::run(futures::future::lazy(move || {
+ run_cli_command(cmd_def.into());
+ Ok(())
+ }));
}