-use failure::*;
+use anyhow::{bail, format_err, Error};
use futures::*;
use hyper::header::{HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Response, StatusCode};
use serde_json::{json, Value};
-use proxmox::{sortable, identity};
-use proxmox::api::list_subdirs_api_method;
-use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, Router, RpcEnvironment};
+use proxmox::{sortable, identity, list_subdirs_api_method};
+use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
use proxmox::api::router::SubdirMap;
use proxmox::api::schema::*;
-use crate::tools;
-use crate::tools::wrapped_reader_stream::*;
+use crate::tools::{self, WrappedReaderStream};
use crate::server::{WorkerTask, H2Service};
use crate::backup::*;
use crate::api2::types::*;
+use crate::config::acl::PRIV_DATASTORE_CREATE_BACKUP;
+use crate::config::cached_user_info::CachedUserInfo;
mod environment;
use environment::*;
#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
- &ApiHandler::Async(&upgrade_to_backup_protocol),
+ &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
&ObjectSchema::new(
concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
&sorted!([
- ("store", false, &StringSchema::new("Datastore name.").schema()),
+ ("store", false, &DATASTORE_SCHEMA),
("backup-type", false, &BACKUP_TYPE_SCHEMA),
("backup-id", false, &BACKUP_ID_SCHEMA),
("backup-time", false, &BACKUP_TIME_SCHEMA),
("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
]),
)
+).access(
+ // Note: parameter 'store' is not a URI parameter, so we need to test it inside the function body
+ Some("The user needs Datastore.CreateBackup privilege on /datastore/{store}."),
+ &Permission::Anybody
);
fn upgrade_to_backup_protocol(
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiResponseFuture {
+ async move {
let debug = param["debug"].as_bool().unwrap_or(false);
+ let username = rpcenv.get_user().unwrap();
+
+ let store = tools::required_string_param(&param, "store")?.to_owned();
+
+ let user_info = CachedUserInfo::new()?;
+ user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_CREATE_BACKUP, false)?;
+
let datastore = DataStore::lookup_datastore(&store)?;
let backup_type = tools::required_string_param(&param, "backup-type")?;
let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
- let username = rpcenv.get_user().unwrap();
let env_type = rpcenv.env_type();
let backup_group = BackupGroup::new(backup_type, backup_id);
if backup_dir.backup_time() <= last.backup_dir.backup_time() {
bail!("backup timestamp is older than last backup.");
}
+ // fixme: abort if last backup is still running - how to test?
+ // Idea: write upid into a file inside snapshot dir. then test if
+ // it is still running here.
}
let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
let abort_future = worker.abort_future();
let env2 = env.clone();
- let env3 = env.clone();
- let req_fut = req_body
+ let mut req_fut = req_body
.on_upgrade()
.map_err(Error::from)
.and_then(move |conn| {
- env3.debug("protocol upgrade done");
+ env2.debug("protocol upgrade done");
let mut http = hyper::server::conn::Http::new();
http.http2_only(true);
http.serve_connection(conn, service)
.map_err(Error::from)
});
- let abort_future = abort_future
+ let mut abort_future = abort_future
.map(|_| Err(format_err!("task aborted")));
- use futures::future::Either;
- future::select(req_fut, abort_future)
- .map(|res| match res {
- Either::Left((Ok(res), _)) => Ok(res),
- Either::Left((Err(err), _)) => Err(err),
- Either::Right((Ok(res), _)) => Ok(res),
- Either::Right((Err(err), _)) => Err(err),
- })
- .and_then(move |_result| async move {
- env.ensure_finished()?;
- env.log("backup finished sucessfully");
- Ok(())
- })
- .then(move |result| async move {
- if let Err(err) = result {
- match env2.ensure_finished() {
- Ok(()) => {}, // ignore error after finish
- _ => {
- env2.log(format!("backup failed: {}", err));
- env2.log("removing failed backup");
- env2.remove_backup()?;
- return Err(err);
- }
- }
- }
- Ok(())
- })
+ async move {
+ let res = select!{
+ req = req_fut => req,
+ abrt = abort_future => abrt,
+ };
+
+ match (res, env.ensure_finished()) {
+ (Ok(_), Ok(())) => {
+ env.log("backup finished sucessfully");
+ Ok(())
+ },
+ (Err(err), Ok(())) => {
+ // ignore errors after finish
+ env.log(format!("backup had errors but finished: {}", err));
+ Ok(())
+ },
+ (Ok(_), Err(err)) => {
+ env.log(format!("backup ended and finish failed: {}", err));
+ env.log("removing unfinished backup");
+ env.remove_backup()?;
+ Err(err)
+ },
+ (Err(err), Err(_)) => {
+ env.log(format!("backup failed: {}", err));
+ env.log("removing failed backup");
+ env.remove_backup()?;
+ Err(err)
+ },
+ }
+ }
})?;
let response = Response::builder()
.header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
.body(Body::empty())?;
- Ok(Box::new(futures::future::ok(response)))
+ Ok(response)
+ }.boxed()
}
pub const BACKUP_API_SUBDIRS: SubdirMap = &[
#[sortable]
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
- &ApiHandler::Async(&dynamic_chunk_index),
+ &ApiHandler::AsyncHttp(&dynamic_chunk_index),
&ObjectSchema::new(
r###"
Download the dynamic chunk index from the previous backup.
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
-
- let env: &BackupEnvironment = rpcenv.as_ref();
+) -> ApiResponseFuture {
- let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
+ async move {
+ let env: &BackupEnvironment = rpcenv.as_ref();
- if !archive_name.ends_with(".didx") {
- bail!("wrong archive extension: '{}'", archive_name);
- }
+ let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
- let empty_response = {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::empty())?
- };
-
- let last_backup = match &env.last_backup {
- Some(info) => info,
- None => return Ok(Box::new(future::ok(empty_response))),
- };
-
- let mut path = last_backup.backup_dir.relative_path();
- path.push(&archive_name);
-
- let index = match env.datastore.open_dynamic_reader(path) {
- Ok(index) => index,
- Err(_) => {
- env.log(format!("there is no last backup for archive '{}'", archive_name));
- return Ok(Box::new(future::ok(empty_response)));
+ if !archive_name.ends_with(".didx") {
+ bail!("wrong archive extension: '{}'", archive_name);
}
- };
-
- env.log(format!("download last backup index for archive '{}'", archive_name));
- let count = index.index_count();
- for pos in 0..count {
- let (start, end, digest) = index.chunk_info(pos)?;
- let size = (end - start) as u32;
- env.register_chunk(digest, size)?;
- }
+ let empty_response = {
+ Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?
+ };
+
+ let last_backup = match &env.last_backup {
+ Some(info) => info,
+ None => return Ok(empty_response),
+ };
+
+ let mut path = last_backup.backup_dir.relative_path();
+ path.push(&archive_name);
+
+ let index = match env.datastore.open_dynamic_reader(path) {
+ Ok(index) => index,
+ Err(_) => {
+ env.log(format!("there is no last backup for archive '{}'", archive_name));
+ return Ok(empty_response);
+ }
+ };
+
+ env.log(format!("download last backup index for archive '{}'", archive_name));
+
+ let count = index.index_count();
+ for pos in 0..count {
+ let (start, end, digest) = index.chunk_info(pos)?;
+ let size = (end - start) as u32;
+ env.register_chunk(digest, size)?;
+ }
- let reader = DigestListEncoder::new(Box::new(index));
+ let reader = DigestListEncoder::new(Box::new(index));
- let stream = WrappedReaderStream::new(reader);
+ let stream = WrappedReaderStream::new(reader);
- // fixme: set size, content type?
- let response = http::Response::builder()
- .status(200)
- .body(Body::wrap_stream(stream))?;
+ // fixme: set size, content type?
+ let response = http::Response::builder()
+ .status(200)
+ .body(Body::wrap_stream(stream))?;
- Ok(Box::new(future::ok(response)))
+ Ok(response)
+ }.boxed()
}
#[sortable]
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
- &ApiHandler::Async(&fixed_chunk_index),
+ &ApiHandler::AsyncHttp(&fixed_chunk_index),
&ObjectSchema::new(
r###"
Download the fixed chunk index from the previous backup.
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiResponseFuture {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ async move {
+ let env: &BackupEnvironment = rpcenv.as_ref();
- let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
+ let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
- if !archive_name.ends_with(".fidx") {
- bail!("wrong archive extension: '{}'", archive_name);
- }
+ if !archive_name.ends_with(".fidx") {
+ bail!("wrong archive extension: '{}'", archive_name);
+ }
- let empty_response = {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::empty())?
- };
-
- let last_backup = match &env.last_backup {
- Some(info) => info,
- None => return Ok(Box::new(future::ok(empty_response))),
- };
-
- let mut path = last_backup.backup_dir.relative_path();
- path.push(&archive_name);
-
- let index = match env.datastore.open_fixed_reader(path) {
- Ok(index) => index,
- Err(_) => {
- env.log(format!("there is no last backup for archive '{}'", archive_name));
- return Ok(Box::new(future::ok(empty_response)));
+ let empty_response = {
+ Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?
+ };
+
+ let last_backup = match &env.last_backup {
+ Some(info) => info,
+ None => return Ok(empty_response),
+ };
+
+ let mut path = last_backup.backup_dir.relative_path();
+ path.push(&archive_name);
+
+ let index = match env.datastore.open_fixed_reader(path) {
+ Ok(index) => index,
+ Err(_) => {
+ env.log(format!("there is no last backup for archive '{}'", archive_name));
+ return Ok(empty_response);
+ }
+ };
+
+ env.log(format!("download last backup index for archive '{}'", archive_name));
+
+ let count = index.index_count();
+ let image_size = index.index_bytes();
+ for pos in 0..count {
+ let digest = index.index_digest(pos).unwrap();
+ // Note: last chunk can be smaller
+ let start = (pos*index.chunk_size) as u64;
+ let mut end = start + index.chunk_size as u64;
+ if end > image_size { end = image_size; }
+ let size = (end - start) as u32;
+ env.register_chunk(*digest, size)?;
}
- };
-
- env.log(format!("download last backup index for archive '{}'", archive_name));
-
- let count = index.index_count();
- let image_size = index.index_bytes();
- for pos in 0..count {
- let digest = index.index_digest(pos).unwrap();
- // Note: last chunk can be smaller
- let start = (pos*index.chunk_size) as u64;
- let mut end = start + index.chunk_size as u64;
- if end > image_size { end = image_size; }
- let size = (end - start) as u32;
- env.register_chunk(*digest, size)?;
- }
- let reader = DigestListEncoder::new(Box::new(index));
+ let reader = DigestListEncoder::new(Box::new(index));
- let stream = WrappedReaderStream::new(reader);
+ let stream = WrappedReaderStream::new(reader);
- // fixme: set size, content type?
- let response = http::Response::builder()
- .status(200)
- .body(Body::wrap_stream(stream))?;
+ // fixme: set size, content type?
+ let response = http::Response::builder()
+ .status(200)
+ .body(Body::wrap_stream(stream))?;
- Ok(Box::new(future::ok(response)))
+ Ok(response)
+ }.boxed()
}