6 use hyper
::header
::{HeaderValue, UPGRADE}
;
7 use hyper
::{Body, Response, StatusCode}
;
8 use hyper
::http
::request
::Parts
;
9 use chrono
::{Local, TimeZone}
;
11 use serde_json
::{json, Value}
;
14 use crate::tools
::wrapped_reader_stream
::*;
15 use crate::api_schema
::router
::*;
16 use crate::api_schema
::*;
17 use crate::server
::WorkerTask
;
/// Builds the `Router` exported by this module.
///
/// Only part of the builder chain is visible in this listing: the visible
/// call registers the method returned by `api_method_upgrade_backup()` as the
/// router's upgrade handler.
30 pub fn router() -> Router
{
31 .upgrade(api_method_upgrade_backup())
/// Describes the asynchronous API method that upgrades a connection to the
/// backup protocol.
///
/// The handler is `upgrade_to_backup_protocol`. The visible schema declares:
/// - `store` (required string): datastore name
/// - `backup-type` (required string): one of "vm", "ct", "host"
/// - `backup-id` (required string)
/// - `debug` (optional boolean): verbose debug logging
34 pub fn api_method_upgrade_backup() -> ApiAsyncMethod
{
36 upgrade_to_backup_protocol
,
37 ObjectSchema
::new("Upgraded to backup protocol.")
38 .required("store", StringSchema
::new("Datastore name."))
39 .required("backup-type", StringSchema
::new("Backup type.")
// Restrict the value to the three listed backup types.
40 .format(Arc
::new(ApiStringFormat
::Enum(&["vm", "ct", "host"]))))
41 .required("backup-id", StringSchema
::new("Backup ID."))
42 .optional("debug", BooleanSchema
::new("Enable verbose debug logging."))
/// Handles the request that upgrades an HTTP connection to the backup
/// protocol.
///
/// Visible steps:
/// 1. Read the `debug`, `store`, `backup-type` and `backup-id` parameters and
///    look up the datastore.
/// 2. Require an `Upgrade` header naming `proxmox-backup-protocol-h2` and an
///    HTTP version below 2.
/// 3. Create a new backup directory stamped with the current time; fail if it
///    already exists.
/// 4. Spawn a "backup" `WorkerTask` that serves HTTP/2 on the upgraded
///    connection with a `BackupService`.
/// 5. Answer with `101 Switching Protocols`.
///
/// # Errors
/// Returns an error when a required parameter or the datastore lookup fails,
/// the `Upgrade` header is missing, the protocol name does not match, the
/// HTTP version is >= 2, or the backup directory already exists.
///
/// # Panics
/// `rpcenv.get_user().unwrap()` panics if `get_user()` yields no value.
/// NOTE(review): presumably authentication guarantees a user here — confirm.
///
/// NOTE(review): several lines of the original function (remaining
/// parameters, closing braces, parts of the future chain) are absent from
/// this listing.
46 fn upgrade_to_backup_protocol(
50 _info
: &ApiAsyncMethod
,
51 rpcenv
: Box
<RpcEnvironment
>,
52 ) -> Result
<BoxFut
, Error
> {
// Protocol token expected in the request's Upgrade header and echoed back
// in the response.
54 static PROXMOX_BACKUP_PROTOCOL_ID
: &str = "proxmox-backup-protocol-h2";
// A missing or non-boolean "debug" parameter falls back to false.
56 let debug
= param
["debug"].as_bool().unwrap_or(false);
58 let store
= tools
::required_string_param(¶m
, "store")?
.to_owned();
59 let datastore
= DataStore
::lookup_datastore(&store
)?
;
61 let backup_type
= tools
::required_string_param(¶m
, "backup-type")?
;
62 let backup_id
= tools
::required_string_param(¶m
, "backup-id")?
;
// Current local time rebuilt from whole seconds (nanosecond argument is 0).
63 let backup_time
= Local
.timestamp(Local
::now().timestamp(), 0);
68 .ok_or_else(|| format_err
!("missing Upgrade header"))?
71 if protocols
!= PROXMOX_BACKUP_PROTOCOL_ID
{
72 bail
!("invalid protocol name");
// The upgrade request itself must not already be HTTP/2.
75 if parts
.version
>= http
::version
::Version
::HTTP_2
{
76 bail
!("unexpected http version '{:?}' (expected version < 2)", parts
.version
);
// Worker id is "<store>_<backup-type>_<backup-id>".
79 let worker_id
= format
!("{}_{}_{}", store
, backup_type
, backup_id
);
81 let username
= rpcenv
.get_user().unwrap();
82 let env_type
= rpcenv
.env_type();
84 let backup_group
= BackupGroup
::new(backup_type
, backup_id
);
// An error from the lookup is treated the same as "no previous backup".
85 let last_backup
= BackupInfo
::last_backup(&datastore
.base_path(), &backup_group
).unwrap_or(None
);
86 let backup_dir
= BackupDir
::new_with_group(backup_group
, backup_time
.timestamp());
88 let (path
, is_new
) = datastore
.create_backup_dir(&backup_dir
)?
;
// NOTE(review): the message below contains the typo "directorty"; it is a
// runtime string and is left untouched here.
89 if !is_new { bail!("backup directorty already exists."); }
// Everything that serves the upgraded connection runs inside this worker task.
91 WorkerTask
::spawn("backup", Some(worker_id
), &username
.clone(), true, move |worker
| {
92 let mut env
= BackupEnvironment
::new(
93 env_type
, username
.clone(), worker
.clone(), datastore
, backup_dir
);
96 env
.last_backup
= last_backup
;
98 env
.log(format
!("starting new backup on datastore '{}': {:?}", store
, path
));
100 let service
= BackupService
::new(env
.clone(), worker
.clone(), debug
);
102 let abort_future
= worker
.abort_future();
// Extra clones of the environment, moved into the closures below.
104 let env2
= env
.clone();
105 let env3
= env
.clone();
109 .map_err(Error
::from
)
110 .and_then(move |conn
| {
111 env3
.debug("protocol upgrade done");
// Serve the upgraded connection as HTTP/2 only.
113 let mut http
= hyper
::server
::conn
::Http
::new();
114 http
.http2_only(true);
115 // increase window size: todo - find optimal size
116 let window_size
= 32*1024*1024; // max = (1 << 31) - 2
117 http
.http2_initial_stream_window_size(window_size
);
118 http
.http2_initial_connection_window_size(window_size
);
120 http
.serve_connection(conn
, service
)
121 .map_err(Error
::from
)
// Combine with the worker's abort future; the abort branch always resolves
// to the "task aborted" error.
123 .select(abort_future
.map_err(|_
| {}
).then(move |_
| { bail!("task aborted"); }
))
124 .map_err(|(err
, _
)| err
)
125 .and_then(move |(_result
, _
)| {
// Fails unless the environment reports the backup as finished.
126 env
.ensure_finished()?
;
127 env
.log("backup finished sucessfully");
130 .then(move |result
| {
131 if let Err(err
) = result
{
132 match env2
.ensure_finished() {
133 Ok(()) => {}
, // ignore error after finish
// NOTE(review): the pattern of the next match arm is absent from this
// listing; presumably it is the Err arm of ensure_finished() — confirm.
// It logs the failure and removes the backup.
135 env2
.log(format
!("backup failed: {}", err
));
136 env2
.log("removing failed backup");
137 env2
.remove_backup()?
;
// Reply to the original request: 101 Switching Protocols with the protocol
// id in the Upgrade header and an empty body.
146 let response
= Response
::builder()
147 .status(StatusCode
::SWITCHING_PROTOCOLS
)
148 .header(UPGRADE
, HeaderValue
::from_static(PROXMOX_BACKUP_PROTOCOL_ID
))
149 .body(Body
::empty())?
;
151 Ok(Box
::new(futures
::future
::ok(response
)))
/// Builds the router for the backup protocol API.
///
/// Visible entries and their methods:
/// - "config": upload
/// - "dynamic_chunk": upload
/// - "dynamic_index": download, post (create), put (append)
/// - "dynamic_close": post
/// - "fixed_chunk": upload
/// - "fixed_index": download, post (create), put (append)
/// - "fixed_close": post
/// - "finish": method registration only partly visible in this listing
/// - "speedtest": upload
///
/// NOTE(review): the calls that attach each entry to `router` are absent from
/// this listing.
154 pub fn backup_api() -> Router
{
156 let router
= Router
::new()
158 "config", Router
::new()
159 .upload(api_method_upload_config())
162 "dynamic_chunk", Router
::new()
163 .upload(api_method_upload_dynamic_chunk())
166 "dynamic_index", Router
::new()
167 .download(api_method_dynamic_chunk_index())
168 .post(api_method_create_dynamic_index())
169 .put(api_method_dynamic_append())
172 "dynamic_close", Router
::new()
173 .post(api_method_close_dynamic_index())
176 "fixed_chunk", Router
::new()
177 .upload(api_method_upload_fixed_chunk())
180 "fixed_index", Router
::new()
181 .download(api_method_fixed_chunk_index())
182 .post(api_method_create_fixed_index())
183 .put(api_method_fixed_append())
186 "fixed_close", Router
::new()
187 .post(api_method_close_fixed_index())
190 "finish", Router
::new()
194 ObjectSchema
::new("Mark backup as finished.")
199 "speedtest", Router
::new()
200 .upload(api_method_upload_speedtest())
/// Describes the API method that creates a dynamic chunk index file.
///
/// The handler is `create_dynamic_index`; the schema requires `archive-name`,
/// validated by the shared `BACKUP_ARCHIVE_NAME_SCHEMA`.
207 pub fn api_method_create_dynamic_index() -> ApiMethod
{
209 create_dynamic_index
,
210 ObjectSchema
::new("Create dynamic chunk index file.")
211 .required("archive-name", crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
.clone())
/// Creates a dynamic index writer for the archive named by `archive-name`
/// and registers it with the backup environment.
///
/// # Errors
/// Fails when `archive-name` is missing or does not end in ".pxar", or when
/// creating or registering the writer fails.
///
/// NOTE(review): the remaining parameters and the final return expression are
/// absent from this listing.
215 fn create_dynamic_index(
218 rpcenv
: &mut RpcEnvironment
,
219 ) -> Result
<Value
, Error
> {
221 let env
: &BackupEnvironment
= rpcenv
.as_ref();
223 let name
= tools
::required_string_param(¶m
, "archive-name")?
.to_owned();
// `name` keeps the client-supplied name; `archive_name` becomes the on-disk
// file name with ".didx" appended.
225 let mut archive_name
= name
.clone();
226 if !archive_name
.ends_with(".pxar") {
227 bail
!("wrong archive extension: '{}'", archive_name
);
229 archive_name
.push_str(".didx");
// Index path = relative path of the current backup directory + file name.
232 let mut path
= env
.backup_dir
.relative_path();
233 path
.push(archive_name
);
235 let index
= env
.datastore
.create_dynamic_writer(&path
)?
;
// The writer is registered under the original (un-suffixed) name; `wid` is
// the value returned by the registration.
236 let wid
= env
.register_dynamic_writer(index
, name
)?
;
238 env
.log(format
!("created new dynamic index {} ({:?})", wid
, path
));
/// Describes the API method that creates a fixed chunk index file.
///
/// The visible schema requires `archive-name` (validated by the shared
/// `BACKUP_ARCHIVE_NAME_SCHEMA`) and an integer `size` (file size).
/// NOTE(review): the handler argument and any constraints on `size` are
/// absent from this listing.
243 pub fn api_method_create_fixed_index() -> ApiMethod
{
246 ObjectSchema
::new("Create fixed chunk index file.")
247 .required("archive-name", crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
.clone())
248 .required("size", IntegerSchema
::new("File size.")
/// Creates a fixed index writer for the archive named by `archive-name` with
/// the given `size`, and registers it with the backup environment.
///
/// # Errors
/// Fails when a required parameter is missing, the name does not end in
/// ".img", or creating or registering the writer fails.
///
/// NOTE(review): the remaining parameters and the final return expression are
/// absent from this listing.
254 fn create_fixed_index(
257 rpcenv
: &mut RpcEnvironment
,
258 ) -> Result
<Value
, Error
> {
260 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// NOTE(review): this prints the raw parameters to stdout instead of going
// through env.log/env.debug — looks like leftover debug output.
262 println
!("PARAM: {:?}", param
);
264 let name
= tools
::required_string_param(¶m
, "archive-name")?
.to_owned();
// NOTE(review): `as usize` converts without a range check; assumes the schema
// rejects negative sizes — TODO confirm.
265 let size
= tools
::required_integer_param(¶m
, "size")?
as usize;
// `name` keeps the client-supplied name; `archive_name` becomes the on-disk
// file name with ".fidx" appended.
267 let mut archive_name
= name
.clone();
268 if !archive_name
.ends_with(".img") {
269 bail
!("wrong archive extension: '{}'", archive_name
);
271 archive_name
.push_str(".fidx");
274 let mut path
= env
.backup_dir
.relative_path();
275 path
.push(archive_name
);
// Hard-coded chunk size: 4096 * 1024 bytes (4 MiB).
277 let chunk_size
= 4096*1024; // todo: ??
279 let index
= env
.datastore
.create_fixed_writer(&path
, size
, chunk_size
)?
;
280 let wid
= env
.register_fixed_writer(index
, name
, size
, chunk_size
as u32)?
;
282 env
.log(format
!("created new fixed index {} ({:?})", wid
, path
));
/// Describes the API method that appends chunks to a dynamic index writer.
///
/// The visible schema requires:
/// - `wid` (integer): dynamic writer ID
/// - `digest-list` (array of strings): chunk digests
/// - `offset-list` (array of integers): the corresponding chunk offsets
///
/// NOTE(review): the handler argument and any integer range constraints are
/// absent from this listing.
287 pub fn api_method_dynamic_append() -> ApiMethod
{
290 ObjectSchema
::new("Append chunk to dynamic index writer.")
291 .required("wid", IntegerSchema
::new("Dynamic writer ID.")
295 .required("digest-list", ArraySchema
::new(
296 "Chunk digest list.",
297 StringSchema
::new("Chunk digest.").into())
299 .required("offset-list", ArraySchema
::new(
300 "Chunk offset list.",
301 IntegerSchema
::new("Corresponding chunk offsets.")
// Handler body that appends already-known chunks to a dynamic index writer.
// NOTE(review): the `fn` header line is absent from this listing; presumably
// this is the handler behind api_method_dynamic_append() — confirm.
//
// For each entry of "digest-list" it looks up the chunk size recorded in the
// environment and appends (offset, size, digest) to writer `wid`.
// Fails when a parameter is missing, the two lists differ in length, a digest
// cannot be decoded, a chunk is unknown, or the append itself fails.
311 rpcenv
: &mut RpcEnvironment
,
312 ) -> Result
<Value
, Error
> {
314 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
315 let digest_list
= tools
::required_array_param(¶m
, "digest-list")?
;
316 let offset_list
= tools
::required_array_param(¶m
, "offset-list")?
;
// The lists are matched up by index, so they must have equal length.
318 if offset_list
.len() != digest_list
.len() {
319 bail
!("offset list has wrong length ({} != {})", offset_list
.len(), digest_list
.len());
322 let env
: &BackupEnvironment
= rpcenv
.as_ref();
324 env
.debug(format
!("dynamic_append {} chunks", digest_list
.len()));
326 for (i
, item
) in digest_list
.iter().enumerate() {
// NOTE(review): unwrap() panics if the item is not a JSON string; presumably
// schema validation guarantees the type — confirm.
327 let digest_str
= item
.as_str().unwrap();
328 let digest
= crate::tools
::hex_to_digest(digest_str
)?
;
// NOTE(review): unwrap() panics if the offset is not representable as u64
// (e.g. negative) — confirm the schema excludes that.
329 let offset
= offset_list
[i
].as_u64().unwrap();
// Only chunks the environment already knows about can be appended.
330 let size
= env
.lookup_chunk(&digest
).ok_or_else(|| format_err
!("no such chunk {}", digest_str
))?
;
332 env
.dynamic_writer_append_chunk(wid
, offset
, size
, &digest
)?
;
334 env
.debug(format
!("sucessfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str
, wid
, offset
, size
));
/// Describes the API method that appends chunks to a fixed index writer.
///
/// The visible schema requires:
/// - `wid` (integer): fixed writer ID
/// - `digest-list` (array of strings): chunk digests
/// - `offset-list` (array of integers): the corresponding chunk offsets
///
/// NOTE(review): the handler argument and any integer range constraints are
/// absent from this listing.
340 pub fn api_method_fixed_append() -> ApiMethod
{
343 ObjectSchema
::new("Append chunk to fixed index writer.")
344 .required("wid", IntegerSchema
::new("Fixed writer ID.")
348 .required("digest-list", ArraySchema
::new(
349 "Chunk digest list.",
350 StringSchema
::new("Chunk digest.").into())
352 .required("offset-list", ArraySchema
::new(
353 "Chunk offset list.",
354 IntegerSchema
::new("Corresponding chunk offsets.")
// Handler body that appends already-known chunks to a fixed index writer.
// NOTE(review): the `fn` header line is absent from this listing; presumably
// this is the handler behind api_method_fixed_append() — confirm.
//
// For each entry of "digest-list" it looks up the chunk size recorded in the
// environment and appends (offset, size, digest) to writer `wid`.
// Fails when a parameter is missing, the two lists differ in length, a digest
// cannot be decoded, a chunk is unknown, or the append itself fails.
364 rpcenv
: &mut RpcEnvironment
,
365 ) -> Result
<Value
, Error
> {
367 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
368 let digest_list
= tools
::required_array_param(¶m
, "digest-list")?
;
369 let offset_list
= tools
::required_array_param(¶m
, "offset-list")?
;
// The lists are matched up by index, so they must have equal length.
371 if offset_list
.len() != digest_list
.len() {
372 bail
!("offset list has wrong length ({} != {})", offset_list
.len(), digest_list
.len());
375 let env
: &BackupEnvironment
= rpcenv
.as_ref();
377 env
.debug(format
!("fixed_append {} chunks", digest_list
.len()));
379 for (i
, item
) in digest_list
.iter().enumerate() {
// NOTE(review): unwrap() panics if the item is not a JSON string; presumably
// schema validation guarantees the type — confirm.
380 let digest_str
= item
.as_str().unwrap();
381 let digest
= crate::tools
::hex_to_digest(digest_str
)?
;
// NOTE(review): unwrap() panics if the offset is not representable as u64
// (e.g. negative) — confirm the schema excludes that.
382 let offset
= offset_list
[i
].as_u64().unwrap();
// Only chunks the environment already knows about can be appended.
383 let size
= env
.lookup_chunk(&digest
).ok_or_else(|| format_err
!("no such chunk {}", digest_str
))?
;
385 env
.fixed_writer_append_chunk(wid
, offset
, size
, &digest
)?
;
387 env
.debug(format
!("sucessfully added chunk {} to fixed index {} (offset {}, size {})", digest_str
, wid
, offset
, size
));
/// Describes the API method that closes a dynamic index writer.
///
/// The visible schema requires three integers: `wid` (writer ID),
/// `chunk-count` and `size`. Per their descriptions, the latter two let the
/// server verify that it received all chunks and all data.
/// NOTE(review): the handler argument and any integer range constraints are
/// absent from this listing.
393 pub fn api_method_close_dynamic_index() -> ApiMethod
{
396 ObjectSchema
::new("Close dynamic index writer.")
397 .required("wid", IntegerSchema
::new("Dynamic writer ID.")
401 .required("chunk-count", IntegerSchema
::new("Chunk count. This is used to verify that the server got all chunks.")
404 .required("size", IntegerSchema
::new("File size. This is used to verify that the server got all data.")
/// Closes the dynamic index writer identified by `wid`, passing the
/// client-reported `chunk-count` and `size` to the environment.
///
/// # Errors
/// Fails when a required parameter is missing or when
/// `dynamic_writer_close` reports an error.
///
/// NOTE(review): the remaining parameters and the final return expression are
/// absent from this listing.
410 fn close_dynamic_index (
413 rpcenv
: &mut RpcEnvironment
,
414 ) -> Result
<Value
, Error
> {
// NOTE(review): the `as` casts below do not range-check; assumes the schema
// rejects negative values — TODO confirm.
416 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
417 let chunk_count
= tools
::required_integer_param(¶m
, "chunk-count")?
as u64;
418 let size
= tools
::required_integer_param(¶m
, "size")?
as u64;
420 let env
: &BackupEnvironment
= rpcenv
.as_ref();
422 env
.dynamic_writer_close(wid
, chunk_count
, size
)?
;
424 env
.log(format
!("sucessfully closed dynamic index {}", wid
));
/// Describes the API method that closes a fixed index writer.
///
/// The visible schema requires three integers: `wid` (writer ID),
/// `chunk-count` and `size`. Per their descriptions, the latter two let the
/// server verify that it received all chunks and all data.
/// NOTE(review): the handler argument and any integer range constraints are
/// absent from this listing.
429 pub fn api_method_close_fixed_index() -> ApiMethod
{
432 ObjectSchema
::new("Close fixed index writer.")
433 .required("wid", IntegerSchema
::new("Fixed writer ID.")
437 .required("chunk-count", IntegerSchema
::new("Chunk count. This is used to verify that the server got all chunks.")
440 .required("size", IntegerSchema
::new("File size. This is used to verify that the server got all data.")
/// Closes the fixed index writer identified by `wid`, passing the
/// client-reported `chunk-count` and `size` to the environment.
///
/// # Errors
/// Fails when a required parameter is missing or when `fixed_writer_close`
/// reports an error.
///
/// NOTE(review): the remaining parameters and the final return expression are
/// absent from this listing.
446 fn close_fixed_index (
449 rpcenv
: &mut RpcEnvironment
,
450 ) -> Result
<Value
, Error
> {
// NOTE(review): the `as` casts below do not range-check; assumes the schema
// rejects negative values — TODO confirm.
452 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
453 let chunk_count
= tools
::required_integer_param(¶m
, "chunk-count")?
as u64;
454 let size
= tools
::required_integer_param(¶m
, "size")?
as u64;
456 let env
: &BackupEnvironment
= rpcenv
.as_ref();
458 env
.fixed_writer_close(wid
, chunk_count
, size
)?
;
460 env
.log(format
!("sucessfully closed fixed index {}", wid
));
// Handler body that marks the backup as finished by calling
// env.finish_backup() and then logs the result.
// NOTE(review): the `fn` header line and the return expression are absent
// from this listing; presumably this is the handler registered under the
// "finish" route — confirm.
468 rpcenv
: &mut RpcEnvironment
,
469 ) -> Result
<Value
, Error
> {
471 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Any error from finish_backup() is propagated; the log line is only reached
// on success.
473 env
.finish_backup()?
;
474 env
.log("sucessfully finished backup");
/// Describes the asynchronous API method that downloads the dynamic chunk
/// index of the previous backup.
///
/// The visible schema requires `archive-name`, validated by the shared
/// `BACKUP_ARCHIVE_NAME_SCHEMA`.
/// NOTE(review): the handler argument and the end of the raw-string
/// description are absent from this listing.
479 pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod
{
482 ObjectSchema
::new(r
###"
483 Download the dynamic chunk index from the previous backup.
484 Simply returns an empty list if this is the first backup.
487 .required("archive-name", crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
.clone())
/// Streams the dynamic chunk index of the previous backup for the requested
/// archive.
///
/// Returns an empty `200 OK` response when the environment has no previous
/// backup, and also on the visible fallback path after the index is opened.
/// Otherwise it registers every chunk of the old index with the environment
/// and streams the index's digest list as the response body.
///
/// # Errors
/// Fails when `archive-name` is missing or does not end in ".pxar", when
/// reading chunk info or registering a chunk fails, or when building a
/// response fails.
///
/// NOTE(review): some parameters, match-arm patterns and closing braces are
/// absent from this listing.
491 fn dynamic_chunk_index(
495 _info
: &ApiAsyncMethod
,
496 rpcenv
: Box
<RpcEnvironment
>,
497 ) -> Result
<BoxFut
, Error
> {
499 let env
: &BackupEnvironment
= rpcenv
.as_ref();
501 let mut archive_name
= tools
::required_string_param(¶m
, "archive-name")?
.to_owned();
503 if !archive_name
.ends_with(".pxar") {
504 bail
!("wrong archive extension: '{}'", archive_name
);
506 archive_name
.push_str(".didx");
// Response used on both "nothing to download" paths below: 200 OK with an
// empty body.
509 let empty_response
= {
511 .status(StatusCode
::OK
)
512 .body(Body
::empty())?
515 let last_backup
= match &env
.last_backup
{
517 None
=> return Ok(Box
::new(future
::ok(empty_response
))),
// Look for the same archive inside the previous backup's directory.
520 let mut path
= last_backup
.backup_dir
.relative_path();
521 path
.push(&archive_name
);
523 let index
= match env
.datastore
.open_dynamic_reader(path
) {
// NOTE(review): the pattern of this arm is absent from the listing;
// presumably it is the failure arm of open_dynamic_reader — confirm.
526 env
.log(format
!("there is no last backup for archive '{}'", archive_name
));
527 return Ok(Box
::new(future
::ok(empty_response
)));
531 env
.log(format
!("download last backup index for archive '{}'", archive_name
));
// Register every chunk of the old index with the environment; the chunk size
// is the distance between its start and end offsets.
533 let count
= index
.index_count();
534 for pos
in 0..count
{
535 let (start
, end
, digest
) = index
.chunk_info(pos
)?
;
536 let size
= (end
- start
) as u32;
537 env
.register_chunk(digest
, size
)?
;
// Wrap the index in a DigestListEncoder reader and stream it as the body.
540 let reader
= DigestListEncoder
::new(Box
::new(index
));
542 let stream
= WrappedReaderStream
::new(reader
);
544 // fixme: set size, content type?
545 let response
= http
::Response
::builder()
547 .body(Body
::wrap_stream(stream
))?
;
549 Ok(Box
::new(future
::ok(response
)))
/// Describes the asynchronous API method that downloads the fixed chunk
/// index of the previous backup.
///
/// The visible schema requires `archive-name`, validated by the shared
/// `BACKUP_ARCHIVE_NAME_SCHEMA`.
/// NOTE(review): the handler argument and the end of the raw-string
/// description are absent from this listing.
552 pub fn api_method_fixed_chunk_index() -> ApiAsyncMethod
{
555 ObjectSchema
::new(r
###"
556 Download the fixed chunk index from the previous backup.
557 Simply returns an empty list if this is the first backup.
560 .required("archive-name", crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
.clone())
/// Streams the fixed chunk index of the previous backup for the requested
/// archive.
///
/// Returns an empty `200 OK` response when the environment has no previous
/// backup, and also on the visible fallback path after the index is opened.
/// Otherwise it registers every chunk of the old index with the environment
/// and streams the index's digest list as the response body.
///
/// # Errors
/// Fails when `archive-name` is missing or does not end in ".img", when
/// registering a chunk fails, or when building a response fails.
///
/// # Panics
/// `index.index_digest(pos).unwrap()` panics if no digest is returned for a
/// position below `index_count()`.
///
/// NOTE(review): some parameters, match-arm patterns and closing braces are
/// absent from this listing.
564 fn fixed_chunk_index(
568 _info
: &ApiAsyncMethod
,
569 rpcenv
: Box
<RpcEnvironment
>,
570 ) -> Result
<BoxFut
, Error
> {
572 let env
: &BackupEnvironment
= rpcenv
.as_ref();
574 let mut archive_name
= tools
::required_string_param(¶m
, "archive-name")?
.to_owned();
576 if !archive_name
.ends_with(".img") {
577 bail
!("wrong archive extension: '{}'", archive_name
);
579 archive_name
.push_str(".fidx");
// Response used on both "nothing to download" paths below: 200 OK with an
// empty body.
582 let empty_response
= {
584 .status(StatusCode
::OK
)
585 .body(Body
::empty())?
588 let last_backup
= match &env
.last_backup
{
590 None
=> return Ok(Box
::new(future
::ok(empty_response
))),
// Look for the same archive inside the previous backup's directory.
593 let mut path
= last_backup
.backup_dir
.relative_path();
594 path
.push(&archive_name
);
596 let index
= match env
.datastore
.open_fixed_reader(path
) {
// NOTE(review): the pattern of this arm is absent from the listing;
// presumably it is the failure arm of open_fixed_reader — confirm.
599 env
.log(format
!("there is no last backup for archive '{}'", archive_name
));
600 return Ok(Box
::new(future
::ok(empty_response
)));
604 env
.log(format
!("download last backup index for archive '{}'", archive_name
));
// Register every chunk of the old index with the environment; every chunk is
// registered with the index-wide `chunk_size`.
606 let count
= index
.index_count();
607 for pos
in 0..count
{
608 let digest
= index
.index_digest(pos
).unwrap();
609 let size
= index
.chunk_size
as u32;
610 env
.register_chunk(*digest
, size
)?
;
// Wrap the index in a DigestListEncoder reader and stream it as the body.
613 let reader
= DigestListEncoder
::new(Box
::new(index
));
615 let stream
= WrappedReaderStream
::new(reader
);
617 // fixme: set size, content type?
618 let response
= http
::Response
::builder()
620 .body(Body
::wrap_stream(stream
))?
;
622 Ok(Box
::new(future
::ok(response
)))