-use failure::*;
-use lazy_static::lazy_static;
-
-use std::sync::Arc;
-
+//use chrono::{Local, TimeZone};
+use anyhow::{bail, format_err, Error};
use futures::*;
use hyper::header::{self, HeaderValue, UPGRADE};
-use hyper::{Body, Response, StatusCode};
use hyper::http::request::Parts;
-//use chrono::{Local, TimeZone};
-
+use hyper::{Body, Response, StatusCode};
use serde_json::Value;
-use crate::tools;
-use crate::api_schema::router::*;
-use crate::api_schema::*;
-use crate::server::{WorkerTask, H2Service};
-use crate::backup::*;
+use proxmox::{sortable, identity};
+use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
+use proxmox::api::schema::*;
+use proxmox::http_err;
+
use crate::api2::types::*;
+use crate::backup::*;
+use crate::server::{WorkerTask, H2Service};
+use crate::tools;
+use crate::config::acl::PRIV_DATASTORE_READ;
+use crate::config::cached_user_info::CachedUserInfo;
+use crate::api2::helpers;
mod environment;
use environment::*;
-pub fn router() -> Router {
- Router::new()
- .upgrade(api_method_upgrade_backup())
-}
-
-pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
- ApiAsyncMethod::new(
- upgrade_to_backup_reader_protocol,
- ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."))
- .required("store", StringSchema::new("Datastore name."))
- .required("backup-type", StringSchema::new("Backup type.")
- .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
- .required("backup-id", StringSchema::new("Backup ID."))
- .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
- .minimum(1547797308))
- .optional("debug", BooleanSchema::new("Enable verbose debug logging."))
+pub const ROUTER: Router = Router::new()
+ .upgrade(&API_METHOD_UPGRADE_BACKUP);
+
+#[sortable]
+pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
+ &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
+ &ObjectSchema::new(
+ concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
+ &sorted!([
+ ("store", false, &DATASTORE_SCHEMA),
+ ("backup-type", false, &BACKUP_TYPE_SCHEMA),
+ ("backup-id", false, &BACKUP_ID_SCHEMA),
+ ("backup-time", false, &BACKUP_TIME_SCHEMA),
+ ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
+ ]),
)
-}
+).access(
+    // Note: parameter 'store' is not a URI parameter, so we need to check inside the function body
+ Some("The user needs Datastore.Read privilege on /datastore/{store}."),
+ &Permission::Anybody
+);
fn upgrade_to_backup_reader_protocol(
parts: Parts,
req_body: Body,
param: Value,
- _info: &ApiAsyncMethod,
+ _info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<BoxFut, Error> {
+) -> ApiResponseFuture {
- let debug = param["debug"].as_bool().unwrap_or(false);
+ async move {
+ let debug = param["debug"].as_bool().unwrap_or(false);
- let store = tools::required_string_param(¶m, "store")?.to_owned();
- let datastore = DataStore::lookup_datastore(&store)?;
+ let username = rpcenv.get_user().unwrap();
+ let store = tools::required_string_param(¶m, "store")?.to_owned();
- let backup_type = tools::required_string_param(¶m, "backup-type")?;
- let backup_id = tools::required_string_param(¶m, "backup-id")?;
- let backup_time = tools::required_integer_param(¶m, "backup-time")?;
+ let user_info = CachedUserInfo::new()?;
+ user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_READ, false)?;
- let protocols = parts
- .headers
- .get("UPGRADE")
- .ok_or_else(|| format_err!("missing Upgrade header"))?
- .to_str()?;
+ let datastore = DataStore::lookup_datastore(&store)?;
- if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
- bail!("invalid protocol name");
- }
+ let backup_type = tools::required_string_param(¶m, "backup-type")?;
+ let backup_id = tools::required_string_param(¶m, "backup-id")?;
+ let backup_time = tools::required_integer_param(¶m, "backup-time")?;
- if parts.version >= http::version::Version::HTTP_2 {
- bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
- }
+ let protocols = parts
+ .headers
+ .get("UPGRADE")
+ .ok_or_else(|| format_err!("missing Upgrade header"))?
+ .to_str()?;
- let username = rpcenv.get_user().unwrap();
- let env_type = rpcenv.env_type();
+ if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
+ bail!("invalid protocol name");
+ }
- let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
- let path = datastore.base_path();
+ if parts.version >= http::version::Version::HTTP_2 {
+ bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
+ }
- //let files = BackupInfo::list_files(&path, &backup_dir)?;
+ let env_type = rpcenv.env_type();
- let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
+ let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
+ let path = datastore.base_path();
- WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
- let mut env = ReaderEnvironment::new(
- env_type, username.clone(), worker.clone(), datastore, backup_dir);
+ //let files = BackupInfo::list_files(&path, &backup_dir)?;
- env.debug = debug;
+ let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
- env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
+ WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
+ let mut env = ReaderEnvironment::new(
+ env_type, username.clone(), worker.clone(), datastore, backup_dir);
- let service = H2Service::new(env.clone(), worker.clone(), &READER_ROUTER, debug);
+ env.debug = debug;
- let abort_future = worker.abort_future();
+ env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
- let env3 = env.clone();
+ let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
- req_body
- .on_upgrade()
- .map_err(Error::from)
- .and_then(move |conn| {
- env3.debug("protocol upgrade done");
+ let abort_future = worker.abort_future();
- let mut http = hyper::server::conn::Http::new();
- http.http2_only(true);
- // increase window size: todo - find optiomal size
- let window_size = 32*1024*1024; // max = (1 << 31) - 2
- http.http2_initial_stream_window_size(window_size);
- http.http2_initial_connection_window_size(window_size);
+ let req_fut = req_body
+ .on_upgrade()
+ .map_err(Error::from)
+ .and_then({
+ let env = env.clone();
+ move |conn| {
+ env.debug("protocol upgrade done");
- http.serve_connection(conn, service)
- .map_err(Error::from)
- })
- .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
- .map_err(|(err, _)| err)
- .and_then(move |(_result, _)| {
- env.log("reader finished sucessfully");
- Ok(())
- })
- })?;
+ let mut http = hyper::server::conn::Http::new();
+ http.http2_only(true);
+            // increase window size: todo - find optimal size
+ let window_size = 32*1024*1024; // max = (1 << 31) - 2
+ http.http2_initial_stream_window_size(window_size);
+ http.http2_initial_connection_window_size(window_size);
- let response = Response::builder()
- .status(StatusCode::SWITCHING_PROTOCOLS)
- .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
- .body(Body::empty())?;
+ http.serve_connection(conn, service)
+ .map_err(Error::from)
+ }
+ });
+ let abort_future = abort_future
+ .map(|_| Err(format_err!("task aborted")));
- Ok(Box::new(futures::future::ok(response)))
-}
+ use futures::future::Either;
+ futures::future::select(req_fut, abort_future)
+ .map(|res| match res {
+ Either::Left((Ok(res), _)) => Ok(res),
+ Either::Left((Err(err), _)) => Err(err),
+ Either::Right((Ok(res), _)) => Ok(res),
+ Either::Right((Err(err), _)) => Err(err),
+ })
+ .map_ok(move |_| env.log("reader finished successfully"))
+ })?;
-lazy_static!{
- static ref READER_ROUTER: Router = reader_api();
-}
+ let response = Response::builder()
+ .status(StatusCode::SWITCHING_PROTOCOLS)
+ .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
+ .body(Body::empty())?;
-pub fn reader_api() -> Router {
-
- let router = Router::new()
- .subdir(
- "chunk", Router::new()
- .download(api_method_download_chunk())
- )
- .subdir(
- "download", Router::new()
- .download(api_method_download_file())
- )
- .subdir(
- "speedtest", Router::new()
- .download(api_method_speedtest())
- );
-
- router
+ Ok(response)
+ }.boxed()
}
-pub fn api_method_download_file() -> ApiAsyncMethod {
- ApiAsyncMethod::new(
- download_file,
- ObjectSchema::new("Download specified file.")
- .required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
+pub const READER_API_ROUTER: Router = Router::new()
+ .subdirs(&[
+ (
+ "chunk", &Router::new()
+ .download(&API_METHOD_DOWNLOAD_CHUNK)
+ ),
+ (
+ "download", &Router::new()
+ .download(&API_METHOD_DOWNLOAD_FILE)
+ ),
+ (
+ "speedtest", &Router::new()
+ .download(&API_METHOD_SPEEDTEST)
+ ),
+ ]);
+
+#[sortable]
+pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
+ &ApiHandler::AsyncHttp(&download_file),
+ &ObjectSchema::new(
+ "Download specified file.",
+ &sorted!([
+ ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
+ ]),
)
-}
+);
fn download_file(
_parts: Parts,
_req_body: Body,
param: Value,
- _info: &ApiAsyncMethod,
+ _info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<BoxFut, Error> {
+) -> ApiResponseFuture {
- let env: &ReaderEnvironment = rpcenv.as_ref();
- let env2 = env.clone();
+ async move {
+ let env: &ReaderEnvironment = rpcenv.as_ref();
- let file_name = tools::required_string_param(¶m, "file-name")?.to_owned();
+ let file_name = tools::required_string_param(¶m, "file-name")?.to_owned();
- let mut path = env.datastore.base_path();
- path.push(env.backup_dir.relative_path());
- path.push(&file_name);
+ let mut path = env.datastore.base_path();
+ path.push(env.backup_dir.relative_path());
+ path.push(&file_name);
- let path2 = path.clone();
- let path3 = path.clone();
+ env.log(format!("download {:?}", path.clone()));
- let response_future = tokio::fs::File::open(path)
- .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
- .and_then(move |file| {
- env2.log(format!("download {:?}", path3));
- let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()).
- map(|bytes| hyper::Chunk::from(bytes.freeze()));
+ helpers::create_download_response(path).await
+ }.boxed()
+}
- let body = Body::wrap_stream(payload);
+#[sortable]
+pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
+ &ApiHandler::AsyncHttp(&download_chunk),
+ &ObjectSchema::new(
+ "Download specified chunk.",
+ &sorted!([
+ ("digest", false, &CHUNK_DIGEST_SCHEMA),
+ ]),
+ )
+);
- // fixme: set other headers ?
- Ok(Response::builder()
- .status(StatusCode::OK)
- .header(header::CONTENT_TYPE, "application/octet-stream")
- .body(body)
- .unwrap())
- });
+fn download_chunk(
+ _parts: Parts,
+ _req_body: Body,
+ param: Value,
+ _info: &ApiMethod,
+ rpcenv: Box<dyn RpcEnvironment>,
+) -> ApiResponseFuture {
- Ok(Box::new(response_future))
-}
+ async move {
+ let env: &ReaderEnvironment = rpcenv.as_ref();
-pub fn api_method_download_chunk() -> ApiAsyncMethod {
- ApiAsyncMethod::new(
- download_chunk,
- ObjectSchema::new("Download specified chunk.")
- .required("digest", CHUNK_DIGEST_SCHEMA.clone())
- )
+ let digest_str = tools::required_string_param(¶m, "digest")?;
+ let digest = proxmox::tools::hex_to_digest(digest_str)?;
+
+ let (path, _) = env.datastore.chunk_path(&digest);
+ let path2 = path.clone();
+
+ env.debug(format!("download chunk {:?}", path));
+
+ let data = tokio::fs::read(path)
+ .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
+ .await?;
+
+ let body = Body::from(data);
+
+ // fixme: set other headers ?
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .body(body)
+ .unwrap())
+ }.boxed()
}
-fn download_chunk(
+/* this is too slow
+fn download_chunk_old(
_parts: Parts,
_req_body: Body,
param: Value,
- _info: &ApiAsyncMethod,
+ _info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<BoxFut, Error> {
+) -> Result<ApiResponseFuture, Error> {
let env: &ReaderEnvironment = rpcenv.as_ref();
let env2 = env.clone();
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
.and_then(move |file| {
env2.debug(format!("download chunk {:?}", path3));
- let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()).
- map(|bytes| hyper::Chunk::from(bytes.freeze()));
+ let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+ .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
let body = Body::wrap_stream(payload);
// fixme: set other headers ?
- Ok(Response::builder()
+ futures::future::ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(body)
Ok(Box::new(response_future))
}
+*/
-pub fn api_method_speedtest() -> ApiAsyncMethod {
- ApiAsyncMethod::new(
- speedtest,
- ObjectSchema::new("Test 4M block download speed.")
- )
-}
+pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
+ &ApiHandler::AsyncHttp(&speedtest),
+ &ObjectSchema::new("Test 4M block download speed.", &[])
+);
fn speedtest(
_parts: Parts,
_req_body: Body,
_param: Value,
- _info: &ApiAsyncMethod,
+ _info: &ApiMethod,
_rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<BoxFut, Error> {
+) -> ApiResponseFuture {
- let buffer = vec![2u8; 8*1024*1024]; // nonsense [2,2,2,2,2...]
+ let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
let body = Body::from(buffer);
.body(body)
.unwrap();
- Ok(Box::new(future::ok(response)))
+ future::ok(response).boxed()
}