1 //! Backup reader/restore protocol (HTTP2 upgrade)
3 use anyhow
::{bail, format_err, Error}
;
6 use hyper
::header
::{self, HeaderValue, UPGRADE}
;
7 use hyper
::http
::request
::Parts
;
8 use hyper
::{Body, Request, Response, StatusCode}
;
9 use serde
::Deserialize
;
10 use serde_json
::Value
;
13 http_err
, list_subdirs_api_method
, ApiHandler
, ApiMethod
, ApiResponseFuture
, Permission
,
14 Router
, RpcEnvironment
, SubdirMap
,
16 use proxmox_schema
::{BooleanSchema, ObjectSchema}
;
17 use proxmox_sys
::sortable
;
20 Authid
, Operation
, BACKUP_ARCHIVE_NAME_SCHEMA
, BACKUP_ID_SCHEMA
, BACKUP_NAMESPACE_SCHEMA
,
21 BACKUP_TIME_SCHEMA
, BACKUP_TYPE_SCHEMA
, CHUNK_DIGEST_SCHEMA
, DATASTORE_SCHEMA
,
22 PRIV_DATASTORE_BACKUP
, PRIV_DATASTORE_READ
,
24 use pbs_config
::CachedUserInfo
;
25 use pbs_datastore
::index
::IndexFile
;
26 use pbs_datastore
::manifest
::{archive_type, ArchiveType}
;
27 use pbs_datastore
::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1}
;
28 use pbs_tools
::json
::required_string_param
;
29 use proxmox_rest_server
::{H2Service, WorkerTask}
;
30 use proxmox_sys
::fs
::lock_dir_noblock_shared
;
32 use crate::api2
::backup
::optional_ns_param
;
33 use crate::api2
::helpers
;
38 pub const ROUTER
: Router
= Router
::new().upgrade(&API_METHOD_UPGRADE_BACKUP
);
41 pub const API_METHOD_UPGRADE_BACKUP
: ApiMethod
= ApiMethod
::new(
42 &ApiHandler
::AsyncHttp(&upgrade_to_backup_reader_protocol
),
45 "Upgraded to backup protocol ('",
46 PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!(),
50 ("store", false, &DATASTORE_SCHEMA
),
51 ("ns", true, &BACKUP_NAMESPACE_SCHEMA
),
52 ("backup-type", false, &BACKUP_TYPE_SCHEMA
),
53 ("backup-id", false, &BACKUP_ID_SCHEMA
),
54 ("backup-time", false, &BACKUP_TIME_SCHEMA
),
58 &BooleanSchema
::new("Enable verbose debug logging.").schema()
64 // Note: parameter 'store' is no uri parameter, so we need to test inside function body
65 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
69 fn upgrade_to_backup_reader_protocol(
74 rpcenv
: Box
<dyn RpcEnvironment
>,
75 ) -> ApiResponseFuture
{
77 let debug
= param
["debug"].as_bool().unwrap_or(false);
79 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
80 let store
= required_string_param(¶m
, "store")?
.to_owned();
82 let user_info
= CachedUserInfo
::new()?
;
83 let privs
= user_info
.lookup_privs(&auth_id
, &["datastore", &store
]);
85 let priv_read
= privs
& PRIV_DATASTORE_READ
!= 0;
86 let priv_backup
= privs
& PRIV_DATASTORE_BACKUP
!= 0;
88 // priv_backup needs owner check further down below!
89 if !priv_read
&& !priv_backup
{
90 bail
!("no permissions on /datastore/{}", store
);
93 let datastore
= DataStore
::lookup_datastore(&store
, Some(Operation
::Read
))?
;
95 let backup_ns
= optional_ns_param(¶m
)?
;
96 let backup_dir
= pbs_api_types
::BackupDir
::deserialize(¶m
)?
;
101 .ok_or_else(|| format_err
!("missing Upgrade header"))?
104 if protocols
!= PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!() {
105 bail
!("invalid protocol name");
108 if parts
.version
>= http
::version
::Version
::HTTP_2
{
110 "unexpected http version '{:?}' (expected version < 2)",
115 let env_type
= rpcenv
.env_type();
117 let backup_dir
= datastore
.backup_dir(backup_ns
, backup_dir
)?
;
119 let owner
= backup_dir
.get_owner()?
;
120 let correct_owner
= owner
== auth_id
121 || (owner
.is_token() && Authid
::from(owner
.user().clone()) == auth_id
);
123 bail
!("backup owner check failed!");
127 let _guard
= lock_dir_noblock_shared(
128 &backup_dir
.full_path(),
130 "locked by another operation",
133 let path
= datastore
.base_path();
135 //let files = BackupInfo::list_files(&path, &backup_dir)?;
137 // FIXME: include namespace here?
138 let worker_id
= format
!(
141 backup_dir
.backup_type(),
142 backup_dir
.backup_id(),
143 backup_dir
.backup_time(),
151 move |worker
| async
move {
154 let mut env
= ReaderEnvironment
::new(
165 "starting new backup reader datastore '{}': {:?}",
170 H2Service
::new(env
.clone(), worker
.clone(), &READER_API_ROUTER
, debug
);
172 let mut abort_future
= worker
174 .map(|_
| Err(format_err
!("task aborted")));
176 let env2
= env
.clone();
177 let req_fut
= async
move {
178 let conn
= hyper
::upgrade
::on(Request
::from_parts(parts
, req_body
)).await?
;
179 env2
.debug("protocol upgrade done");
181 let mut http
= hyper
::server
::conn
::Http
::new();
182 http
.http2_only(true);
183 // increase window size: todo - find optiomal size
184 let window_size
= 32 * 1024 * 1024; // max = (1 << 31) - 2
185 http
.http2_initial_stream_window_size(window_size
);
186 http
.http2_initial_connection_window_size(window_size
);
187 http
.http2_max_frame_size(4 * 1024 * 1024);
189 http
.serve_connection(conn
, service
)
190 .map_err(Error
::from
)
195 req
= req_fut
.fuse() => req?
,
196 abort
= abort_future
=> abort?
,
199 env
.log("reader finished successfully");
205 let response
= Response
::builder()
206 .status(StatusCode
::SWITCHING_PROTOCOLS
)
209 HeaderValue
::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!()),
211 .body(Body
::empty())?
;
218 const READER_API_SUBDIRS
: SubdirMap
= &[
219 ("chunk", &Router
::new().download(&API_METHOD_DOWNLOAD_CHUNK
)),
222 &Router
::new().download(&API_METHOD_DOWNLOAD_FILE
),
224 ("speedtest", &Router
::new().download(&API_METHOD_SPEEDTEST
)),
227 pub const READER_API_ROUTER
: Router
= Router
::new()
228 .get(&list_subdirs_api_method
!(READER_API_SUBDIRS
))
229 .subdirs(READER_API_SUBDIRS
);
232 pub const API_METHOD_DOWNLOAD_FILE
: ApiMethod
= ApiMethod
::new(
233 &ApiHandler
::AsyncHttp(&download_file
),
235 "Download specified file.",
236 &sorted
!([("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
),]),
245 rpcenv
: Box
<dyn RpcEnvironment
>,
246 ) -> ApiResponseFuture
{
248 let env
: &ReaderEnvironment
= rpcenv
.as_ref();
250 let file_name
= required_string_param(¶m
, "file-name")?
.to_owned();
252 let mut path
= env
.datastore
.base_path();
253 path
.push(env
.backup_dir
.relative_path());
254 path
.push(&file_name
);
256 env
.log(format
!("download {:?}", path
.clone()));
258 let index
: Option
<Box
<dyn IndexFile
+ Send
>> = match archive_type(&file_name
)?
{
259 ArchiveType
::FixedIndex
=> {
260 let index
= env
.datastore
.open_fixed_reader(&path
)?
;
261 Some(Box
::new(index
))
263 ArchiveType
::DynamicIndex
=> {
264 let index
= env
.datastore
.open_dynamic_reader(&path
)?
;
265 Some(Box
::new(index
))
270 if let Some(index
) = index
{
272 "register chunks in '{}' as downloadable.",
276 for pos
in 0..index
.index_count() {
277 let info
= index
.chunk_info(pos
).unwrap();
278 env
.register_chunk(info
.digest
);
282 helpers
::create_download_response(path
).await
288 pub const API_METHOD_DOWNLOAD_CHUNK
: ApiMethod
= ApiMethod
::new(
289 &ApiHandler
::AsyncHttp(&download_chunk
),
291 "Download specified chunk.",
292 &sorted
!([("digest", false, &CHUNK_DIGEST_SCHEMA
),]),
301 rpcenv
: Box
<dyn RpcEnvironment
>,
302 ) -> ApiResponseFuture
{
304 let env
: &ReaderEnvironment
= rpcenv
.as_ref();
306 let digest_str
= required_string_param(¶m
, "digest")?
;
307 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
309 if !env
.check_chunk_access(digest
) {
311 "attempted to download chunk {} which is not in registered chunk list",
314 return Err(http_err
!(
316 "download chunk {} not allowed",
321 let (path
, _
) = env
.datastore
.chunk_path(&digest
);
322 let path2
= path
.clone();
324 env
.debug(format
!("download chunk {:?}", path
));
327 proxmox_async
::runtime
::block_in_place(|| std
::fs
::read(path
)).map_err(move |err
| {
328 http_err
!(BAD_REQUEST
, "reading file {:?} failed: {}", path2
, err
)
331 let body
= Body
::from(data
);
333 // fixme: set other headers ?
334 Ok(Response
::builder()
335 .status(StatusCode
::OK
)
336 .header(header
::CONTENT_TYPE
, "application/octet-stream")
344 fn download_chunk_old(
349 rpcenv: Box<dyn RpcEnvironment>,
350 ) -> Result<ApiResponseFuture, Error> {
352 let env: &ReaderEnvironment = rpcenv.as_ref();
353 let env2 = env.clone();
355 let digest_str = required_string_param(¶m, "digest")?;
356 let digest = <[u8; 32]>::from_hex(digest_str)?;
358 let (path, _) = env.datastore.chunk_path(&digest);
360 let path2 = path.clone();
361 let path3 = path.clone();
363 let response_future = tokio::fs::File::open(path)
364 .map_err(move |err| http_err!(BAD_REQUEST, "open file {:?} failed: {}", path2, err))
365 .and_then(move |file| {
366 env2.debug(format!("download chunk {:?}", path3));
367 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
368 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
370 let body = Body::wrap_stream(payload);
372 // fixme: set other headers ?
373 futures::future::ok(Response::builder()
374 .status(StatusCode::OK)
375 .header(header::CONTENT_TYPE, "application/octet-stream")
380 Ok(Box::new(response_future))
384 pub const API_METHOD_SPEEDTEST
: ApiMethod
= ApiMethod
::new(
385 &ApiHandler
::AsyncHttp(&speedtest
),
386 &ObjectSchema
::new("Test 1M block download speed.", &[]),
394 _rpcenv
: Box
<dyn RpcEnvironment
>,
395 ) -> ApiResponseFuture
{
396 let buffer
= vec
![65u8; 1024 * 1024]; // nonsense [A,A,A...]
398 let body
= Body
::from(buffer
);
400 let response
= Response
::builder()
401 .status(StatusCode
::OK
)
402 .header(header
::CONTENT_TYPE
, "application/octet-stream")
406 future
::ok(response
).boxed()