use proxmox::{sortable, identity};
use proxmox::tools::fs::{file_get_contents, file_get_json, file_set_contents, image_size};
-use proxmox::api::{ApiHandler, ApiMethod, RpcEnvironment};
+use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment};
use proxmox::api::schema::*;
use proxmox::api::cli::*;
use proxmox::api::api;
}
}
-fn list_backup_groups(
+fn list_backup_groups<'a>(
param: Value,
_info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ list_backup_groups_async(param).await
+ }.boxed()
+}
+
+async fn list_backup_groups_async(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let path = format!("api2/json/admin/datastore/{}/groups", repo.store());
- let mut result = async_main(async move {
- client.get(&path, None).await
- })?;
+ let mut result = client.get(&path, None).await?;
record_repository(&repo);
Ok(Value::Null)
}
-fn list_snapshots(
+fn list_snapshots<'a>(
param: Value,
_info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ list_snapshots_async(param).await
+ }.boxed()
+}
+
+async fn list_snapshots_async(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
args["backup-id"] = group.backup_id().into();
}
- let result = async_main(async move {
- client.get(&path, Some(args)).await
- })?;
+ let result = client.get(&path, Some(args)).await?;
record_repository(&repo);
Ok(Value::Null)
}
-fn forget_snapshots(
+fn forget_snapshots<'a>(
param: Value,
_info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ forget_snapshots_async(param).await
+ }.boxed()
+}
+
+async fn forget_snapshots_async(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store());
- let result = async_main(async move {
- client.delete(&path, Some(json!({
- "backup-type": snapshot.group().backup_type(),
- "backup-id": snapshot.group().backup_id(),
- "backup-time": snapshot.backup_time().timestamp(),
- }))).await
- })?;
+ let result = client.delete(&path, Some(json!({
+ "backup-type": snapshot.group().backup_type(),
+ "backup-id": snapshot.group().backup_id(),
+ "backup-time": snapshot.backup_time().timestamp(),
+ }))).await?;
record_repository(&repo);
Ok(result)
}
-fn api_login(
+fn api_login<'a>(
param: Value,
_info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ api_login_async(param).await
+ }.boxed()
+}
+
+async fn api_login_async(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let client = HttpClient::new(repo.host(), repo.user(), None)?;
- async_main(async move { client.login().await })?;
+ client.login().await?;
record_repository(&repo);
Ok(Value::Null)
}
-fn dump_catalog(
+fn dump_catalog<'a>(
param: Value,
_info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ dump_catalog_async(param).await
+ }.boxed()
+}
+
+async fn dump_catalog_async(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let client = HttpClient::new(repo.host(), repo.user(), None)?;
- async_main(async move {
- let client = BackupReader::start(
- client,
- crypt_config.clone(),
- repo.store(),
- &snapshot.group().backup_type(),
- &snapshot.group().backup_id(),
- snapshot.backup_time(),
- true,
- ).await?;
+ let client = BackupReader::start(
+ client,
+ crypt_config.clone(),
+ repo.store(),
+ &snapshot.group().backup_type(),
+ &snapshot.group().backup_id(),
+ snapshot.backup_time(),
+ true,
+ ).await?;
- let manifest = client.download_manifest().await?;
+ let manifest = client.download_manifest().await?;
- let index = client.download_dynamic_index(&manifest, CATALOG_NAME).await?;
+ let index = client.download_dynamic_index(&manifest, CATALOG_NAME).await?;
- let most_used = index.find_most_used_chunks(8);
-
- let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
+ let most_used = index.find_most_used_chunks(8);
- let mut reader = BufferedDynamicReader::new(index, chunk_reader);
+ let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
- let mut catalogfile = std::fs::OpenOptions::new()
- .write(true)
- .read(true)
- .custom_flags(libc::O_TMPFILE)
- .open("/tmp")?;
+ let mut reader = BufferedDynamicReader::new(index, chunk_reader);
- std::io::copy(&mut reader, &mut catalogfile)
- .map_err(|err| format_err!("unable to download catalog - {}", err))?;
+ let mut catalogfile = std::fs::OpenOptions::new()
+ .write(true)
+ .read(true)
+ .custom_flags(libc::O_TMPFILE)
+ .open("/tmp")?;
- catalogfile.seek(SeekFrom::Start(0))?;
+ std::io::copy(&mut reader, &mut catalogfile)
+ .map_err(|err| format_err!("unable to download catalog - {}", err))?;
- let mut catalog_reader = CatalogReader::new(catalogfile);
+ catalogfile.seek(SeekFrom::Start(0))?;
- catalog_reader.dump()?;
+ let mut catalog_reader = CatalogReader::new(catalogfile);
- record_repository(&repo);
+ catalog_reader.dump()?;
- Ok::<(), Error>(())
- })?;
+ record_repository(&repo);
Ok(Value::Null)
}
-fn list_snapshot_files(
+fn list_snapshot_files<'a>(
param: Value,
_info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ list_snapshot_files_async(param).await
+ }.boxed()
+}
+
+async fn list_snapshot_files_async(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let path = format!("api2/json/admin/datastore/{}/files", repo.store());
- let mut result = async_main(async move {
- client.get(&path, Some(json!({
- "backup-type": snapshot.group().backup_type(),
- "backup-id": snapshot.group().backup_id(),
- "backup-time": snapshot.backup_time().timestamp(),
- }))).await
- })?;
+ let mut result = client.get(&path, Some(json!({
+ "backup-type": snapshot.group().backup_type(),
+ "backup-id": snapshot.group().backup_id(),
+ "backup-time": snapshot.backup_time().timestamp(),
+ }))).await?;
record_repository(&repo);
Ok(Value::Null)
}
-fn start_garbage_collection(
+fn start_garbage_collection<'a>(
+ param: Value,
+ info: &'static ApiMethod,
+ rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ start_garbage_collection_async(param, info, rpcenv).await
+ }.boxed()
+}
+
+async fn start_garbage_collection_async(
param: Value,
_info: &ApiMethod,
_rpcenv: &mut dyn RpcEnvironment,
let path = format!("api2/json/admin/datastore/{}/gc", repo.store());
- async_main(async {
- let result = client.post(&path, None).await?;
+ let result = client.post(&path, None).await?;
- record_repository(&repo);
+ record_repository(&repo);
- view_task_result(client, result, &output_format).await
- })?;
+ view_task_result(client, result, &output_format).await?;
Ok(Value::Null)
}
Ok((catalog, catalog_result_rx))
}
-fn create_backup(
+fn create_backup<'a>(
+ param: Value,
+ info: &'static ApiMethod,
+ rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ create_backup_async(param, info, rpcenv).await
+ }.boxed()
+}
+
+async fn create_backup_async(
param: Value,
_info: &ApiMethod,
_rpcenv: &mut dyn RpcEnvironment,
}
};
- async_main(async move {
- let client = BackupWriter::start(
- client,
- repo.store(),
- backup_type,
- &backup_id,
- backup_time,
- verbose,
- ).await?;
-
- let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
- let mut manifest = BackupManifest::new(snapshot);
-
- let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
-
- for (backup_type, filename, target, size) in upload_list {
- match backup_type {
- BackupType::CONFIG => {
- println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target);
- let stats = client
- .upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
- .await?;
- manifest.add_file(target, stats.size, stats.csum);
- }
- BackupType::LOGFILE => { // fixme: remove - not needed anymore ?
- println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target);
- let stats = client
- .upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
- .await?;
- manifest.add_file(target, stats.size, stats.csum);
- }
- BackupType::PXAR => {
- println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
- catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
- let stats = backup_directory(
- &client,
- &filename,
- &target,
- chunk_size_opt,
- devices.clone(),
- verbose,
- skip_lost_and_found,
- crypt_config.clone(),
- catalog.clone(),
- ).await?;
- manifest.add_file(target, stats.size, stats.csum);
- catalog.lock().unwrap().end_directory()?;
- }
- BackupType::IMAGE => {
- println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
- let stats = backup_image(
- &client,
- &filename,
- &target,
- size,
- chunk_size_opt,
- verbose,
- crypt_config.clone(),
- ).await?;
- manifest.add_file(target, stats.size, stats.csum);
- }
+ let client = BackupWriter::start(
+ client,
+ repo.store(),
+ backup_type,
+ &backup_id,
+ backup_time,
+ verbose,
+ ).await?;
+
+ let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
+ let mut manifest = BackupManifest::new(snapshot);
+
+ let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
+
+ for (backup_type, filename, target, size) in upload_list {
+ match backup_type {
+ BackupType::CONFIG => {
+ println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target);
+ let stats = client
+ .upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
+ .await?;
+ manifest.add_file(target, stats.size, stats.csum);
+ }
+ BackupType::LOGFILE => { // fixme: remove - not needed anymore ?
+ println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target);
+ let stats = client
+ .upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
+ .await?;
+ manifest.add_file(target, stats.size, stats.csum);
+ }
+ BackupType::PXAR => {
+ println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
+ catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
+ let stats = backup_directory(
+ &client,
+ &filename,
+ &target,
+ chunk_size_opt,
+ devices.clone(),
+ verbose,
+ skip_lost_and_found,
+ crypt_config.clone(),
+ catalog.clone(),
+ ).await?;
+ manifest.add_file(target, stats.size, stats.csum);
+ catalog.lock().unwrap().end_directory()?;
+ }
+ BackupType::IMAGE => {
+ println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
+ let stats = backup_image(
+ &client,
+ &filename,
+ &target,
+ size,
+ chunk_size_opt,
+ verbose,
+ crypt_config.clone(),
+ ).await?;
+ manifest.add_file(target, stats.size, stats.csum);
}
}
+ }
- // finalize and upload catalog
- if upload_catalog {
- let mutex = Arc::try_unwrap(catalog)
- .map_err(|_| format_err!("unable to get catalog (still used)"))?;
- let mut catalog = mutex.into_inner().unwrap();
+ // finalize and upload catalog
+ if upload_catalog {
+ let mutex = Arc::try_unwrap(catalog)
+ .map_err(|_| format_err!("unable to get catalog (still used)"))?;
+ let mut catalog = mutex.into_inner().unwrap();
- catalog.finish()?;
+ catalog.finish()?;
- drop(catalog); // close upload stream
+ drop(catalog); // close upload stream
- let stats = catalog_result_rx.await??;
+ let stats = catalog_result_rx.await??;
- manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum);
- }
+ manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum);
+ }
- if let Some(rsa_encrypted_key) = rsa_encrypted_key {
- let target = "rsa-encrypted.key";
- println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
- let stats = client
- .upload_blob_from_data(rsa_encrypted_key, target, None, false, false)
- .await?;
- manifest.add_file(format!("{}.blob", target), stats.size, stats.csum);
-
- // openssl rsautl -decrypt -inkey master-private.pem -in rsa-encrypted.key -out t
- /*
- let mut buffer2 = vec![0u8; rsa.size() as usize];
- let pem_data = file_get_contents("master-private.pem")?;
- let rsa = openssl::rsa::Rsa::private_key_from_pem(&pem_data)?;
- let len = rsa.private_decrypt(&buffer, &mut buffer2, openssl::rsa::Padding::PKCS1)?;
- println!("TEST {} {:?}", len, buffer2);
- */
- }
+ if let Some(rsa_encrypted_key) = rsa_encrypted_key {
+ let target = "rsa-encrypted.key";
+ println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
+ let stats = client
+ .upload_blob_from_data(rsa_encrypted_key, target, None, false, false)
+ .await?;
+ manifest.add_file(format!("{}.blob", target), stats.size, stats.csum);
+
+ // openssl rsautl -decrypt -inkey master-private.pem -in rsa-encrypted.key -out t
+ /*
+ let mut buffer2 = vec![0u8; rsa.size() as usize];
+ let pem_data = file_get_contents("master-private.pem")?;
+ let rsa = openssl::rsa::Rsa::private_key_from_pem(&pem_data)?;
+ let len = rsa.private_decrypt(&buffer, &mut buffer2, openssl::rsa::Padding::PKCS1)?;
+ println!("TEST {} {:?}", len, buffer2);
+ */
+ }
- // create manifest (index.json)
- let manifest = manifest.into_json();
+ // create manifest (index.json)
+ let manifest = manifest.into_json();
- println!("Upload index.json to '{:?}'", repo);
- let manifest = serde_json::to_string_pretty(&manifest)?.into();
- client
- .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true)
- .await?;
+ println!("Upload index.json to '{:?}'", repo);
+ let manifest = serde_json::to_string_pretty(&manifest)?.into();
+ client
+ .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true)
+ .await?;
- client.finish().await?;
+ client.finish().await?;
- let end_time = Local::now();
- let elapsed = end_time.signed_duration_since(start_time);
- println!("Duration: {}", elapsed);
+ let end_time = Local::now();
+ let elapsed = end_time.signed_duration_since(start_time);
+ println!("Duration: {}", elapsed);
- println!("End Time: {}", end_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, false));
+ println!("End Time: {}", end_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, false));
- Ok(Value::Null)
- })
+ Ok(Value::Null)
}
fn complete_backup_source(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
result
}
-fn restore(
- param: Value,
- _info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
- async_main(restore_do(param))
-}
-
fn dump_image<W: Write>(
client: Arc<BackupReader>,
crypt_config: Option<Arc<CryptConfig>>,
Ok(())
}
+fn restore<'a>(
+ param: Value,
+ _info: &ApiMethod,
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ restore_do(param).await
+ }.boxed()
+}
+
async fn restore_do(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
Ok(Value::Null)
}
-fn upload_log(
+fn upload_log<'a>(
param: Value,
_info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+ async move {
+ upload_log_async(param).await
+ }.boxed()
+}
+
+async fn upload_log_async(param: Value) -> Result<Value, Error> {
let logfile = tools::required_string_param(&param, "logfile")?;
let repo = extract_repository_from_value(&param)?;
let body = hyper::Body::from(raw_data);
- async_main(async move {
- client.upload("application/octet-stream", body, &path, Some(args)).await
- })
+ client.upload("application/octet-stream", body, &path, Some(args)).await
}
fn prune(
Ok(Value::Null)
}
-fn catalog_shell(
+fn catalog_shell<'a>(
param: Value,
_info: &ApiMethod,
- _rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
- async_main(catalog_shell_async(param))
+ _rpcenv: &'a mut dyn RpcEnvironment,
+) -> ApiFuture<'a> {
+
+ async move {
+ catalog_shell_async(param).await
+ }.boxed()
}
async fn catalog_shell_async(param: Value) -> Result<Value, Error> {
#[sortable]
const API_METHOD_SHELL: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&catalog_shell),
+ &ApiHandler::Async(&catalog_shell),
&ObjectSchema::new(
"Shell to interactively inspect and restore snapshots.",
&sorted!([
#[sortable]
const API_METHOD_DUMP_CATALOG: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&dump_catalog),
+ &ApiHandler::Async(&dump_catalog),
&ObjectSchema::new(
"Dump catalog.",
&sorted!([
#[sortable]
const API_METHOD_CREATE_BACKUP: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&create_backup),
+ &ApiHandler::Async(&create_backup),
&ObjectSchema::new(
"Create (host) backup.",
&sorted!([
#[sortable]
const API_METHOD_UPLOAD_LOG: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&upload_log),
+ &ApiHandler::Async(&upload_log),
&ObjectSchema::new(
"Upload backup log file.",
&sorted!([
#[sortable]
const API_METHOD_LIST_BACKUP_GROUPS: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&list_backup_groups),
+ &ApiHandler::Async(&list_backup_groups),
&ObjectSchema::new(
"List backup groups.",
&sorted!([
#[sortable]
const API_METHOD_LIST_SNAPSHOTS: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&list_snapshots),
+ &ApiHandler::Async(&list_snapshots),
&ObjectSchema::new(
"List backup snapshots.",
&sorted!([
#[sortable]
const API_METHOD_FORGET_SNAPSHOTS: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&forget_snapshots),
+ &ApiHandler::Async(&forget_snapshots),
&ObjectSchema::new(
"Forget (remove) backup snapshots.",
&sorted!([
#[sortable]
const API_METHOD_START_GARBAGE_COLLECTION: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&start_garbage_collection),
+ &ApiHandler::Async(&start_garbage_collection),
&ObjectSchema::new(
"Start garbage collection for a specific repository.",
&sorted!([ ("repository", true, &REPO_URL_SCHEMA) ]),
#[sortable]
const API_METHOD_RESTORE: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&restore),
+ &ApiHandler::Async(&restore),
&ObjectSchema::new(
"Restore backup repository.",
&sorted!([
#[sortable]
const API_METHOD_LIST_SNAPSHOT_FILES: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&list_snapshot_files),
+ &ApiHandler::Async(&list_snapshot_files),
&ObjectSchema::new(
"List snapshot files.",
&sorted!([
#[sortable]
const API_METHOD_API_LOGIN: ApiMethod = ApiMethod::new(
- &ApiHandler::Sync(&api_login),
+ &ApiHandler::Async(&api_login),
&ObjectSchema::new(
"Try to login. If successful, store ticket.",
&sorted!([ ("repository", true, &REPO_URL_SCHEMA) ]),