1 //use chrono::{Local, TimeZone};
2 use anyhow
::{bail, format_err, Error}
;
4 use hyper
::header
::{self, HeaderValue, UPGRADE}
;
5 use hyper
::http
::request
::Parts
;
6 use hyper
::{Body, Response, StatusCode}
;
9 use proxmox
::{sortable, identity}
;
10 use proxmox
::api
::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission}
;
11 use proxmox
::api
::schema
::*;
12 use proxmox
::http_err
;
14 use crate::api2
::types
::*;
16 use crate::server
::{WorkerTask, H2Service}
;
18 use crate::config
::acl
::PRIV_DATASTORE_READ
;
19 use crate::config
::cached_user_info
::CachedUserInfo
;
/// Router exported by this module. The only registration visible here is the
/// `upgrade` handler, `API_METHOD_UPGRADE_BACKUP`.
24 pub const ROUTER
: Router
= Router
::new()
25 .upgrade(&API_METHOD_UPGRADE_BACKUP
);
/// API method descriptor for the protocol upgrade; its handler is the async
/// HTTP handler `upgrade_to_backup_reader_protocol`.
///
/// Parameters visible in this view: `store`, `backup-type` (one of "vm", "ct",
/// "host"), `backup-id`, `backup-time` (integer, minimum 1_547_797_308) and
/// `debug` (boolean).
// NOTE(review): the boolean in each parameter tuple looks like an "optional"
// flag (only `debug` is `true`) — confirm against the schema API.
// NOTE(review): parts of this definition (schema wrapper, closing delimiters,
// the permission argument) are not visible in this view.
28 pub const API_METHOD_UPGRADE_BACKUP
: ApiMethod
= ApiMethod
::new(
29 &ApiHandler
::AsyncHttp(&upgrade_to_backup_reader_protocol
),
31 concat
!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!(), "')."),
33 ("store", false, &DATASTORE_SCHEMA
),
34 ("backup-type", false, &StringSchema
::new("Backup type.")
35 .format(&ApiStringFormat
::Enum(&["vm", "ct", "host"]))
38 ("backup-id", false, &StringSchema
::new("Backup ID.").schema()),
39 ("backup-time", false, &IntegerSchema
::new("Backup time (Unix epoch.)")
40 .minimum(1_547_797_308)
43 ("debug", true, &BooleanSchema
::new("Enable verbose debug logging.").schema()),
47 // Note: parameter 'store' is not a URI parameter, so the privilege has to be checked inside the function body
48 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
/// Handler behind `API_METHOD_UPGRADE_BACKUP`.
///
/// Visible flow: read the request parameters, check the caller's
/// `PRIV_DATASTORE_READ` privilege on `/datastore/{store}`, validate the
/// `Upgrade` header and the HTTP version, then spawn a "reader" worker task
/// that serves `READER_API_ROUTER` over an HTTP/2-only connection. The response
/// built at the end has status 101 (Switching Protocols) and echoes the
/// protocol id in its `Upgrade` header.
// NOTE(review): several original lines are missing from this view (remaining
// parameters, the Upgrade-header lookup, closing braces, the final return).
// NOTE(review): `¶m` below looks like a mis-decoded `&param` (`&para` + `m`)
// — confirm against the upstream file.
52 fn upgrade_to_backup_reader_protocol(
57 rpcenv
: Box
<dyn RpcEnvironment
>,
58 ) -> ApiResponseFuture
{
// `debug` falls back to false when `as_bool()` yields no value.
61 let debug
= param
["debug"].as_bool().unwrap_or(false);
// NOTE(review): `unwrap()` panics if the environment has no user —
// presumably authentication guarantees one; confirm.
63 let username
= rpcenv
.get_user().unwrap();
64 let store
= tools
::required_string_param(¶m
, "store")?
.to_owned();
// The privilege check is done here, in the function body, on the path
// ["datastore", store].
66 let user_info
= CachedUserInfo
::new()?
;
67 user_info
.check_privs(&username
, &["datastore", &store
], PRIV_DATASTORE_READ
, false)?
;
69 let datastore
= DataStore
::lookup_datastore(&store
)?
;
// Identification of the backup snapshot to read.
71 let backup_type
= tools
::required_string_param(¶m
, "backup-type")?
;
72 let backup_id
= tools
::required_string_param(¶m
, "backup-id")?
;
73 let backup_time
= tools
::required_integer_param(¶m
, "backup-time")?
;
// A request without an Upgrade header is rejected.
78 .ok_or_else(|| format_err
!("missing Upgrade header"))?
// Only the reader protocol id is accepted.
81 if protocols
!= PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!() {
82 bail
!("invalid protocol name");
// The upgrade request itself must use an HTTP version below 2.
85 if parts
.version
>= http
::version
::Version
::HTTP_2
{
86 bail
!("unexpected http version '{:?}' (expected version < 2)", parts
.version
);
89 let env_type
= rpcenv
.env_type();
91 let backup_dir
= BackupDir
::new(backup_type
, backup_id
, backup_time
);
92 let path
= datastore
.base_path();
94 //let files = BackupInfo::list_files(&path, &backup_dir)?;
// Worker id: store, backup type, backup id and the backup timestamp
// formatted as zero-padded, 8-digit upper-case hex.
96 let worker_id
= format
!("{}_{}_{}_{:08X}", store
, backup_type
, backup_id
, backup_dir
.backup_time().timestamp());
// The reader session runs as a "reader" worker task.
98 WorkerTask
::spawn("reader", Some(worker_id
), &username
.clone(), true, move |worker
| {
99 let mut env
= ReaderEnvironment
::new(
100 env_type
, username
.clone(), worker
.clone(), datastore
, backup_dir
);
104 env
.log(format
!("starting new backup reader datastore '{}': {:?}", store
, path
));
// Requests on the upgraded connection are handled via READER_API_ROUTER.
106 let service
= H2Service
::new(env
.clone(), worker
.clone(), &READER_API_ROUTER
, debug
);
108 let abort_future
= worker
.abort_future();
110 let req_fut
= req_body
112 .map_err(Error
::from
)
114 let env
= env
.clone();
116 env
.debug("protocol upgrade done");
// The upgraded connection is served as HTTP/2 only.
118 let mut http
= hyper
::server
::conn
::Http
::new();
119 http
.http2_only(true);
120 // increase window size: todo - find optimal size
// 32 MiB, used for both the per-stream and the per-connection window.
121 let window_size
= 32*1024*1024; // max = (1 << 31) - 2
122 http
.http2_initial_stream_window_size(window_size
);
123 http
.http2_initial_connection_window_size(window_size
);
125 http
.serve_connection(conn
, service
)
126 .map_err(Error
::from
)
// An abort of the worker is turned into a "task aborted" error.
129 let abort_future
= abort_future
130 .map(|_
| Err(format_err
!("task aborted")));
// Whichever of the request future and the abort future completes first
// decides the result; the other future is dropped.
132 use futures
::future
::Either
;
133 futures
::future
::select(req_fut
, abort_future
)
134 .map(|res
| match res
{
135 Either
::Left((Ok(res
), _
)) => Ok(res
),
136 Either
::Left((Err(err
), _
)) => Err(err
),
137 Either
::Right((Ok(res
), _
)) => Ok(res
),
138 Either
::Right((Err(err
), _
)) => Err(err
),
140 .map_ok(move |_
| env
.log("reader finished sucessfully"))
// Reply to the original request: 101 Switching Protocols, with the
// protocol id in the Upgrade header and an empty body.
143 let response
= Response
::builder()
144 .status(StatusCode
::SWITCHING_PROTOCOLS
)
145 .header(UPGRADE
, HeaderValue
::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!()))
146 .body(Body
::empty())?
;
/// Router used for requests on the upgraded reader connection (it is passed to
/// `H2Service::new` in `upgrade_to_backup_reader_protocol`).
///
/// Entries visible in this view, each registered as a `download` handler:
/// "chunk" -> `API_METHOD_DOWNLOAD_CHUNK`, "download" ->
/// `API_METHOD_DOWNLOAD_FILE`, "speedtest" -> `API_METHOD_SPEEDTEST`.
// NOTE(review): the call that attaches these named entries to the router and
// the closing delimiters are not visible in this view.
152 pub const READER_API_ROUTER
: Router
= Router
::new()
155 "chunk", &Router
::new()
156 .download(&API_METHOD_DOWNLOAD_CHUNK
)
159 "download", &Router
::new()
160 .download(&API_METHOD_DOWNLOAD_FILE
)
163 "speedtest", &Router
::new()
164 .download(&API_METHOD_SPEEDTEST
)
/// API method descriptor for file downloads; its handler is the async HTTP
/// handler `download_file`. The only parameter visible here is `file-name`,
/// described by `BACKUP_ARCHIVE_NAME_SCHEMA`.
// NOTE(review): the schema wrapper and closing delimiters of this definition
// are not visible in this view.
169 pub const API_METHOD_DOWNLOAD_FILE
: ApiMethod
= ApiMethod
::new(
170 &ApiHandler
::AsyncHttp(&download_file
),
172 "Download specified file.",
174 ("file-name", false, &crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
),
// Handler registered via `API_METHOD_DOWNLOAD_FILE` (`download_file`).
//
// Visible flow: take the `file-name` parameter, build the path
// <datastore base path>/<backup dir relative path>/<file-name>, open the file
// with tokio, and stream its contents as the body of a 200 response with
// content type "application/octet-stream". A failed open is reported as a
// BAD_REQUEST http error that includes the path.
// NOTE(review): the `fn` line, the other parameters and the tail of the
// function are not visible in this view.
// NOTE(review): `¶m` below looks like a mis-decoded `&param` (`&para` + `m`)
// — confirm against the upstream file.
// NOTE(review): `file-name` is pushed onto the path as-is; presumably
// BACKUP_ARCHIVE_NAME_SCHEMA rules out path separators — confirm.
184 rpcenv
: Box
<dyn RpcEnvironment
>,
185 ) -> ApiResponseFuture
{
// The environment is obtained as a ReaderEnvironment reference.
188 let env
: &ReaderEnvironment
= rpcenv
.as_ref();
190 let file_name
= tools
::required_string_param(¶m
, "file-name")?
.to_owned();
192 let mut path
= env
.datastore
.base_path();
193 path
.push(env
.backup_dir
.relative_path());
194 path
.push(&file_name
);
// Extra copies: `path` is moved into `open`, `path2` into the error closure,
// and `path3` is used for the log line.
196 let path2
= path
.clone();
197 let path3
= path
.clone();
199 let file
= tokio
::fs
::File
::open(path
)
200 .map_err(move |err
| http_err
!(BAD_REQUEST
, format
!("open file {:?} failed: {}", path2
, err
)))
203 env
.log(format
!("download {:?}", path3
));
// Stream the file as byte chunks instead of reading it into memory at once.
205 let payload
= tokio_util
::codec
::FramedRead
::new(file
, tokio_util
::codec
::BytesCodec
::new())
206 .map_ok(|bytes
| hyper
::body
::Bytes
::from(bytes
.freeze()));
208 let body
= Body
::wrap_stream(payload
);
210 // fixme: set other headers ?
211 Ok(Response
::builder()
212 .status(StatusCode
::OK
)
213 .header(header
::CONTENT_TYPE
, "application/octet-stream")
/// API method descriptor for chunk downloads; its handler is the async HTTP
/// handler `download_chunk`. The only parameter visible here is `digest`,
/// described by `CHUNK_DIGEST_SCHEMA`.
// NOTE(review): the schema wrapper and closing delimiters of this definition
// are not visible in this view.
220 pub const API_METHOD_DOWNLOAD_CHUNK
: ApiMethod
= ApiMethod
::new(
221 &ApiHandler
::AsyncHttp(&download_chunk
),
223 "Download specified chunk.",
225 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
// Handler registered via `API_METHOD_DOWNLOAD_CHUNK` (`download_chunk`).
//
// Visible flow: parse the hex `digest` parameter, ask the datastore for the
// chunk's path, read the file with `tokio::fs::read`, and return the data as
// the body of a 200 response with content type "application/octet-stream".
// A failed read is reported as a BAD_REQUEST http error that includes the path.
// NOTE(review): the `fn` line, the other parameters and the tail of the
// function are not visible in this view.
// NOTE(review): `¶m` below looks like a mis-decoded `&param` (`&para` + `m`)
// — confirm against the upstream file.
235 rpcenv
: Box
<dyn RpcEnvironment
>,
236 ) -> ApiResponseFuture
{
// The environment is obtained as a ReaderEnvironment reference.
239 let env
: &ReaderEnvironment
= rpcenv
.as_ref();
241 let digest_str
= tools
::required_string_param(¶m
, "digest")?
;
242 let digest
= proxmox
::tools
::hex_to_digest(digest_str
)?
;
// Only the first element of the returned tuple (the path) is used.
244 let (path
, _
) = env
.datastore
.chunk_path(&digest
);
// Copy for the error closure; `path` itself is moved into `read` below.
245 let path2
= path
.clone();
247 env
.debug(format
!("download chunk {:?}", path
));
249 let data
= tokio
::fs
::read(path
)
250 .map_err(move |err
| http_err
!(BAD_REQUEST
, format
!("reading file {:?} failed: {}", path2
, err
)))
253 let body
= Body
::from(data
);
255 // fixme: set other headers ?
256 Ok(Response
::builder()
257 .status(StatusCode
::OK
)
258 .header(header
::CONTENT_TYPE
, "application/octet-stream")
/// Streaming variant of the chunk download: opens the chunk file and wraps a
/// `FramedRead`/`BytesCodec` stream as the response body, instead of reading
/// the file in one call as `download_chunk` does. Returns the boxed response
/// future wrapped in a `Result`.
// NOTE(review): no router or API method visible in this file references this
// function — it may be dead or disabled code; confirm before relying on it.
// NOTE(review): some parameters and the closing lines are not visible in this
// view, and `¶m` looks like a mis-decoded `&param` — confirm upstream.
265 fn download_chunk_old(
270 rpcenv: Box<dyn RpcEnvironment>,
271 ) -> Result<ApiResponseFuture, Error> {
273 let env: &ReaderEnvironment = rpcenv.as_ref();
// Owned copy of the environment, moved into the `and_then` closure below.
274 let env2 = env.clone();
276 let digest_str = tools::required_string_param(¶m, "digest")?;
277 let digest = proxmox::tools::hex_to_digest(digest_str)?;
279 let (path, _) = env.datastore.chunk_path(&digest);
// `path` is moved into `open`; the copies feed the error and debug messages.
281 let path2 = path.clone();
282 let path3 = path.clone();
284 let response_future = tokio::fs::File::open(path)
285 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
286 .and_then(move |file| {
287 env2.debug(format!("download chunk {:?}", path3));
288 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
289 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
291 let body = Body::wrap_stream(payload);
293 // fixme: set other headers ?
294 futures::future::ok(Response::builder()
295 .status(StatusCode::OK)
296 .header(header::CONTENT_TYPE, "application/octet-stream")
301 Ok(Box::new(response_future))
/// API method descriptor for the download speed test; its handler is the async
/// HTTP handler `speedtest`. The schema declares no parameters (`&[]`).
// NOTE(review): the description says "4M", but `speedtest` in this file
// allocates 1024*1024 bytes — confirm which size is intended.
// NOTE(review): the closing delimiters of this definition are not visible in
// this view.
305 pub const API_METHOD_SPEEDTEST
: ApiMethod
= ApiMethod
::new(
306 &ApiHandler
::AsyncHttp(&speedtest
),
307 &ObjectSchema
::new("Test 4M block download speed.", &[])
// Handler registered via `API_METHOD_SPEEDTEST` (`speedtest`).
//
// Visible flow: allocate a buffer of 1024*1024 bytes, all set to 65 (ASCII
// 'A'), wrap it in a `Body`, build a 200 response with content type
// "application/octet-stream", and return it as an already-completed, boxed
// future. The environment argument is unused (`_rpcenv`).
// NOTE(review): the `fn` line, the other parameters and the lines that attach
// `body` to the response are not visible in this view.
// NOTE(review): the buffer is 1 MiB while the method description says "4M" —
// confirm which size is intended.
315 _rpcenv
: Box
<dyn RpcEnvironment
>,
316 ) -> ApiResponseFuture
{
318 let buffer
= vec
![65u8; 1024*1024]; // nonsense [A,A,A...]
320 let body
= Body
::from(buffer
);
322 let response
= Response
::builder()
323 .status(StatusCode
::OK
)
324 .header(header
::CONTENT_TYPE
, "application/octet-stream")
328 future
::ok(response
).boxed()