//! Backup protocol (HTTP2 upgrade)
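//!
//! A client starts with a normal HTTP/1.1 request carrying an `Upgrade`
//! header that names the backup protocol. The server validates the protocol
//! name, answers with `101 Switching Protocols`, and then serves HTTP/2 on
//! the same connection, dispatching requests through [`BACKUP_API_ROUTER`].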
use anyhow::{bail, format_err, Error};
use futures::*;
use hex::FromHex;
use hyper::header::{HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
use serde_json::{json, Value};

use proxmox_router::list_subdirs_api_method;
use proxmox_router::{
    ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment, SubdirMap,
};
use proxmox_schema::*;
use proxmox_sys::sortable;

use pbs_api_types::{
    Authid, Operation, SnapshotVerifyState, VerifyState, BACKUP_ARCHIVE_NAME_SCHEMA,
    BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA,
    DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP,
};
use pbs_config::CachedUserInfo;
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType};
use pbs_datastore::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::json::{required_array_param, required_integer_param, required_string_param};
use proxmox_rest_server::{H2Service, WorkerTask};
use proxmox_sys::fs::lock_dir_noblock_shared;

mod environment;
use environment::*;

mod upload_chunk;
use upload_chunk::*;

pub const ROUTER: Router = Router::new().upgrade(&API_METHOD_UPGRADE_BACKUP);

#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!(
            "Upgraded to backup protocol ('",
            PROXMOX_BACKUP_PROTOCOL_ID_V1!(),
            "')."
        ),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            (
                "debug",
                true,
                &BooleanSchema::new("Enable verbose debug logging.").schema(),
            ),
            (
                "benchmark",
                true,
                &BooleanSchema::new("Job is a benchmark (do not keep data).").schema(),
            ),
        ]),
    ),
)
.access(
    // Note: parameter 'store' is not a URI parameter, so we need to check it inside the function body
    Some("The user needs Datastore.Backup privilege on /datastore/{store} and needs to own the backup group."),
    &Permission::Anybody,
);
fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);
        let benchmark = param["benchmark"].as_bool().unwrap_or(false);

        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;

        let store = required_string_param(&param, "store")?.to_owned();

        let user_info = CachedUserInfo::new()?;
        user_info.check_privs(
            &auth_id,
            &["datastore", &store],
            PRIV_DATASTORE_BACKUP,
            false,
        )?;

        let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;

        let backup_type = required_string_param(&param, "backup-type")?;
        let backup_id = required_string_param(&param, "backup-id")?;
        let backup_time = required_integer_param(&param, "backup-time")?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        if parts.version >= http::version::Version::HTTP_2 {
            bail!(
                "unexpected http version '{:?}' (expected version < 2)",
                parts.version
            );
        }

        let worker_id = format!("{}:{}/{}", store, backup_type, backup_id);

        let env_type = rpcenv.env_type();

        let backup_group = BackupGroup::new(backup_type, backup_id);

        let worker_type = if backup_type == "host" && backup_id == "benchmark" {
            if !benchmark {
                bail!("unable to run benchmark without the --benchmark flag");
            }
            "benchmark"
        } else {
            if benchmark {
                bail!("the benchmark flag is only allowed on 'host/benchmark'");
            }
            "backup"
        };
        // lock backup group to only allow one backup per group at a time
        let (owner, _group_guard) =
            datastore.create_locked_backup_group(&backup_group, &auth_id)?;

        // permission check
        let correct_owner =
            owner == auth_id || (owner.is_token() && Authid::from(owner.user().clone()) == auth_id);
        if !correct_owner && worker_type != "benchmark" {
            // only the owner is allowed to create additional snapshots
            bail!("backup owner check failed ({} != {})", auth_id, owner);
        }

        let last_backup = {
            let info = BackupInfo::last_backup(&datastore.base_path(), &backup_group, true)
                .unwrap_or(None);
            if let Some(info) = info {
                let (manifest, _) = datastore.load_manifest(&info.backup_dir)?;
                let verify = manifest.unprotected["verify_state"].clone();
                match serde_json::from_value::<SnapshotVerifyState>(verify) {
                    Ok(verify) => match verify.state {
                        VerifyState::Ok => Some(info),
                        VerifyState::Failed => None,
                    },
                    Err(_) => {
                        // no verify state found, treat as valid
                        Some(info)
                    }
                }
            } else {
                None
            }
        };

        let backup_dir = BackupDir::with_group(backup_group, backup_time)?;
        let _last_guard = if let Some(last) = &last_backup {
            if backup_dir.backup_time() <= last.backup_dir.backup_time() {
                bail!("backup timestamp is older than last backup.");
            }

            // lock last snapshot to prevent forgetting/pruning it during backup
            let full_path = datastore.snapshot_path(&last.backup_dir);
            Some(lock_dir_noblock_shared(
                &full_path,
                "snapshot",
                "base snapshot is already locked by another operation",
            )?)
        } else {
            None
        };

        let (path, is_new, snap_guard) = datastore.create_locked_backup_dir(&backup_dir)?;
        if !is_new {
            bail!("backup directory already exists.");
        }
        WorkerTask::spawn(
            worker_type,
            Some(worker_id),
            auth_id.to_string(),
            true,
            move |worker| {
                let mut env = BackupEnvironment::new(
                    env_type,
                    auth_id,
                    worker.clone(),
                    datastore,
                    backup_dir,
                );

                env.debug = debug;
                env.last_backup = last_backup;

                env.log(format!(
                    "starting new {} on datastore '{}': {:?}",
                    worker_type, store, path
                ));

                let service =
                    H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

                let abort_future = worker.abort_future();

                let env2 = env.clone();

                let mut req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
                    .map_err(Error::from)
                    .and_then(move |conn| {
                        env2.debug("protocol upgrade done");

                        let mut http = hyper::server::conn::Http::new();
                        http.http2_only(true);
                        // increase window size: todo - find optimal size
                        let window_size = 32 * 1024 * 1024; // max = (1 << 31) - 2
                        http.http2_initial_stream_window_size(window_size);
                        http.http2_initial_connection_window_size(window_size);
                        http.http2_max_frame_size(4 * 1024 * 1024);

                        let env3 = env2.clone();
                        http.serve_connection(conn, service).map(move |result| {
                            match result {
                                Err(err) => {
                                    // Avoid "Transport endpoint is not connected" (os error 107)
                                    // fixme: find a better way to test for that error
                                    if err.to_string().starts_with("connection error")
                                        && env3.ensure_finished().is_ok()
                                    {
                                        Ok(())
                                    } else {
                                        Err(Error::from(err))
                                    }
                                }
                                Ok(()) => Ok(()),
                            }
                        })
                    });
                let mut abort_future = abort_future.map(|_| Err(format_err!("task aborted")));

                async move {
                    // keep flock until task ends
                    let _group_guard = _group_guard;
                    let snap_guard = snap_guard;
                    let _last_guard = _last_guard;

                    let res = select! {
                        req = req_fut => req,
                        abrt = abort_future => abrt,
                    };
                    if benchmark {
                        env.log("benchmark finished successfully");
                        proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                        return Ok(());
                    }

                    let verify = |env: BackupEnvironment| {
                        if let Err(err) = env.verify_after_complete(snap_guard) {
                            env.log(format!(
                                "backup finished, but starting the requested verify task failed: {}",
                                err
                            ));
                        }
                    };

                    match (res, env.ensure_finished()) {
                        (Ok(_), Ok(())) => {
                            env.log("backup finished successfully");
                            verify(env);
                            Ok(())
                        }
                        (Err(err), Ok(())) => {
                            // ignore errors after finish
                            env.log(format!("backup had errors but finished: {}", err));
                            verify(env);
                            Ok(())
                        }
                        (Ok(_), Err(err)) => {
                            env.log(format!("backup ended and finish failed: {}", err));
                            env.log("removing unfinished backup");
                            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                            Err(err)
                        }
                        (Err(err), Err(_)) => {
                            env.log(format!("backup failed: {}", err));
                            env.log("removing failed backup");
                            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                            Err(err)
                        }
                    }
                }
            },
        )?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(
                UPGRADE,
                HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()),
            )
            .body(Body::empty())?;

        Ok(response)
    }
    .boxed()
}
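
// A minimal sketch of the client side of this handshake, for orientation
// only (the real client lives in pbs-client; the exact URI and the use of
// GET here are assumptions, not taken from this module):
//
//     let req = Request::builder()
//         .method("GET")
//         .uri("/api2/json/backup?store=mystore&backup-type=host&...")
//         .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
//         .body(Body::empty())?;
//
// Send `req` over an authenticated HTTP/1.1 connection; on a
// `101 Switching Protocols` answer, hand the upgraded stream to an HTTP/2
// client handshake (e.g. `h2::client::handshake`) and talk to the
// `BACKUP_API_ROUTER` endpoints below over that connection.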
const BACKUP_API_SUBDIRS: SubdirMap = &[
    ("blob", &Router::new().upload(&API_METHOD_UPLOAD_BLOB)),
    (
        "dynamic_chunk",
        &Router::new().upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK),
    ),
    (
        "dynamic_close",
        &Router::new().post(&API_METHOD_CLOSE_DYNAMIC_INDEX),
    ),
    (
        "dynamic_index",
        &Router::new()
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND),
    ),
    (
        "finish",
        &Router::new().post(&ApiMethod::new(
            &ApiHandler::Sync(&finish_backup),
            &ObjectSchema::new("Mark backup as finished.", &[]),
        )),
    ),
    (
        "fixed_chunk",
        &Router::new().upload(&API_METHOD_UPLOAD_FIXED_CHUNK),
    ),
    (
        "fixed_close",
        &Router::new().post(&API_METHOD_CLOSE_FIXED_INDEX),
    ),
    (
        "fixed_index",
        &Router::new()
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND),
    ),
    (
        "previous",
        &Router::new().download(&API_METHOD_DOWNLOAD_PREVIOUS),
    ),
    (
        "previous_backup_time",
        &Router::new().get(&API_METHOD_GET_PREVIOUS_BACKUP_TIME),
    ),
    (
        "speedtest",
        &Router::new().upload(&API_METHOD_UPLOAD_SPEEDTEST),
    ),
];

pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);
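
// Over the upgraded HTTP/2 connection these entries are addressed as plain
// request paths (illustration only; parameters travel as JSON-encoded
// request bodies):
//
//     POST /dynamic_index   create a dynamic index writer
//     PUT  /dynamic_index   append chunks to it (dynamic_append)
//     POST /dynamic_close   close it, checking chunk count, size and csum
//     POST /finish          mark the whole backup as finished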
#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),]),
    ),
);

fn create_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = required_string_param(&param, "archive-name")?.to_owned();

    let archive_name = name.clone();
    if !archive_name.ends_with(".didx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let index = env.datastore.create_dynamic_writer(&path)?;
    let wid = env.register_dynamic_writer(index, name)?;

    env.log(format!("created new dynamic index {} ({:?})", wid, path));

    Ok(json!(wid))
}
#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
            (
                "size",
                false,
                &IntegerSchema::new("File size.").minimum(1).schema(),
            ),
            (
                "reuse-csum",
                true,
                &StringSchema::new(
                    "If set, compare last backup's \
                    csum and reuse index for incremental backup if it matches.",
                )
                .schema(),
            ),
        ]),
    ),
);

fn create_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = required_string_param(&param, "archive-name")?.to_owned();
    let size = required_integer_param(&param, "size")? as usize;
    let reuse_csum = param["reuse-csum"].as_str();

    let archive_name = name.clone();
    if !archive_name.ends_with(".fidx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(&archive_name);

    let chunk_size = 4096 * 1024; // todo: ??

    // do incremental backup if csum is set
    let mut reader = None;
    let mut incremental = false;
    if let Some(csum) = reuse_csum {
        incremental = true;
        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => {
                bail!("cannot reuse index - no valid previous backup exists");
            }
        };

        let mut last_path = last_backup.backup_dir.relative_path();
        last_path.push(&archive_name);

        let index = match env.datastore.open_fixed_reader(last_path) {
            Ok(index) => index,
            Err(_) => {
                bail!("cannot reuse index - no previous backup exists for archive");
            }
        };

        let (old_csum, _) = index.compute_csum();
        let old_csum = hex::encode(&old_csum);
        if old_csum != csum {
            bail!(
                "expected csum ({}) doesn't match last backup's ({}), cannot do incremental backup",
                csum,
                old_csum
            );
        }

        reader = Some(index);
    }

    let mut writer = env.datastore.create_fixed_writer(&path, size, chunk_size)?;

    if let Some(reader) = reader {
        writer.clone_data_from(&reader)?;
    }

    let wid = env.register_fixed_writer(writer, name, size, chunk_size as u32, incremental)?;

    env.log(format!("created new fixed index {} ({:?})", wid, path));

    Ok(json!(wid))
}
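
// Sketch of the intended `reuse-csum` flow (an assumption based on the
// schema text above, not a normative client description): the client reads
// back the previous `.fidx` (see the `previous` endpoint), computes its
// csum, and passes it as `reuse-csum`. When it still matches, the old index
// data is cloned via `clone_data_from()` above and only changed chunks have
// to be uploaded and appended.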
#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema(),
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema(),
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema(),
                )
                .schema(),
            ),
        ]),
    ),
);

fn dynamic_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let digest_list = required_array_param(&param, "digest-list")?;
    let offset_list = required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!(
            "offset list has wrong length ({} != {})",
            offset_list.len(),
            digest_list.len()
        );
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("dynamic_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = <[u8; 32]>::from_hex(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env
            .lookup_chunk(&digest)
            .ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!(
            "successfully added chunk {} to dynamic index {} (offset {}, size {})",
            digest_str, wid, offset, size
        ));
    }

    Ok(Value::Null)
}
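
// Example PUT body for this endpoint (hypothetical wid, digests and
// offsets; digests abbreviated):
//
//     {
//         "wid": 1,
//         "digest-list": ["327c...f2d9", "ab12...0c44"],
//         "offset-list": [0, 4194304]
//     }
//
// Each digest must already be known to the server (uploaded through
// `dynamic_chunk` or registered from the previous backup), otherwise
// `lookup_chunk()` above fails with "no such chunk".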
#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema(),
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema(),
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema(),
                )
                .schema(),
            ),
        ]),
    ),
);

fn fixed_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let digest_list = required_array_param(&param, "digest-list")?;
    let offset_list = required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!(
            "offset list has wrong length ({} != {})",
            offset_list.len(),
            digest_list.len()
        );
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("fixed_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = <[u8; 32]>::from_hex(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env
            .lookup_chunk(&digest)
            .ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.fixed_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!(
            "successfully added chunk {} to fixed index {} (offset {}, size {})",
            digest_str, wid, offset, size
        ));
    }

    Ok(Value::Null)
}
#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema(),
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new(
                    "Chunk count. This is used to verify that the server got all chunks.",
                )
                .minimum(1)
                .schema(),
            ),
            (
                "size",
                false,
                &IntegerSchema::new(
                    "File size. This is used to verify that the server got all data.",
                )
                .minimum(1)
                .schema(),
            ),
            (
                "csum",
                false,
                &StringSchema::new("Digest list checksum.").schema(),
            ),
        ]),
    ),
);

fn close_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let chunk_count = required_integer_param(&param, "chunk-count")? as u64;
    let size = required_integer_param(&param, "size")? as u64;
    let csum_str = required_string_param(&param, "csum")?;
    let csum = <[u8; 32]>::from_hex(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.dynamic_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed dynamic index {}", wid));

    Ok(Value::Null)
}
#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema(),
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks. Ignored for incremental backups.")
                    .minimum(0)
                    .schema(),
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data. Ignored for incremental backups.")
                    .minimum(0)
                    .schema(),
            ),
            (
                "csum",
                false,
                &StringSchema::new("Digest list checksum.").schema(),
            ),
        ]),
    ),
);

fn close_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let chunk_count = required_integer_param(&param, "chunk-count")? as u64;
    let size = required_integer_param(&param, "size")? as u64;
    let csum_str = required_string_param(&param, "csum")?;
    let csum = <[u8; 32]>::from_hex(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.fixed_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed fixed index {}", wid));

    Ok(Value::Null)
}
fn finish_backup(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;
    env.log("successfully finished backup");

    Ok(Value::Null)
}
pub const API_METHOD_GET_PREVIOUS_BACKUP_TIME: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&get_previous_backup_time),
    &ObjectSchema::new("Get previous backup time.", &[]),
);

fn get_previous_backup_time(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let backup_time = env
        .last_backup
        .as_ref()
        .map(|info| info.backup_dir.backup_time());

    Ok(json!(backup_time))
}
#[sortable]
pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_previous),
    &ObjectSchema::new(
        "Download archive from previous backup.",
        &sorted!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA)]),
    ),
);

fn download_previous(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = required_string_param(&param, "archive-name")?.to_owned();

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => bail!("no valid previous backup"),
        };

        let mut path = env.datastore.snapshot_path(&last_backup.backup_dir);
        path.push(&archive_name);

        let index: Option<Box<dyn IndexFile>> = match archive_type(&archive_name)? {
            ArchiveType::FixedIndex => {
                let index = env.datastore.open_fixed_reader(&path)?;
                Some(Box::new(index))
            }
            ArchiveType::DynamicIndex => {
                let index = env.datastore.open_dynamic_reader(&path)?;
                Some(Box::new(index))
            }
            _ => None,
        };

        if let Some(index) = index {
            env.log(format!(
                "register chunks in '{}' from previous backup.",
                archive_name
            ));

            for pos in 0..index.index_count() {
                let info = index.chunk_info(pos).unwrap();
                let size = info.range.end - info.range.start;
                env.register_chunk(info.digest, size as u32)?;
            }
        }

        env.log(format!("download '{}' from previous backup.", archive_name));
        crate::api2::helpers::create_download_response(path).await
    }
    .boxed()
}