//! Backup protocol (HTTP2 upgrade)
3 use anyhow
::{bail, format_err, Error}
;
6 use hyper
::header
::{HeaderValue, UPGRADE}
;
7 use hyper
::http
::request
::Parts
;
8 use hyper
::{Body, Request, Response, StatusCode}
;
9 use serde
::Deserialize
;
10 use serde_json
::{json, Value}
;
12 use proxmox_router
::list_subdirs_api_method
;
14 ApiHandler
, ApiMethod
, ApiResponseFuture
, Permission
, Router
, RpcEnvironment
, SubdirMap
,
16 use proxmox_schema
::*;
17 use proxmox_sys
::sortable
;
20 Authid
, BackupNamespace
, BackupType
, DatastoreWithNamespace
, Operation
, SnapshotVerifyState
,
21 VerifyState
, BACKUP_ARCHIVE_NAME_SCHEMA
, BACKUP_ID_SCHEMA
, BACKUP_NAMESPACE_SCHEMA
,
22 BACKUP_TIME_SCHEMA
, BACKUP_TYPE_SCHEMA
, CHUNK_DIGEST_SCHEMA
, DATASTORE_SCHEMA
,
23 PRIV_DATASTORE_BACKUP
,
25 use pbs_config
::CachedUserInfo
;
26 use pbs_datastore
::index
::IndexFile
;
27 use pbs_datastore
::manifest
::{archive_type, ArchiveType}
;
28 use pbs_datastore
::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1}
;
29 use pbs_tools
::json
::{required_array_param, required_integer_param, required_string_param}
;
30 use proxmox_rest_server
::{H2Service, WorkerTask}
;
31 use proxmox_sys
::fs
::lock_dir_noblock_shared
;
39 pub const ROUTER
: Router
= Router
::new().upgrade(&API_METHOD_UPGRADE_BACKUP
);
// API method definition for the HTTP/1.1 -> backup-protocol (HTTP/2) upgrade;
// registered on ROUTER via `.upgrade(...)` and handled asynchronously by
// `upgrade_to_backup_protocol`.
// NOTE(review): garbled extraction — original line numbers are fused into the
// text, tokens are split across lines, and wrapper lines (the ObjectSchema
// parameter-list constructor around the entries below, and the permission
// builder around the access note) are missing entirely. Restore this span
// from version control before attempting to compile.
42 pub const API_METHOD_UPGRADE_BACKUP
: ApiMethod
= ApiMethod
::new(
// async HTTP handler performing the actual protocol switch
43 &ApiHandler
::AsyncHttp(&upgrade_to_backup_protocol
),
45 concat
!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1
!(), "')."),
// parameter schema entries: (name, optional, schema)
47 ("store", false, &DATASTORE_SCHEMA
),
48 ("ns", true, &BACKUP_NAMESPACE_SCHEMA
),
49 ("backup-type", false, &BACKUP_TYPE_SCHEMA
),
50 ("backup-id", false, &BACKUP_ID_SCHEMA
),
51 ("backup-time", false, &BACKUP_TIME_SCHEMA
),
52 ("debug", true, &BooleanSchema
::new("Enable verbose debug logging.").schema()),
53 ("benchmark", true, &BooleanSchema
::new("Job is a benchmark (do not keep data).").schema()),
57 // Note: parameter 'store' is no uri parameter, so we need to test inside function body
58 Some("Requires on /datastore/{store}[/{namespace}] DATASTORE_BACKUP and being the owner of the group"),
62 pub(crate) fn optional_ns_param(param
: &Value
) -> Result
<BackupNamespace
, Error
> {
63 match param
.get("ns") {
64 Some(Value
::String(ns
)) => ns
.parse(),
65 None
=> Ok(BackupNamespace
::root()),
66 _
=> bail
!("invalid ns parameter"),
// Handler for API_METHOD_UPGRADE_BACKUP: checks permissions and ownership,
// takes the per-group/per-snapshot locks, spawns a worker task, and switches
// the connection to HTTP/2 serving BACKUP_API_ROUTER over it.
// NOTE(review): garbled extraction — original line numbers are fused into the
// text, tokens are split mid-statement, `&` was HTML-entity-decoded into
// `¶` (so `¶m` below is really `&param`), and many lines are missing
// entirely. Restore the body from version control; the comments below only
// annotate the fragments that survived.
70 fn upgrade_to_backup_protocol(
75 rpcenv
: Box
<dyn RpcEnvironment
>,
76 ) -> ApiResponseFuture
{
// --- optional flags from the request parameters ---
78 let debug
= param
["debug"].as_bool().unwrap_or(false);
79 let benchmark
= param
["benchmark"].as_bool().unwrap_or(false);
// --- resolve caller identity and requested target ---
81 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
83 let store
= required_string_param(¶m
, "store")?
.to_owned();
84 let backup_ns
= optional_ns_param(¶m
)?
;
85 let store_with_ns
= DatastoreWithNamespace
{
87 ns
: backup_ns
.clone(),
89 let backup_dir_arg
= pbs_api_types
::BackupDir
::deserialize(¶m
)?
;
// --- privilege check: caller needs DATASTORE_BACKUP on the acl path ---
91 let user_info
= CachedUserInfo
::new()?
;
93 let privs
= user_info
.lookup_privs(&auth_id
, &store_with_ns
.acl_path());
94 if privs
& PRIV_DATASTORE_BACKUP
== 0 {
95 proxmox_router
::http_bail
!(FORBIDDEN
, "permission check failed");
// --- open datastore for writing ---
98 let datastore
= DataStore
::lookup_datastore(&store
, Some(Operation
::Write
))?
;
// --- validate the Upgrade header and HTTP version (< 2 required) ---
100 let protocols
= parts
103 .ok_or_else(|| format_err
!("missing Upgrade header"))?
106 if protocols
!= PROXMOX_BACKUP_PROTOCOL_ID_V1
!() {
107 bail
!("invalid protocol name");
110 if parts
.version
>= http
::version
::Version
::HTTP_2
{
112 "unexpected http version '{:?}' (expected version < 2)",
// --- namespace must exist on disk ---
117 if !datastore
.namespace_path(&backup_ns
).exists() {
118 proxmox_router
::http_bail
!(NOT_FOUND
, "namespace not found");
121 // FIXME: include namespace here?
122 let worker_id
= format
!("{}:{}/{}", store
, backup_dir_arg
.ty(), backup_dir_arg
.id());
124 let env_type
= rpcenv
.env_type();
// --- resolve the backup group; "host/benchmark" is treated specially ---
126 let backup_group
= datastore
.backup_group(backup_ns
, backup_dir_arg
.group
.clone());
128 let worker_type
= if backup_group
.backup_type() == BackupType
::Host
129 && backup_group
.backup_id() == "benchmark"
132 bail
!("unable to run benchmark without --benchmark flags");
137 bail
!("benchmark flags is only allowed on 'host/benchmark'");
142 // lock backup group to only allow one backup per group at a time
143 let (owner
, _group_guard
) = datastore
.create_locked_backup_group(
144 backup_group
.backup_ns(),
145 backup_group
.as_ref(),
// --- ownership check: owner itself, or a token belonging to the owner ---
151 owner
== auth_id
|| (owner
.is_token() && Authid
::from(owner
.user().clone()) == auth_id
);
152 if !correct_owner
&& worker_type
!= "benchmark" {
153 // only the owner is allowed to create additional snapshots
154 bail
!("backup owner check failed ({} != {})", auth_id
, owner
);
// --- use last snapshot as base only if its verify state is not Failed ---
158 let info
= backup_group
.last_backup(true).unwrap_or(None
);
159 if let Some(info
) = info
{
160 let (manifest
, _
) = info
.backup_dir
.load_manifest()?
;
161 let verify
= manifest
.unprotected
["verify_state"].clone();
162 match serde_json
::from_value
::<SnapshotVerifyState
>(verify
) {
163 Ok(verify
) => match verify
.state
{
164 VerifyState
::Ok
=> Some(info
),
165 VerifyState
::Failed
=> None
,
168 // no verify state found, treat as valid
// --- the new snapshot must be strictly newer than the base snapshot ---
177 let backup_dir
= backup_group
.backup_dir(backup_dir_arg
.time
)?
;
179 let _last_guard
= if let Some(last
) = &last_backup
{
180 if backup_dir
.backup_time() <= last
.backup_dir
.backup_time() {
181 bail
!("backup timestamp is older than last backup.");
184 // lock last snapshot to prevent forgetting/pruning it during backup
185 let full_path
= last
.backup_dir
.full_path();
186 Some(lock_dir_noblock_shared(
189 "base snapshot is already locked by another operation",
// --- create and lock the new snapshot directory ---
195 let (path
, is_new
, snap_guard
) =
196 datastore
.create_locked_backup_dir(backup_dir
.backup_ns(), backup_dir
.as_ref())?
;
198 bail
!("backup directory already exists.");
// --- set up the per-connection backup environment and worker task ---
207 let mut env
= BackupEnvironment
::new(
216 env
.last_backup
= last_backup
;
219 "starting new {} on datastore '{}': {:?}",
220 worker_type
, store
, path
224 H2Service
::new(env
.clone(), worker
.clone(), &BACKUP_API_ROUTER
, debug
);
226 let abort_future
= worker
.abort_future();
228 let env2
= env
.clone();
// --- perform the hyper upgrade and serve HTTP/2 over the raw connection ---
230 let mut req_fut
= hyper
::upgrade
::on(Request
::from_parts(parts
, req_body
))
231 .map_err(Error
::from
)
232 .and_then(move |conn
| {
233 env2
.debug("protocol upgrade done");
235 let mut http
= hyper
::server
::conn
::Http
::new();
236 http
.http2_only(true);
237 // increase window size: todo - find optiomal size
238 let window_size
= 32 * 1024 * 1024; // max = (1 << 31) - 2
239 http
.http2_initial_stream_window_size(window_size
);
240 http
.http2_initial_connection_window_size(window_size
);
241 http
.http2_max_frame_size(4 * 1024 * 1024);
243 let env3
= env2
.clone();
244 http
.serve_connection(conn
, service
).map(move |result
| {
247 // Avoid Transport endpoint is not connected (os error 107)
248 // fixme: find a better way to test for that error
249 if err
.to_string().starts_with("connection error")
254 Err(Error
::from(err
))
// --- race the connection future against task abortion ---
261 let mut abort_future
= abort_future
.map(|_
| Err(format_err
!("task aborted")));
264 // keep flock until task ends
265 let _group_guard
= _group_guard
;
266 let snap_guard
= snap_guard
;
267 let _last_guard
= _last_guard
;
270 req
= req_fut
=> req
,
271 abrt
= abort_future
=> abrt
,
// --- benchmark runs never keep their data ---
274 env
.log("benchmark finished successfully");
275 proxmox_async
::runtime
::block_in_place(|| env
.remove_backup())?
;
// --- on success optionally kick off the requested verification task ---
279 let verify
= |env
: BackupEnvironment
| {
280 if let Err(err
) = env
.verify_after_complete(snap_guard
) {
282 "backup finished, but starting the requested verify task failed: {}",
// --- completion matrix: (connection result, env.ensure_finished()) ---
288 match (res
, env
.ensure_finished()) {
290 env
.log("backup finished successfully");
294 (Err(err
), Ok(())) => {
295 // ignore errors after finish
296 env
.log(format
!("backup had errors but finished: {}", err
));
300 (Ok(_
), Err(err
)) => {
301 env
.log(format
!("backup ended and finish failed: {}", err
));
302 env
.log("removing unfinished backup");
303 proxmox_async
::runtime
::block_in_place(|| env
.remove_backup())?
;
306 (Err(err
), Err(_
)) => {
307 env
.log(format
!("backup failed: {}", err
));
308 env
.log("removing failed backup");
309 proxmox_async
::runtime
::block_in_place(|| env
.remove_backup())?
;
// --- 101 Switching Protocols response with the protocol id echoed back ---
317 let response
= Response
::builder()
318 .status(StatusCode
::SWITCHING_PROTOCOLS
)
321 HeaderValue
::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1
!()),
323 .body(Body
::empty())?
;
// Subdirectory map of the HTTP/2 backup API: maps path segments to routers
// for blob/chunk upload, index create/append/close, previous-backup download,
// finish marker and speed test.
// NOTE(review): garbled extraction — several tuple entries lost their path
// name strings (only the router halves survived); restore from version
// control. The visible handlers are annotated below.
330 const BACKUP_API_SUBDIRS
: SubdirMap
= &[
331 ("blob", &Router
::new().upload(&API_METHOD_UPLOAD_BLOB
)),
// dynamic chunk upload (path name string missing in this extraction)
334 &Router
::new().upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK
),
// close a dynamic index writer
338 &Router
::new().post(&API_METHOD_CLOSE_DYNAMIC_INDEX
),
// create / append to a dynamic index
343 .post(&API_METHOD_CREATE_DYNAMIC_INDEX
)
344 .put(&API_METHOD_DYNAMIC_APPEND
),
// finish marker: inline ApiMethod wrapping `finish_backup`, no parameters
348 &Router
::new().post(&ApiMethod
::new(
349 &ApiHandler
::Sync(&finish_backup
),
350 &ObjectSchema
::new("Mark backup as finished.", &[]),
// fixed chunk upload
355 &Router
::new().upload(&API_METHOD_UPLOAD_FIXED_CHUNK
),
// close a fixed index writer
359 &Router
::new().post(&API_METHOD_CLOSE_FIXED_INDEX
),
// create / append to a fixed index
364 .post(&API_METHOD_CREATE_FIXED_INDEX
)
365 .put(&API_METHOD_FIXED_APPEND
),
// download an archive from the previous backup
369 &Router
::new().download(&API_METHOD_DOWNLOAD_PREVIOUS
),
372 "previous_backup_time",
373 &Router
::new().get(&API_METHOD_GET_PREVIOUS_BACKUP_TIME
),
// bandwidth / throughput test endpoint
377 &Router
::new().upload(&API_METHOD_UPLOAD_SPEEDTEST
),
381 pub const BACKUP_API_ROUTER
: Router
= Router
::new()
382 .get(&list_subdirs_api_method
!(BACKUP_API_SUBDIRS
))
383 .subdirs(BACKUP_API_SUBDIRS
);
// API method: create a dynamic chunk index writer (synchronous handler
// `create_dynamic_index`); single required parameter "archive-name".
// NOTE(review): garbled extraction — the ObjectSchema constructor line
// wrapping the description/parameter list and the closing delimiters are
// missing; restore from version control.
386 pub const API_METHOD_CREATE_DYNAMIC_INDEX
: ApiMethod
= ApiMethod
::new(
387 &ApiHandler
::Sync(&create_dynamic_index
),
389 "Create dynamic chunk index file.",
390 &sorted
!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
),]),
// Create a dynamic (.didx) index writer inside the current backup dir and
// register it with the backup environment, returning a writer id.
// NOTE(review): garbled extraction — parameter lines of the signature, block
// delimiters and the final return expression are missing; `¶m` below is
// HTML-mangled `&param`. Restore from version control.
394 fn create_dynamic_index(
397 rpcenv
: &mut dyn RpcEnvironment
,
398 ) -> Result
<Value
, Error
> {
399 let env
: &BackupEnvironment
= rpcenv
.as_ref();
401 let name
= required_string_param(¶m
, "archive-name")?
.to_owned();
// only .didx archives may use a dynamic index
403 let archive_name
= name
.clone();
404 if !archive_name
.ends_with(".didx") {
405 bail
!("wrong archive extension: '{}'", archive_name
);
// path of the index file relative to the snapshot directory
408 let mut path
= env
.backup_dir
.relative_path();
409 path
.push(archive_name
);
// create the writer on disk, then register it to obtain the writer id
411 let index
= env
.datastore
.create_dynamic_writer(&path
)?
;
412 let wid
= env
.register_dynamic_writer(index
, name
)?
;
414 env
.log(format
!("created new dynamic index {} ({:?})", wid
, path
));
// API method: create a fixed chunk index writer (synchronous handler
// `create_fixed_index`); parameters: "archive-name", "size", and an optional
// reuse-csum used to enable incremental backups.
// NOTE(review): garbled extraction — schema wrapper lines and parameter-entry
// names for the size/reuse-csum entries are missing; restore from version
// control.
420 pub const API_METHOD_CREATE_FIXED_INDEX
: ApiMethod
= ApiMethod
::new(
421 &ApiHandler
::Sync(&create_fixed_index
),
423 "Create fixed chunk index file.",
425 ("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
),
429 &IntegerSchema
::new("File size.").minimum(1).schema()
435 "If set, compare last backup's \
436 csum and reuse index for incremental backup if it matches."
// Create a fixed (.fidx) index writer; when "reuse-csum" matches the previous
// backup's checksum for the same archive, clone the previous index data and
// mark the writer incremental.
// NOTE(review): garbled extraction — signature parameter lines, match arms,
// several delimiters and the final return are missing; `¶m` below is
// HTML-mangled `&param`. Restore from version control.
444 fn create_fixed_index(
447 rpcenv
: &mut dyn RpcEnvironment
,
448 ) -> Result
<Value
, Error
> {
449 let env
: &BackupEnvironment
= rpcenv
.as_ref();
451 let name
= required_string_param(¶m
, "archive-name")?
.to_owned();
452 let size
= required_integer_param(¶m
, "size")?
as usize;
453 let reuse_csum
= param
["reuse-csum"].as_str();
// only .fidx archives may use a fixed index
455 let archive_name
= name
.clone();
456 if !archive_name
.ends_with(".fidx") {
457 bail
!("wrong archive extension: '{}'", archive_name
);
460 let mut path
= env
.backup_dir
.relative_path();
461 path
.push(&archive_name
);
463 let chunk_size
= 4096 * 1024; // todo: ??
465 // do incremental backup if csum is set
466 let mut reader
= None
;
467 let mut incremental
= false;
468 if let Some(csum
) = reuse_csum
{
// incremental requires a valid previous backup of the same archive
470 let last_backup
= match &env
.last_backup
{
473 bail
!("cannot reuse index - no valid previous backup exists");
477 let mut last_path
= last_backup
.backup_dir
.relative_path();
478 last_path
.push(&archive_name
);
480 let index
= match env
.datastore
.open_fixed_reader(last_path
) {
483 bail
!("cannot reuse index - no previous backup exists for archive");
// the client-supplied csum must match the previous index's csum
487 let (old_csum
, _
) = index
.compute_csum();
488 let old_csum
= hex
::encode(&old_csum
);
489 if old_csum
!= csum
{
491 "expected csum ({}) doesn't match last backup's ({}), cannot do incremental backup",
497 reader
= Some(index
);
// create the new writer; clone previous data when doing incremental
500 let mut writer
= env
.datastore
.create_fixed_writer(&path
, size
, chunk_size
)?
;
502 if let Some(reader
) = reader
{
503 writer
.clone_data_from(&reader
)?
;
506 let wid
= env
.register_fixed_writer(writer
, name
, size
, chunk_size
as u32, incremental
)?
;
508 env
.log(format
!("created new fixed index {} ({:?})", wid
, path
));
// API method: append chunks to a dynamic index writer (synchronous handler
// `dynamic_append`); parameters include the writer id plus parallel
// digest-list / offset-list arrays.
// NOTE(review): garbled extraction — schema wrapper lines and parameter-entry
// names are missing; restore from version control.
514 pub const API_METHOD_DYNAMIC_APPEND
: ApiMethod
= ApiMethod
::new(
515 &ApiHandler
::Sync(&dynamic_append
),
517 "Append chunk to dynamic index writer.",
522 &IntegerSchema
::new("Dynamic writer ID.")
530 &ArraySchema
::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA
).schema()
536 "Chunk offset list.",
537 &IntegerSchema
::new("Corresponding chunk offsets.")
// Body fragment of `fn dynamic_append` (the `fn` header line itself is lost
// in this extraction): validates that digest-list and offset-list have equal
// length, then looks up each chunk and appends (offset, size, digest) records
// to the dynamic writer identified by `wid`.
// NOTE(review): garbled extraction — `¶m` below is HTML-mangled
// `&param`; restore from version control.
550 rpcenv
: &mut dyn RpcEnvironment
,
551 ) -> Result
<Value
, Error
> {
552 let wid
= required_integer_param(¶m
, "wid")?
as usize;
553 let digest_list
= required_array_param(¶m
, "digest-list")?
;
554 let offset_list
= required_array_param(¶m
, "offset-list")?
;
// the two arrays are parallel: one offset per digest
556 if offset_list
.len() != digest_list
.len() {
558 "offset list has wrong length ({} != {})",
564 let env
: &BackupEnvironment
= rpcenv
.as_ref();
566 env
.debug(format
!("dynamic_append {} chunks", digest_list
.len()));
568 for (i
, item
) in digest_list
.iter().enumerate() {
569 let digest_str
= item
.as_str().unwrap();
570 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
571 let offset
= offset_list
[i
].as_u64().unwrap();
// chunk must have been uploaded/registered before it can be referenced
573 .lookup_chunk(&digest
)
574 .ok_or_else(|| format_err
!("no such chunk {}", digest_str
))?
;
576 env
.dynamic_writer_append_chunk(wid
, offset
, size
, &digest
)?
;
579 "successfully added chunk {} to dynamic index {} (offset {}, size {})",
580 digest_str
, wid
, offset
, size
// API method: append chunks to a fixed index writer (synchronous handler
// `fixed_append`); mirrors API_METHOD_DYNAMIC_APPEND.
// NOTE(review): garbled extraction — schema wrapper lines and parameter-entry
// names are missing; restore from version control.
588 pub const API_METHOD_FIXED_APPEND
: ApiMethod
= ApiMethod
::new(
589 &ApiHandler
::Sync(&fixed_append
),
591 "Append chunk to fixed index writer.",
596 &IntegerSchema
::new("Fixed writer ID.")
604 &ArraySchema
::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA
).schema()
610 "Chunk offset list.",
611 &IntegerSchema
::new("Corresponding chunk offsets.")
// Body fragment of `fn fixed_append` (the `fn` header line itself is lost in
// this extraction): same structure as `dynamic_append`, but records go to a
// fixed index writer.
// NOTE(review): garbled extraction — `¶m` below is HTML-mangled
// `&param`; restore from version control.
624 rpcenv
: &mut dyn RpcEnvironment
,
625 ) -> Result
<Value
, Error
> {
626 let wid
= required_integer_param(¶m
, "wid")?
as usize;
627 let digest_list
= required_array_param(¶m
, "digest-list")?
;
628 let offset_list
= required_array_param(¶m
, "offset-list")?
;
// parallel arrays: one offset per digest
630 if offset_list
.len() != digest_list
.len() {
632 "offset list has wrong length ({} != {})",
638 let env
: &BackupEnvironment
= rpcenv
.as_ref();
640 env
.debug(format
!("fixed_append {} chunks", digest_list
.len()));
642 for (i
, item
) in digest_list
.iter().enumerate() {
643 let digest_str
= item
.as_str().unwrap();
644 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
645 let offset
= offset_list
[i
].as_u64().unwrap();
// chunk must already be known to the environment
647 .lookup_chunk(&digest
)
648 .ok_or_else(|| format_err
!("no such chunk {}", digest_str
))?
;
650 env
.fixed_writer_append_chunk(wid
, offset
, size
, &digest
)?
;
653 "successfully added chunk {} to fixed index {} (offset {}, size {})",
654 digest_str
, wid
, offset
, size
// API method: close a dynamic index writer (synchronous handler
// `close_dynamic_index`); the chunk-count/size/csum parameters let the server
// verify it received all data.
// NOTE(review): garbled extraction — schema wrapper lines and parameter-entry
// names are missing; restore from version control.
662 pub const API_METHOD_CLOSE_DYNAMIC_INDEX
: ApiMethod
= ApiMethod
::new(
663 &ApiHandler
::Sync(&close_dynamic_index
),
665 "Close dynamic index writer.",
670 &IntegerSchema
::new("Dynamic writer ID.")
679 "Chunk count. This is used to verify that the server got all chunks."
688 "File size. This is used to verify that the server got all data."
696 &StringSchema
::new("Digest list checksum.").schema()
// Close a dynamic index writer after verifying the client-reported chunk
// count, total size and digest-list checksum.
// NOTE(review): garbled extraction — signature parameter lines, delimiters
// and the final return are missing; `¶m` below is HTML-mangled
// `&param`. Restore from version control.
702 fn close_dynamic_index(
705 rpcenv
: &mut dyn RpcEnvironment
,
706 ) -> Result
<Value
, Error
> {
707 let wid
= required_integer_param(¶m
, "wid")?
as usize;
708 let chunk_count
= required_integer_param(¶m
, "chunk-count")?
as u64;
709 let size
= required_integer_param(¶m
, "size")?
as u64;
710 let csum_str
= required_string_param(¶m
, "csum")?
;
// csum arrives as a hex string, decode to raw 32-byte digest
711 let csum
= <[u8; 32]>::from_hex(csum_str
)?
;
713 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// the environment checks count/size/csum against what it actually received
715 env
.dynamic_writer_close(wid
, chunk_count
, size
, csum
)?
;
717 env
.log(format
!("successfully closed dynamic index {}", wid
));
// API method: close a fixed index writer (synchronous handler
// `close_fixed_index`); chunk-count/size are ignored for incremental backups
// per the schema descriptions below.
// NOTE(review): garbled extraction — schema wrapper lines and parameter-entry
// names are missing; restore from version control.
723 pub const API_METHOD_CLOSE_FIXED_INDEX
: ApiMethod
= ApiMethod
::new(
724 &ApiHandler
::Sync(&close_fixed_index
),
726 "Close fixed index writer.",
731 &IntegerSchema
::new("Fixed writer ID.")
739 &IntegerSchema
::new("Chunk count. This is used to verify that the server got all chunks. Ignored for incremental backups.")
746 &IntegerSchema
::new("File size. This is used to verify that the server got all data. Ignored for incremental backups.")
750 ("csum", false, &StringSchema
::new("Digest list checksum.").schema()),
// Close a fixed index writer after verifying the client-reported chunk count,
// total size and digest-list checksum (mirror of `close_dynamic_index`).
// NOTE(review): garbled extraction — signature parameter lines, delimiters
// and the final return are missing; `¶m` below is HTML-mangled
// `&param`. Restore from version control.
755 fn close_fixed_index(
758 rpcenv
: &mut dyn RpcEnvironment
,
759 ) -> Result
<Value
, Error
> {
760 let wid
= required_integer_param(¶m
, "wid")?
as usize;
761 let chunk_count
= required_integer_param(¶m
, "chunk-count")?
as u64;
762 let size
= required_integer_param(¶m
, "size")?
as u64;
763 let csum_str
= required_string_param(¶m
, "csum")?
;
// decode hex checksum into the raw 32-byte digest
764 let csum
= <[u8; 32]>::from_hex(csum_str
)?
;
766 let env
: &BackupEnvironment
= rpcenv
.as_ref();
768 env
.fixed_writer_close(wid
, chunk_count
, size
, csum
)?
;
770 env
.log(format
!("successfully closed fixed index {}", wid
));
// Body fragment of `fn finish_backup` (the `fn` header line itself is lost in
// this extraction; the function is referenced from BACKUP_API_SUBDIRS'
// "Mark backup as finished." entry): finalizes the backup environment and
// logs success.
// NOTE(review): restore the missing signature/return lines from version
// control.
778 rpcenv
: &mut dyn RpcEnvironment
,
779 ) -> Result
<Value
, Error
> {
780 let env
: &BackupEnvironment
= rpcenv
.as_ref();
782 env
.finish_backup()?
;
783 env
.log("successfully finished backup");
789 pub const API_METHOD_GET_PREVIOUS_BACKUP_TIME
: ApiMethod
= ApiMethod
::new(
790 &ApiHandler
::Sync(&get_previous_backup_time
),
791 &ObjectSchema
::new("Get previous backup time.", &[]),
// Return the previous backup's timestamp as JSON (null when there is no
// previous backup).
// NOTE(review): garbled extraction — signature parameter lines and part of
// the `env ... .map(...)` accessor chain are missing; restore from version
// control.
794 fn get_previous_backup_time(
797 rpcenv
: &mut dyn RpcEnvironment
,
798 ) -> Result
<Value
, Error
> {
799 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// maps the optional previous-backup info to its snapshot timestamp
801 let backup_time
= env
804 .map(|info
| info
.backup_dir
.backup_time());
// serializes to a JSON integer, or null when no previous backup exists
806 Ok(json
!(backup_time
))
// API method: download an archive from the previous backup (async HTTP
// handler `download_previous`); single required parameter "archive-name".
// NOTE(review): garbled extraction — the ObjectSchema constructor line and
// closing delimiters are missing; restore from version control.
810 pub const API_METHOD_DOWNLOAD_PREVIOUS
: ApiMethod
= ApiMethod
::new(
811 &ApiHandler
::AsyncHttp(&download_previous
),
813 "Download archive from previous backup.",
814 &sorted
!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
)]),
// Stream an archive from the previous (base) backup to the client; for index
// archives, first registers all of the previous index's chunks with the
// backup environment so the client may reference them incrementally.
// NOTE(review): garbled extraction — signature parameter lines, match-arm
// delimiters and the tail of the function (past the final `.await`) are
// missing; `¶m` below is HTML-mangled `&param`. Restore from version
// control.
818 fn download_previous(
823 rpcenv
: Box
<dyn RpcEnvironment
>,
824 ) -> ApiResponseFuture
{
826 let env
: &BackupEnvironment
= rpcenv
.as_ref();
828 let archive_name
= required_string_param(¶m
, "archive-name")?
.to_owned();
// a previous backup must exist to download from
830 let last_backup
= match &env
.last_backup
{
832 None
=> bail
!("no valid previous backup"),
835 let mut path
= last_backup
.backup_dir
.full_path();
836 path
.push(&archive_name
);
// open the matching index reader depending on the archive type
839 let index
: Option
<Box
<dyn IndexFile
>> = match archive_type(&archive_name
)?
{
840 ArchiveType
::FixedIndex
=> {
841 let index
= env
.datastore
.open_fixed_reader(&path
)?
;
842 Some(Box
::new(index
))
844 ArchiveType
::DynamicIndex
=> {
845 let index
= env
.datastore
.open_dynamic_reader(&path
)?
;
846 Some(Box
::new(index
))
// register every chunk of the previous index with the environment
850 if let Some(index
) = index
{
852 "register chunks in '{}' from previous backup.",
856 for pos
in 0..index
.index_count() {
857 let info
= index
.chunk_info(pos
).unwrap();
858 let size
= info
.range
.end
- info
.range
.start
;
859 env
.register_chunk(info
.digest
, size
as u32)?
;
// finally stream the file itself as the HTTP response
864 env
.log(format
!("download '{}' from previous backup.", archive_name
));
865 crate::api2
::helpers
::create_download_response(path
).await