1 //! Backup reader/restore protocol (HTTP2 upgrade)
3 use anyhow
::{bail, format_err, Error}
;
5 use hyper
::header
::{self, HeaderValue, UPGRADE}
;
6 use hyper
::http
::request
::Parts
;
7 use hyper
::{Body, Response, Request, StatusCode}
;
11 use proxmox_sys
::sortable
;
13 http_err
, list_subdirs_api_method
, ApiHandler
, ApiMethod
, ApiResponseFuture
, Permission
,
14 Router
, RpcEnvironment
, SubdirMap
,
16 use proxmox_schema
::{BooleanSchema, ObjectSchema}
;
19 Authid
, DATASTORE_SCHEMA
, BACKUP_TYPE_SCHEMA
, BACKUP_TIME_SCHEMA
, BACKUP_ID_SCHEMA
,
20 CHUNK_DIGEST_SCHEMA
, PRIV_DATASTORE_READ
, PRIV_DATASTORE_BACKUP
,
21 BACKUP_ARCHIVE_NAME_SCHEMA
,
23 use proxmox_sys
::fs
::lock_dir_noblock_shared
;
24 use pbs_tools
::json
::{required_integer_param, required_string_param}
;
25 use pbs_datastore
::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1}
;
26 use pbs_datastore
::backup_info
::BackupDir
;
27 use pbs_datastore
::index
::IndexFile
;
28 use pbs_datastore
::manifest
::{archive_type, ArchiveType}
;
29 use pbs_config
::CachedUserInfo
;
30 use proxmox_rest_server
::{WorkerTask, H2Service}
;
32 use crate::api2
::helpers
;
/// Entry-point router for this module.
///
/// It registers exactly one handler: `API_METHOD_UPGRADE_BACKUP` is installed
/// as the router's `upgrade` method. No other HTTP methods or sub-directories
/// are configured here.
37 pub const ROUTER
: Router
= Router
::new()
38 .upgrade(&API_METHOD_UPGRADE_BACKUP
);
41 pub const API_METHOD_UPGRADE_BACKUP
: ApiMethod
= ApiMethod
::new(
42 &ApiHandler
::AsyncHttp(&upgrade_to_backup_reader_protocol
),
44 concat
!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!(), "')."),
46 ("store", false, &DATASTORE_SCHEMA
),
47 ("backup-type", false, &BACKUP_TYPE_SCHEMA
),
48 ("backup-id", false, &BACKUP_ID_SCHEMA
),
49 ("backup-time", false, &BACKUP_TIME_SCHEMA
),
50 ("debug", true, &BooleanSchema
::new("Enable verbose debug logging.").schema()),
54 // Note: parameter 'store' is not a URI parameter, so we need to check the permission inside the function body
55 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
59 fn upgrade_to_backup_reader_protocol(
64 rpcenv
: Box
<dyn RpcEnvironment
>,
65 ) -> ApiResponseFuture
{
68 let debug
= param
["debug"].as_bool().unwrap_or(false);
70 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
71 let store
= required_string_param(¶m
, "store")?
.to_owned();
73 let user_info
= CachedUserInfo
::new()?
;
74 let privs
= user_info
.lookup_privs(&auth_id
, &["datastore", &store
]);
76 let priv_read
= privs
& PRIV_DATASTORE_READ
!= 0;
77 let priv_backup
= privs
& PRIV_DATASTORE_BACKUP
!= 0;
79 // priv_backup needs owner check further down below!
80 if !priv_read
&& !priv_backup
{
81 bail
!("no permissions on /datastore/{}", store
);
84 let datastore
= DataStore
::lookup_datastore(&store
)?
;
86 let backup_type
= required_string_param(¶m
, "backup-type")?
;
87 let backup_id
= required_string_param(¶m
, "backup-id")?
;
88 let backup_time
= required_integer_param(¶m
, "backup-time")?
;
93 .ok_or_else(|| format_err
!("missing Upgrade header"))?
96 if protocols
!= PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!() {
97 bail
!("invalid protocol name");
100 if parts
.version
>= http
::version
::Version
::HTTP_2
{
101 bail
!("unexpected http version '{:?}' (expected version < 2)", parts
.version
);
104 let env_type
= rpcenv
.env_type();
106 let backup_dir
= BackupDir
::new(backup_type
, backup_id
, backup_time
)?
;
108 let owner
= datastore
.get_owner(backup_dir
.group())?
;
109 let correct_owner
= owner
== auth_id
111 && Authid
::from(owner
.user().clone()) == auth_id
);
113 bail
!("backup owner check failed!");
117 let _guard
= lock_dir_noblock_shared(
118 &datastore
.snapshot_path(&backup_dir
),
120 "locked by another operation")?
;
122 let path
= datastore
.base_path();
124 //let files = BackupInfo::list_files(&path, &backup_dir)?;
126 let worker_id
= format
!("{}:{}/{}/{:08X}", store
, backup_type
, backup_id
, backup_dir
.backup_time());
128 WorkerTask
::spawn("reader", Some(worker_id
), auth_id
.to_string(), true, move |worker
| async
move {
131 let mut env
= ReaderEnvironment
::new(
141 env
.log(format
!("starting new backup reader datastore '{}': {:?}", store
, path
));
143 let service
= H2Service
::new(env
.clone(), worker
.clone(), &READER_API_ROUTER
, debug
);
145 let mut abort_future
= worker
.abort_future()
146 .map(|_
| Err(format_err
!("task aborted")));
148 let env2
= env
.clone();
149 let req_fut
= async
move {
150 let conn
= hyper
::upgrade
::on(Request
::from_parts(parts
, req_body
)).await?
;
151 env2
.debug("protocol upgrade done");
153 let mut http
= hyper
::server
::conn
::Http
::new();
154 http
.http2_only(true);
155 // increase window size: todo - find optimal size
156 let window_size
= 32*1024*1024; // max = (1 << 31) - 2
157 http
.http2_initial_stream_window_size(window_size
);
158 http
.http2_initial_connection_window_size(window_size
);
159 http
.http2_max_frame_size(4*1024*1024);
161 http
.serve_connection(conn
, service
)
162 .map_err(Error
::from
).await
166 req
= req_fut
.fuse() => req?
,
167 abort
= abort_future
=> abort?
,
170 env
.log("reader finished successfully");
175 let response
= Response
::builder()
176 .status(StatusCode
::SWITCHING_PROTOCOLS
)
177 .header(UPGRADE
, HeaderValue
::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!()))
178 .body(Body
::empty())?
;
184 const READER_API_SUBDIRS
: SubdirMap
= &[
186 "chunk", &Router
::new()
187 .download(&API_METHOD_DOWNLOAD_CHUNK
)
190 "download", &Router
::new()
191 .download(&API_METHOD_DOWNLOAD_FILE
)
194 "speedtest", &Router
::new()
195 .download(&API_METHOD_SPEEDTEST
)
/// Router for the reader API.
///
/// `GET` on the router itself is answered by the method generated with
/// `list_subdirs_api_method!(READER_API_SUBDIRS)`, and the entries of
/// `READER_API_SUBDIRS` are attached as its sub-directories.
199 pub const READER_API_ROUTER
: Router
= Router
::new()
200 .get(&list_subdirs_api_method
!(READER_API_SUBDIRS
))
201 .subdirs(READER_API_SUBDIRS
);
204 pub const API_METHOD_DOWNLOAD_FILE
: ApiMethod
= ApiMethod
::new(
205 &ApiHandler
::AsyncHttp(&download_file
),
207 "Download specified file.",
209 ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
),
219 rpcenv
: Box
<dyn RpcEnvironment
>,
220 ) -> ApiResponseFuture
{
223 let env
: &ReaderEnvironment
= rpcenv
.as_ref();
225 let file_name
= required_string_param(¶m
, "file-name")?
.to_owned();
227 let mut path
= env
.datastore
.base_path();
228 path
.push(env
.backup_dir
.relative_path());
229 path
.push(&file_name
);
231 env
.log(format
!("download {:?}", path
.clone()));
233 let index
: Option
<Box
<dyn IndexFile
+ Send
>> = match archive_type(&file_name
)?
{
234 ArchiveType
::FixedIndex
=> {
235 let index
= env
.datastore
.open_fixed_reader(&path
)?
;
236 Some(Box
::new(index
))
238 ArchiveType
::DynamicIndex
=> {
239 let index
= env
.datastore
.open_dynamic_reader(&path
)?
;
240 Some(Box
::new(index
))
245 if let Some(index
) = index
{
246 env
.log(format
!("register chunks in '{}' as downloadable.", file_name
));
248 for pos
in 0..index
.index_count() {
249 let info
= index
.chunk_info(pos
).unwrap();
250 env
.register_chunk(info
.digest
);
254 helpers
::create_download_response(path
).await
259 pub const API_METHOD_DOWNLOAD_CHUNK
: ApiMethod
= ApiMethod
::new(
260 &ApiHandler
::AsyncHttp(&download_chunk
),
262 "Download specified chunk.",
264 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
274 rpcenv
: Box
<dyn RpcEnvironment
>,
275 ) -> ApiResponseFuture
{
278 let env
: &ReaderEnvironment
= rpcenv
.as_ref();
280 let digest_str
= required_string_param(¶m
, "digest")?
;
281 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
283 if !env
.check_chunk_access(digest
) {
284 env
.log(format
!("attempted to download chunk {} which is not in registered chunk list", digest_str
));
285 return Err(http_err
!(UNAUTHORIZED
, "download chunk {} not allowed", digest_str
));
288 let (path
, _
) = env
.datastore
.chunk_path(&digest
);
289 let path2
= path
.clone();
291 env
.debug(format
!("download chunk {:?}", path
));
293 let data
= proxmox_async
::runtime
::block_in_place(|| std
::fs
::read(path
))
294 .map_err(move |err
| http_err
!(BAD_REQUEST
, "reading file {:?} failed: {}", path2
, err
))?
;
296 let body
= Body
::from(data
);
298 // fixme: set other headers ?
299 Ok(Response
::builder()
300 .status(StatusCode
::OK
)
301 .header(header
::CONTENT_TYPE
, "application/octet-stream")
308 fn download_chunk_old(
313 rpcenv: Box<dyn RpcEnvironment>,
314 ) -> Result<ApiResponseFuture, Error> {
316 let env: &ReaderEnvironment = rpcenv.as_ref();
317 let env2 = env.clone();
319 let digest_str = required_string_param(¶m, "digest")?;
320 let digest = <[u8; 32]>::from_hex(digest_str)?;
322 let (path, _) = env.datastore.chunk_path(&digest);
324 let path2 = path.clone();
325 let path3 = path.clone();
327 let response_future = tokio::fs::File::open(path)
328 .map_err(move |err| http_err!(BAD_REQUEST, "open file {:?} failed: {}", path2, err))
329 .and_then(move |file| {
330 env2.debug(format!("download chunk {:?}", path3));
331 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
332 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
334 let body = Body::wrap_stream(payload);
336 // fixme: set other headers ?
337 futures::future::ok(Response::builder()
338 .status(StatusCode::OK)
339 .header(header::CONTENT_TYPE, "application/octet-stream")
344 Ok(Box::new(response_future))
348 pub const API_METHOD_SPEEDTEST
: ApiMethod
= ApiMethod
::new(
349 &ApiHandler
::AsyncHttp(&speedtest
),
350 &ObjectSchema
::new("Test 1M block download speed.", &[])
358 _rpcenv
: Box
<dyn RpcEnvironment
>,
359 ) -> ApiResponseFuture
{
361 let buffer
= vec
![65u8; 1024*1024]; // nonsense [A,A,A...]
363 let body
= Body
::from(buffer
);
365 let response
= Response
::builder()
366 .status(StatusCode
::OK
)
367 .header(header
::CONTENT_TYPE
, "application/octet-stream")
371 future
::ok(response
).boxed()