// proxmox-backup.git: src/api2/backup.rs
use failure::*;
use futures::*;
use hyper::header::{HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Response, StatusCode};
use serde_json::{json, Value};

use proxmox::{sortable, identity, list_subdirs_api_method};
use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment};
use proxmox::api::router::SubdirMap;
use proxmox::api::schema::*;

use crate::tools;
use crate::tools::wrapped_reader_stream::*;
use crate::server::{WorkerTask, H2Service};
use crate::backup::*;
use crate::api2::types::*;

mod environment;
use environment::*;

mod upload_chunk;
use upload_chunk::*;

pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);

#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
);
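
// Illustrative only (a sketch; the path and parameter values are examples,
// not taken from this file): a backup session starts with an HTTP/1.1
// upgrade request along these lines, where the Upgrade header value is the
// expansion of PROXMOX_BACKUP_PROTOCOL_ID_V1!():
//
//   GET /api2/json/backup?store=store1&backup-type=host&backup-id=myhost&backup-time=1577836800 HTTP/1.1
//   Upgrade: <PROXMOX_BACKUP_PROTOCOL_ID_V1>
//
// On success the handler below answers "101 Switching Protocols" and then
// speaks HTTP/2 on the upgraded connection (served by H2Service).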

fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);

        let store = tools::required_string_param(&param, "store")?.to_owned();
        let datastore = DataStore::lookup_datastore(&store)?;

        let backup_type = tools::required_string_param(&param, "backup-type")?;
        let backup_id = tools::required_string_param(&param, "backup-id")?;
        let backup_time = tools::required_integer_param(&param, "backup-time")?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        if parts.version >= http::version::Version::HTTP_2 {
            bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
        }

        let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);

        let username = rpcenv.get_user().unwrap();
        let env_type = rpcenv.env_type();

        let backup_group = BackupGroup::new(backup_type, backup_id);
        let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
        let backup_dir = BackupDir::new_with_group(backup_group, backup_time);

        if let Some(last) = &last_backup {
            if backup_dir.backup_time() <= last.backup_dir.backup_time() {
                bail!("backup timestamp is not newer than last backup.");
            }
            // fixme: abort if the last backup is still running - how to test that?
            // Idea: write the upid into a file inside the snapshot dir, then
            // test here whether that task is still running.
        }

        let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
        if !is_new { bail!("backup directory already exists."); }

        WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
            let mut env = BackupEnvironment::new(
                env_type, username.clone(), worker.clone(), datastore, backup_dir);

            env.debug = debug;
            env.last_backup = last_backup;

            env.log(format!("starting new backup on datastore '{}': {:?}", store, path));

            let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

            let abort_future = worker.abort_future();

            let env2 = env.clone();
            let env3 = env.clone();

            let req_fut = req_body
                .on_upgrade()
                .map_err(Error::from)
                .and_then(move |conn| {
                    env3.debug("protocol upgrade done");

                    let mut http = hyper::server::conn::Http::new();
                    http.http2_only(true);
                    // increase window size: todo - find the optimal size
                    let window_size = 32*1024*1024; // max = (1 << 31) - 2
                    http.http2_initial_stream_window_size(window_size);
                    http.http2_initial_connection_window_size(window_size);

                    http.serve_connection(conn, service)
                        .map_err(Error::from)
                });
            let abort_future = abort_future
                .map(|_| Err(format_err!("task aborted")));

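            // Race the HTTP/2 connection against the worker's abort signal:
            // whichever future completes first decides the session's outcome.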
            use futures::future::Either;
            future::select(req_fut, abort_future)
                .map(|res| match res {
                    Either::Left((Ok(res), _)) => Ok(res),
                    Either::Left((Err(err), _)) => Err(err),
                    Either::Right((Ok(res), _)) => Ok(res),
                    Either::Right((Err(err), _)) => Err(err),
                })
                .and_then(move |_result| async move {
                    env.ensure_finished()?;
                    env.log("backup finished successfully");
                    Ok(())
                })
                .then(move |result| async move {
                    if let Err(err) = result {
                        match env2.ensure_finished() {
                            Ok(()) => {}, // ignore the error if the backup finished after all
                            _ => {
                                env2.log(format!("backup failed: {}", err));
                                env2.log("removing failed backup");
                                env2.remove_backup()?;
                                return Err(err);
                            }
                        }
                    }
                    Ok(())
                })
        })?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .body(Body::empty())?;

        Ok(response)
    }.boxed()
}

pub const BACKUP_API_SUBDIRS: SubdirMap = &[
    (
        "blob", &Router::new()
            .upload(&API_METHOD_UPLOAD_BLOB)
    ),
    (
        "dynamic_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK)
    ),
    (
        "dynamic_close", &Router::new()
            .post(&API_METHOD_CLOSE_DYNAMIC_INDEX)
    ),
    (
        "dynamic_index", &Router::new()
            .download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND)
    ),
    (
        "finish", &Router::new()
            .post(
                &ApiMethod::new(
                    &ApiHandler::Sync(&finish_backup),
                    &ObjectSchema::new("Mark backup as finished.", &[])
                )
            )
    ),
    (
        "fixed_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_FIXED_CHUNK)
    ),
    (
        "fixed_close", &Router::new()
            .post(&API_METHOD_CLOSE_FIXED_INDEX)
    ),
    (
        "fixed_index", &Router::new()
            .download(&API_METHOD_FIXED_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND)
    ),
    (
        "speedtest", &Router::new()
            .upload(&API_METHOD_UPLOAD_SPEEDTEST)
    ),
];

pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);
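
// Sketch of a typical client session over the upgraded connection, derived
// from the endpoints above (the exact sequence is up to the client):
//
//   POST {dynamic,fixed}_index    create an index writer, returns a wid
//   GET  {dynamic,fixed}_index    fetch the previous backup's digest list,
//                                 so unchanged chunks need not be re-uploaded
//   UPLOAD {dynamic,fixed}_chunk  upload new chunk data
//   PUT  {dynamic,fixed}_index    append (offset, digest) pairs to a wid
//   POST {dynamic,fixed}_close    verify chunk count, size and checksum
//   POST finish                   mark the whole backup as finished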

#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
        ]),
    )
);

fn create_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = tools::required_string_param(&param, "archive-name")?.to_owned();

    let archive_name = name.clone();
    if !archive_name.ends_with(".didx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let index = env.datastore.create_dynamic_writer(&path)?;
    let wid = env.register_dynamic_writer(index, name)?;

    env.log(format!("created new dynamic index {} ({:?})", wid, path));

    Ok(json!(wid))
}
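
// The returned writer ID is the handle the client passes as `wid` to the
// dynamic_index PUT (append) and dynamic_close calls that follow.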

#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            ("size", false, &IntegerSchema::new("File size.")
                .minimum(1)
                .schema()
            ),
        ]),
    )
);

fn create_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    println!("PARAM: {:?}", param);

    let name = tools::required_string_param(&param, "archive-name")?.to_owned();
    let size = tools::required_integer_param(&param, "size")? as usize;

    let archive_name = name.clone();
    if !archive_name.ends_with(".fidx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let chunk_size = 4096*1024; // 4 MiB; todo: is this a good default?

    let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
    let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;

    env.log(format!("created new fixed index {} ({:?})", wid, path));

    Ok(json!(wid))
}

#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            ),
        ]),
    )
);
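
// Illustrative request body (a sketch; digests shortened and values made up):
//
//   {
//     "wid": 1,
//     "digest-list": ["f5759878d2e5…", "a9c63f3eff01…"],
//     "offset-list": [0, 4194304]
//   }
//
// Every digest must already be known to the server, i.e. uploaded via
// dynamic_chunk earlier or registered from the previous backup's index.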

fn dynamic_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let digest_list = tools::required_array_param(&param, "digest-list")?;
    let offset_list = tools::required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("dynamic_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = proxmox::tools::hex_to_digest(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!("successfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
    }

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            )
        ]),
    )
);
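
// The parameter shape matches dynamic_append above; here the offsets are
// byte positions within the fixed-size image, so (apart from a trailing
// partial chunk) one would expect multiples of the chunk size.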

fn fixed_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let digest_list = tools::required_array_param(&param, "digest-list")?;
    let offset_list = tools::required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("fixed_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = proxmox::tools::hex_to_digest(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.fixed_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!("successfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
    }

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);
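
// Closing is the protocol's integrity barrier: the writer is only finalized
// when the client-supplied chunk count, byte size and digest-list checksum
// match what the server accumulated; the checks themselves live in
// BackupEnvironment::dynamic_writer_close.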

fn close_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
    let size = tools::required_integer_param(&param, "size")? as u64;
    let csum_str = tools::required_string_param(&param, "csum")?;
    let csum = proxmox::tools::hex_to_digest(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.dynamic_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed dynamic index {}", wid));

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

fn close_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
    let size = tools::required_integer_param(&param, "size")? as u64;
    let csum_str = tools::required_string_param(&param, "csum")?;
    let csum = proxmox::tools::hex_to_digest(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.fixed_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed fixed index {}", wid));

    Ok(Value::Null)
}

fn finish_backup(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;
    env.log("successfully finished backup");

    Ok(Value::Null)
}
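
// Note the interplay with upgrade_to_backup_protocol() above: when the
// upgraded connection ends, env.ensure_finished() only succeeds if the
// client called this endpoint first; otherwise the partial backup is
// logged as failed and removed.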

#[sortable]
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&dynamic_chunk_index),
    &ObjectSchema::new(
        r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"### ,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);

fn dynamic_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

        if !archive_name.ends_with(".didx") {
            bail!("wrong archive extension: '{}'", archive_name);
        }

        let empty_response = {
            Response::builder()
                .status(StatusCode::OK)
                .body(Body::empty())?
        };

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => return Ok(empty_response),
        };

        let mut path = last_backup.backup_dir.relative_path();
        path.push(&archive_name);

        let index = match env.datastore.open_dynamic_reader(path) {
            Ok(index) => index,
            Err(_) => {
                env.log(format!("there is no last backup for archive '{}'", archive_name));
                return Ok(empty_response);
            }
        };

        env.log(format!("download last backup index for archive '{}'", archive_name));

        let count = index.index_count();
        for pos in 0..count {
            let (start, end, digest) = index.chunk_info(pos)?;
            let size = (end - start) as u32;
            env.register_chunk(digest, size)?;
        }
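
        // Registering the previous backup's chunks marks them as "known", so
        // the client may reference them in append calls without re-uploading
        // their data; the digest list streamed below tells the client which
        // digests qualify. (fixed_chunk_index does the same for .fidx files.)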

        let reader = DigestListEncoder::new(Box::new(index));

        let stream = WrappedReaderStream::new(reader);

        // fixme: set size, content type?
        let response = http::Response::builder()
            .status(200)
            .body(Body::wrap_stream(stream))?;

        Ok(response)
    }.boxed()
}

#[sortable]
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&fixed_chunk_index),
    &ObjectSchema::new(
        r###"
Download the fixed chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"### ,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);

fn fixed_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

        if !archive_name.ends_with(".fidx") {
            bail!("wrong archive extension: '{}'", archive_name);
        }

        let empty_response = {
            Response::builder()
                .status(StatusCode::OK)
                .body(Body::empty())?
        };

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => return Ok(empty_response),
        };

        let mut path = last_backup.backup_dir.relative_path();
        path.push(&archive_name);

        let index = match env.datastore.open_fixed_reader(path) {
            Ok(index) => index,
            Err(_) => {
                env.log(format!("there is no last backup for archive '{}'", archive_name));
                return Ok(empty_response);
            }
        };

        env.log(format!("download last backup index for archive '{}'", archive_name));

        let count = index.index_count();
        let image_size = index.index_bytes();
        for pos in 0..count {
            let digest = index.index_digest(pos).unwrap();
            // Note: last chunk can be smaller
            let start = (pos*index.chunk_size) as u64;
            let mut end = start + index.chunk_size as u64;
            if end > image_size { end = image_size; }
            let size = (end - start) as u32;
            env.register_chunk(*digest, size)?;
        }

        let reader = DigestListEncoder::new(Box::new(index));

        let stream = WrappedReaderStream::new(reader);

        // fixme: set size, content type?
        let response = http::Response::builder()
            .status(200)
            .body(Body::wrap_stream(stream))?;

        Ok(response)
    }.boxed()
}