1 //! Backup protocol (HTTP2 upgrade)
3 use anyhow
::{bail, format_err, Error}
;
6 use hyper
::header
::{HeaderValue, UPGRADE}
;
7 use hyper
::http
::request
::Parts
;
8 use hyper
::{Body, Request, Response, StatusCode}
;
9 use serde
::Deserialize
;
10 use serde_json
::{json, Value}
;
12 use proxmox_router
::list_subdirs_api_method
;
14 ApiHandler
, ApiMethod
, ApiResponseFuture
, Permission
, Router
, RpcEnvironment
, SubdirMap
,
16 use proxmox_schema
::*;
17 use proxmox_sys
::sortable
;
20 Authid
, BackupNamespace
, BackupType
, Operation
, SnapshotVerifyState
, VerifyState
,
21 BACKUP_ARCHIVE_NAME_SCHEMA
, BACKUP_ID_SCHEMA
, BACKUP_NAMESPACE_SCHEMA
, BACKUP_TIME_SCHEMA
,
22 BACKUP_TYPE_SCHEMA
, CHUNK_DIGEST_SCHEMA
, DATASTORE_SCHEMA
, PRIV_DATASTORE_BACKUP
,
24 use pbs_config
::CachedUserInfo
;
25 use pbs_datastore
::index
::IndexFile
;
26 use pbs_datastore
::manifest
::{archive_type, ArchiveType}
;
27 use pbs_datastore
::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1}
;
28 use pbs_tools
::json
::{required_array_param, required_integer_param, required_string_param}
;
29 use proxmox_rest_server
::{H2Service, WorkerTask}
;
30 use proxmox_sys
::fs
::lock_dir_noblock_shared
;
38 pub const ROUTER
: Router
= Router
::new().upgrade(&API_METHOD_UPGRADE_BACKUP
);
41 pub const API_METHOD_UPGRADE_BACKUP
: ApiMethod
= ApiMethod
::new(
42 &ApiHandler
::AsyncHttp(&upgrade_to_backup_protocol
),
44 concat
!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1
!(), "')."),
46 ("store", false, &DATASTORE_SCHEMA
),
47 ("ns", true, &BACKUP_NAMESPACE_SCHEMA
),
48 ("backup-type", false, &BACKUP_TYPE_SCHEMA
),
49 ("backup-id", false, &BACKUP_ID_SCHEMA
),
50 ("backup-time", false, &BACKUP_TIME_SCHEMA
),
51 ("debug", true, &BooleanSchema
::new("Enable verbose debug logging.").schema()),
52 ("benchmark", true, &BooleanSchema
::new("Job is a benchmark (do not keep data).").schema()),
56 // Note: parameter 'store' is not a URI parameter, so the permission check has to happen inside the function body
57 Some("Requires on /datastore/{store}[/{namespace}] DATASTORE_BACKUP and being the owner of the group"),
61 pub(crate) fn optional_ns_param(param
: &Value
) -> Result
<BackupNamespace
, Error
> {
62 match param
.get("ns") {
63 Some(Value
::String(ns
)) => ns
.parse(),
64 None
=> Ok(BackupNamespace
::root()),
65 _
=> bail
!("invalid ns parameter"),
69 fn upgrade_to_backup_protocol(
74 rpcenv
: Box
<dyn RpcEnvironment
>,
75 ) -> ApiResponseFuture
{
77 let debug
= param
["debug"].as_bool().unwrap_or(false);
78 let benchmark
= param
["benchmark"].as_bool().unwrap_or(false);
80 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
82 let store
= required_string_param(¶m
, "store")?
.to_owned();
83 let backup_ns
= optional_ns_param(¶m
)?
;
84 let backup_dir_arg
= pbs_api_types
::BackupDir
::deserialize(¶m
)?
;
86 let user_info
= CachedUserInfo
::new()?
;
88 let privs
= if backup_ns
.is_root() {
89 user_info
.lookup_privs(&auth_id
, &["datastore", &store
])
91 user_info
.lookup_privs(&auth_id
, &["datastore", &store
, &backup_ns
.to_string()])
93 if privs
& PRIV_DATASTORE_BACKUP
== 0 {
94 proxmox_router
::http_bail
!(FORBIDDEN
, "permission check failed");
97 let datastore
= DataStore
::lookup_datastore(&store
, Some(Operation
::Write
))?
;
102 .ok_or_else(|| format_err
!("missing Upgrade header"))?
105 if protocols
!= PROXMOX_BACKUP_PROTOCOL_ID_V1
!() {
106 bail
!("invalid protocol name");
109 if parts
.version
>= http
::version
::Version
::HTTP_2
{
111 "unexpected http version '{:?}' (expected version < 2)",
116 if !datastore
.namespace_path(&backup_ns
).exists() {
117 proxmox_router
::http_bail
!(NOT_FOUND
, "namespace not found");
120 // FIXME: include namespace here?
121 let worker_id
= format
!("{}:{}/{}", store
, backup_dir_arg
.ty(), backup_dir_arg
.id());
123 let env_type
= rpcenv
.env_type();
125 let backup_group
= datastore
.backup_group(backup_ns
, backup_dir_arg
.group
.clone());
127 let worker_type
= if backup_group
.backup_type() == BackupType
::Host
128 && backup_group
.backup_id() == "benchmark"
131 bail
!("unable to run benchmark without --benchmark flags");
136 bail
!("benchmark flags is only allowed on 'host/benchmark'");
141 // lock backup group to only allow one backup per group at a time
142 let (owner
, _group_guard
) = datastore
.create_locked_backup_group(
143 backup_group
.backup_ns(),
144 backup_group
.as_ref(),
150 owner
== auth_id
|| (owner
.is_token() && Authid
::from(owner
.user().clone()) == auth_id
);
151 if !correct_owner
&& worker_type
!= "benchmark" {
152 // only the owner is allowed to create additional snapshots
153 bail
!("backup owner check failed ({} != {})", auth_id
, owner
);
157 let info
= backup_group
.last_backup(true).unwrap_or(None
);
158 if let Some(info
) = info
{
159 let (manifest
, _
) = info
.backup_dir
.load_manifest()?
;
160 let verify
= manifest
.unprotected
["verify_state"].clone();
161 match serde_json
::from_value
::<SnapshotVerifyState
>(verify
) {
162 Ok(verify
) => match verify
.state
{
163 VerifyState
::Ok
=> Some(info
),
164 VerifyState
::Failed
=> None
,
167 // no verify state found, treat as valid
176 let backup_dir
= backup_group
.backup_dir(backup_dir_arg
.time
)?
;
178 let _last_guard
= if let Some(last
) = &last_backup
{
179 if backup_dir
.backup_time() <= last
.backup_dir
.backup_time() {
180 bail
!("backup timestamp is older than last backup.");
183 // lock last snapshot to prevent forgetting/pruning it during backup
184 let full_path
= last
.backup_dir
.full_path();
185 Some(lock_dir_noblock_shared(
188 "base snapshot is already locked by another operation",
194 let (path
, is_new
, snap_guard
) =
195 datastore
.create_locked_backup_dir(backup_dir
.backup_ns(), backup_dir
.as_ref())?
;
197 bail
!("backup directory already exists.");
206 let mut env
= BackupEnvironment
::new(
215 env
.last_backup
= last_backup
;
218 "starting new {} on datastore '{}': {:?}",
219 worker_type
, store
, path
223 H2Service
::new(env
.clone(), worker
.clone(), &BACKUP_API_ROUTER
, debug
);
225 let abort_future
= worker
.abort_future();
227 let env2
= env
.clone();
229 let mut req_fut
= hyper
::upgrade
::on(Request
::from_parts(parts
, req_body
))
230 .map_err(Error
::from
)
231 .and_then(move |conn
| {
232 env2
.debug("protocol upgrade done");
234 let mut http
= hyper
::server
::conn
::Http
::new();
235 http
.http2_only(true);
236 // increase window size: todo - find optimal size
237 let window_size
= 32 * 1024 * 1024; // max = (1 << 31) - 2
238 http
.http2_initial_stream_window_size(window_size
);
239 http
.http2_initial_connection_window_size(window_size
);
240 http
.http2_max_frame_size(4 * 1024 * 1024);
242 let env3
= env2
.clone();
243 http
.serve_connection(conn
, service
).map(move |result
| {
246 // Avoid Transport endpoint is not connected (os error 107)
247 // fixme: find a better way to test for that error
248 if err
.to_string().starts_with("connection error")
253 Err(Error
::from(err
))
260 let mut abort_future
= abort_future
.map(|_
| Err(format_err
!("task aborted")));
263 // keep flock until task ends
264 let _group_guard
= _group_guard
;
265 let snap_guard
= snap_guard
;
266 let _last_guard
= _last_guard
;
269 req
= req_fut
=> req
,
270 abrt
= abort_future
=> abrt
,
273 env
.log("benchmark finished successfully");
274 proxmox_async
::runtime
::block_in_place(|| env
.remove_backup())?
;
278 let verify
= |env
: BackupEnvironment
| {
279 if let Err(err
) = env
.verify_after_complete(snap_guard
) {
281 "backup finished, but starting the requested verify task failed: {}",
287 match (res
, env
.ensure_finished()) {
289 env
.log("backup finished successfully");
293 (Err(err
), Ok(())) => {
294 // ignore errors after finish
295 env
.log(format
!("backup had errors but finished: {}", err
));
299 (Ok(_
), Err(err
)) => {
300 env
.log(format
!("backup ended and finish failed: {}", err
));
301 env
.log("removing unfinished backup");
302 proxmox_async
::runtime
::block_in_place(|| env
.remove_backup())?
;
305 (Err(err
), Err(_
)) => {
306 env
.log(format
!("backup failed: {}", err
));
307 env
.log("removing failed backup");
308 proxmox_async
::runtime
::block_in_place(|| env
.remove_backup())?
;
316 let response
= Response
::builder()
317 .status(StatusCode
::SWITCHING_PROTOCOLS
)
320 HeaderValue
::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1
!()),
322 .body(Body
::empty())?
;
329 const BACKUP_API_SUBDIRS
: SubdirMap
= &[
330 ("blob", &Router
::new().upload(&API_METHOD_UPLOAD_BLOB
)),
333 &Router
::new().upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK
),
337 &Router
::new().post(&API_METHOD_CLOSE_DYNAMIC_INDEX
),
342 .post(&API_METHOD_CREATE_DYNAMIC_INDEX
)
343 .put(&API_METHOD_DYNAMIC_APPEND
),
347 &Router
::new().post(&ApiMethod
::new(
348 &ApiHandler
::Sync(&finish_backup
),
349 &ObjectSchema
::new("Mark backup as finished.", &[]),
354 &Router
::new().upload(&API_METHOD_UPLOAD_FIXED_CHUNK
),
358 &Router
::new().post(&API_METHOD_CLOSE_FIXED_INDEX
),
363 .post(&API_METHOD_CREATE_FIXED_INDEX
)
364 .put(&API_METHOD_FIXED_APPEND
),
368 &Router
::new().download(&API_METHOD_DOWNLOAD_PREVIOUS
),
371 "previous_backup_time",
372 &Router
::new().get(&API_METHOD_GET_PREVIOUS_BACKUP_TIME
),
376 &Router
::new().upload(&API_METHOD_UPLOAD_SPEEDTEST
),
380 pub const BACKUP_API_ROUTER
: Router
= Router
::new()
381 .get(&list_subdirs_api_method
!(BACKUP_API_SUBDIRS
))
382 .subdirs(BACKUP_API_SUBDIRS
);
385 pub const API_METHOD_CREATE_DYNAMIC_INDEX
: ApiMethod
= ApiMethod
::new(
386 &ApiHandler
::Sync(&create_dynamic_index
),
388 "Create dynamic chunk index file.",
389 &sorted
!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
),]),
393 fn create_dynamic_index(
396 rpcenv
: &mut dyn RpcEnvironment
,
397 ) -> Result
<Value
, Error
> {
398 let env
: &BackupEnvironment
= rpcenv
.as_ref();
400 let name
= required_string_param(¶m
, "archive-name")?
.to_owned();
402 let archive_name
= name
.clone();
403 if !archive_name
.ends_with(".didx") {
404 bail
!("wrong archive extension: '{}'", archive_name
);
407 let mut path
= env
.backup_dir
.relative_path();
408 path
.push(archive_name
);
410 let index
= env
.datastore
.create_dynamic_writer(&path
)?
;
411 let wid
= env
.register_dynamic_writer(index
, name
)?
;
413 env
.log(format
!("created new dynamic index {} ({:?})", wid
, path
));
419 pub const API_METHOD_CREATE_FIXED_INDEX
: ApiMethod
= ApiMethod
::new(
420 &ApiHandler
::Sync(&create_fixed_index
),
422 "Create fixed chunk index file.",
424 ("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
),
428 &IntegerSchema
::new("File size.").minimum(1).schema()
434 "If set, compare last backup's \
435 csum and reuse index for incremental backup if it matches."
443 fn create_fixed_index(
446 rpcenv
: &mut dyn RpcEnvironment
,
447 ) -> Result
<Value
, Error
> {
448 let env
: &BackupEnvironment
= rpcenv
.as_ref();
450 let name
= required_string_param(¶m
, "archive-name")?
.to_owned();
451 let size
= required_integer_param(¶m
, "size")?
as usize;
452 let reuse_csum
= param
["reuse-csum"].as_str();
454 let archive_name
= name
.clone();
455 if !archive_name
.ends_with(".fidx") {
456 bail
!("wrong archive extension: '{}'", archive_name
);
459 let mut path
= env
.backup_dir
.relative_path();
460 path
.push(&archive_name
);
462 let chunk_size
= 4096 * 1024; // todo: ??
464 // do incremental backup if csum is set
465 let mut reader
= None
;
466 let mut incremental
= false;
467 if let Some(csum
) = reuse_csum
{
469 let last_backup
= match &env
.last_backup
{
472 bail
!("cannot reuse index - no valid previous backup exists");
476 let mut last_path
= last_backup
.backup_dir
.relative_path();
477 last_path
.push(&archive_name
);
479 let index
= match env
.datastore
.open_fixed_reader(last_path
) {
482 bail
!("cannot reuse index - no previous backup exists for archive");
486 let (old_csum
, _
) = index
.compute_csum();
487 let old_csum
= hex
::encode(&old_csum
);
488 if old_csum
!= csum
{
490 "expected csum ({}) doesn't match last backup's ({}), cannot do incremental backup",
496 reader
= Some(index
);
499 let mut writer
= env
.datastore
.create_fixed_writer(&path
, size
, chunk_size
)?
;
501 if let Some(reader
) = reader
{
502 writer
.clone_data_from(&reader
)?
;
505 let wid
= env
.register_fixed_writer(writer
, name
, size
, chunk_size
as u32, incremental
)?
;
507 env
.log(format
!("created new fixed index {} ({:?})", wid
, path
));
513 pub const API_METHOD_DYNAMIC_APPEND
: ApiMethod
= ApiMethod
::new(
514 &ApiHandler
::Sync(&dynamic_append
),
516 "Append chunk to dynamic index writer.",
521 &IntegerSchema
::new("Dynamic writer ID.")
529 &ArraySchema
::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA
).schema()
535 "Chunk offset list.",
536 &IntegerSchema
::new("Corresponding chunk offsets.")
549 rpcenv
: &mut dyn RpcEnvironment
,
550 ) -> Result
<Value
, Error
> {
551 let wid
= required_integer_param(¶m
, "wid")?
as usize;
552 let digest_list
= required_array_param(¶m
, "digest-list")?
;
553 let offset_list
= required_array_param(¶m
, "offset-list")?
;
555 if offset_list
.len() != digest_list
.len() {
557 "offset list has wrong length ({} != {})",
563 let env
: &BackupEnvironment
= rpcenv
.as_ref();
565 env
.debug(format
!("dynamic_append {} chunks", digest_list
.len()));
567 for (i
, item
) in digest_list
.iter().enumerate() {
568 let digest_str
= item
.as_str().unwrap();
569 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
570 let offset
= offset_list
[i
].as_u64().unwrap();
572 .lookup_chunk(&digest
)
573 .ok_or_else(|| format_err
!("no such chunk {}", digest_str
))?
;
575 env
.dynamic_writer_append_chunk(wid
, offset
, size
, &digest
)?
;
578 "successfully added chunk {} to dynamic index {} (offset {}, size {})",
579 digest_str
, wid
, offset
, size
587 pub const API_METHOD_FIXED_APPEND
: ApiMethod
= ApiMethod
::new(
588 &ApiHandler
::Sync(&fixed_append
),
590 "Append chunk to fixed index writer.",
595 &IntegerSchema
::new("Fixed writer ID.")
603 &ArraySchema
::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA
).schema()
609 "Chunk offset list.",
610 &IntegerSchema
::new("Corresponding chunk offsets.")
623 rpcenv
: &mut dyn RpcEnvironment
,
624 ) -> Result
<Value
, Error
> {
625 let wid
= required_integer_param(¶m
, "wid")?
as usize;
626 let digest_list
= required_array_param(¶m
, "digest-list")?
;
627 let offset_list
= required_array_param(¶m
, "offset-list")?
;
629 if offset_list
.len() != digest_list
.len() {
631 "offset list has wrong length ({} != {})",
637 let env
: &BackupEnvironment
= rpcenv
.as_ref();
639 env
.debug(format
!("fixed_append {} chunks", digest_list
.len()));
641 for (i
, item
) in digest_list
.iter().enumerate() {
642 let digest_str
= item
.as_str().unwrap();
643 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
644 let offset
= offset_list
[i
].as_u64().unwrap();
646 .lookup_chunk(&digest
)
647 .ok_or_else(|| format_err
!("no such chunk {}", digest_str
))?
;
649 env
.fixed_writer_append_chunk(wid
, offset
, size
, &digest
)?
;
652 "successfully added chunk {} to fixed index {} (offset {}, size {})",
653 digest_str
, wid
, offset
, size
661 pub const API_METHOD_CLOSE_DYNAMIC_INDEX
: ApiMethod
= ApiMethod
::new(
662 &ApiHandler
::Sync(&close_dynamic_index
),
664 "Close dynamic index writer.",
669 &IntegerSchema
::new("Dynamic writer ID.")
678 "Chunk count. This is used to verify that the server got all chunks."
687 "File size. This is used to verify that the server got all data."
695 &StringSchema
::new("Digest list checksum.").schema()
701 fn close_dynamic_index(
704 rpcenv
: &mut dyn RpcEnvironment
,
705 ) -> Result
<Value
, Error
> {
706 let wid
= required_integer_param(¶m
, "wid")?
as usize;
707 let chunk_count
= required_integer_param(¶m
, "chunk-count")?
as u64;
708 let size
= required_integer_param(¶m
, "size")?
as u64;
709 let csum_str
= required_string_param(¶m
, "csum")?
;
710 let csum
= <[u8; 32]>::from_hex(csum_str
)?
;
712 let env
: &BackupEnvironment
= rpcenv
.as_ref();
714 env
.dynamic_writer_close(wid
, chunk_count
, size
, csum
)?
;
716 env
.log(format
!("successfully closed dynamic index {}", wid
));
722 pub const API_METHOD_CLOSE_FIXED_INDEX
: ApiMethod
= ApiMethod
::new(
723 &ApiHandler
::Sync(&close_fixed_index
),
725 "Close fixed index writer.",
730 &IntegerSchema
::new("Fixed writer ID.")
738 &IntegerSchema
::new("Chunk count. This is used to verify that the server got all chunks. Ignored for incremental backups.")
745 &IntegerSchema
::new("File size. This is used to verify that the server got all data. Ignored for incremental backups.")
749 ("csum", false, &StringSchema
::new("Digest list checksum.").schema()),
754 fn close_fixed_index(
757 rpcenv
: &mut dyn RpcEnvironment
,
758 ) -> Result
<Value
, Error
> {
759 let wid
= required_integer_param(¶m
, "wid")?
as usize;
760 let chunk_count
= required_integer_param(¶m
, "chunk-count")?
as u64;
761 let size
= required_integer_param(¶m
, "size")?
as u64;
762 let csum_str
= required_string_param(¶m
, "csum")?
;
763 let csum
= <[u8; 32]>::from_hex(csum_str
)?
;
765 let env
: &BackupEnvironment
= rpcenv
.as_ref();
767 env
.fixed_writer_close(wid
, chunk_count
, size
, csum
)?
;
769 env
.log(format
!("successfully closed fixed index {}", wid
));
777 rpcenv
: &mut dyn RpcEnvironment
,
778 ) -> Result
<Value
, Error
> {
779 let env
: &BackupEnvironment
= rpcenv
.as_ref();
781 env
.finish_backup()?
;
782 env
.log("successfully finished backup");
788 pub const API_METHOD_GET_PREVIOUS_BACKUP_TIME
: ApiMethod
= ApiMethod
::new(
789 &ApiHandler
::Sync(&get_previous_backup_time
),
790 &ObjectSchema
::new("Get previous backup time.", &[]),
793 fn get_previous_backup_time(
796 rpcenv
: &mut dyn RpcEnvironment
,
797 ) -> Result
<Value
, Error
> {
798 let env
: &BackupEnvironment
= rpcenv
.as_ref();
800 let backup_time
= env
803 .map(|info
| info
.backup_dir
.backup_time());
805 Ok(json
!(backup_time
))
809 pub const API_METHOD_DOWNLOAD_PREVIOUS
: ApiMethod
= ApiMethod
::new(
810 &ApiHandler
::AsyncHttp(&download_previous
),
812 "Download archive from previous backup.",
813 &sorted
!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
)]),
817 fn download_previous(
822 rpcenv
: Box
<dyn RpcEnvironment
>,
823 ) -> ApiResponseFuture
{
825 let env
: &BackupEnvironment
= rpcenv
.as_ref();
827 let archive_name
= required_string_param(¶m
, "archive-name")?
.to_owned();
829 let last_backup
= match &env
.last_backup
{
831 None
=> bail
!("no valid previous backup"),
834 let mut path
= last_backup
.backup_dir
.full_path();
835 path
.push(&archive_name
);
838 let index
: Option
<Box
<dyn IndexFile
>> = match archive_type(&archive_name
)?
{
839 ArchiveType
::FixedIndex
=> {
840 let index
= env
.datastore
.open_fixed_reader(&path
)?
;
841 Some(Box
::new(index
))
843 ArchiveType
::DynamicIndex
=> {
844 let index
= env
.datastore
.open_dynamic_reader(&path
)?
;
845 Some(Box
::new(index
))
849 if let Some(index
) = index
{
851 "register chunks in '{}' from previous backup.",
855 for pos
in 0..index
.index_count() {
856 let info
= index
.chunk_info(pos
).unwrap();
857 let size
= info
.range
.end
- info
.range
.start
;
858 env
.register_chunk(info
.digest
, size
as u32)?
;
863 env
.log(format
!("download '{}' from previous backup.", archive_name
));
864 crate::api2
::helpers
::create_download_response(path
).await