// src/api2/backup.rs
use failure::*;
use futures::*;
use hyper::header::{HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Response, StatusCode};
use serde_json::{json, Value};

use proxmox::{sortable, identity, list_subdirs_api_method};
use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment};
use proxmox::api::router::SubdirMap;
use proxmox::api::schema::*;

use crate::tools::{self, WrappedReaderStream};
use crate::server::{WorkerTask, H2Service};
use crate::backup::*;
use crate::api2::types::*;

mod environment;
use environment::*;

mod upload_chunk;
use upload_chunk::*;

pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);

#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
);

fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);

        let store = tools::required_string_param(&param, "store")?.to_owned();
        let datastore = DataStore::lookup_datastore(&store)?;

        let backup_type = tools::required_string_param(&param, "backup-type")?;
        let backup_id = tools::required_string_param(&param, "backup-id")?;
        let backup_time = tools::required_integer_param(&param, "backup-time")?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        if parts.version >= http::version::Version::HTTP_2 {
            bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
        }

        let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);

        let username = rpcenv.get_user().unwrap();
        let env_type = rpcenv.env_type();

        let backup_group = BackupGroup::new(backup_type, backup_id);
        let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
        let backup_dir = BackupDir::new_with_group(backup_group, backup_time);

        if let Some(last) = &last_backup {
            if backup_dir.backup_time() <= last.backup_dir.backup_time() {
                bail!("backup timestamp is older than last backup.");
            }
            // fixme: abort if last backup is still running - how to test?
            // Idea: write the upid into a file inside the snapshot dir, then
            // test here whether it is still running.
        }

        let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
        if !is_new { bail!("backup directory already exists."); }

        WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
            let mut env = BackupEnvironment::new(
                env_type, username.clone(), worker.clone(), datastore, backup_dir);

            env.debug = debug;
            env.last_backup = last_backup;

            env.log(format!("starting new backup on datastore '{}': {:?}", store, path));

            let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

            let abort_future = worker.abort_future();

            let env2 = env.clone();

            let mut req_fut = req_body
                .on_upgrade()
                .map_err(Error::from)
                .and_then(move |conn| {
                    env2.debug("protocol upgrade done");

                    let mut http = hyper::server::conn::Http::new();
                    http.http2_only(true);
                    // increase window size: todo - find optimal size
                    let window_size = 32*1024*1024; // max = (1 << 31) - 2
                    http.http2_initial_stream_window_size(window_size);
                    http.http2_initial_connection_window_size(window_size);

                    http.serve_connection(conn, service)
                        .map_err(Error::from)
                });
            let mut abort_future = abort_future
                .map(|_| Err(format_err!("task aborted")));

            async move {
                let res = select!{
                    req = req_fut => req,
                    abrt = abort_future => abrt,
                };

                match (res, env.ensure_finished()) {
                    (Ok(_), Ok(())) => {
                        env.log("backup finished successfully");
                        Ok(())
                    },
                    (Err(err), Ok(())) => {
                        // ignore errors after finish
                        env.log(format!("backup had errors but finished: {}", err));
                        Ok(())
                    },
                    (Ok(_), Err(err)) => {
                        env.log(format!("backup ended and finish failed: {}", err));
                        env.log("removing unfinished backup");
                        env.remove_backup()?;
                        Err(err)
                    },
                    (Err(err), Err(_)) => {
                        env.log(format!("backup failed: {}", err));
                        env.log("removing failed backup");
                        env.remove_backup()?;
                        Err(err)
                    },
                }
            }
        })?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .body(Body::empty())?;

        Ok(response)
    }.boxed()
}
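
// Handshake sketch (illustrative only; the request path, parameter values and
// the spelled-out protocol id are assumptions, not taken from this file): the
// client sends a plain HTTP/1.1 request whose Upgrade header matches
// PROXMOX_BACKUP_PROTOCOL_ID_V1!(), e.g.
//
//   GET /api2/json/backup?store=store1&backup-type=vm&backup-id=100&backup-time=1555958400 HTTP/1.1
//   Upgrade: proxmox-backup-protocol-v1
//
// On success the handler above replies "101 Switching Protocols" and then
// serves BACKUP_API_ROUTER over HTTP/2 on the upgraded connection.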

pub const BACKUP_API_SUBDIRS: SubdirMap = &[
    (
        "blob", &Router::new()
            .upload(&API_METHOD_UPLOAD_BLOB)
    ),
    (
        "dynamic_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK)
    ),
    (
        "dynamic_close", &Router::new()
            .post(&API_METHOD_CLOSE_DYNAMIC_INDEX)
    ),
    (
        "dynamic_index", &Router::new()
            .download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND)
    ),
    (
        "finish", &Router::new()
            .post(
                &ApiMethod::new(
                    &ApiHandler::Sync(&finish_backup),
                    &ObjectSchema::new("Mark backup as finished.", &[])
                )
            )
    ),
    (
        "fixed_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_FIXED_CHUNK)
    ),
    (
        "fixed_close", &Router::new()
            .post(&API_METHOD_CLOSE_FIXED_INDEX)
    ),
    (
        "fixed_index", &Router::new()
            .download(&API_METHOD_FIXED_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND)
    ),
    (
        "speedtest", &Router::new()
            .upload(&API_METHOD_UPLOAD_SPEEDTEST)
    ),
];

pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);
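
// Typical client session on the upgraded connection, as a rough sketch derived
// from the routes above (post/put map to POST/PUT, download to GET; archive
// names and payload values are made-up examples, payloads abbreviated):
//
//   POST dynamic_index   {"archive-name": "root.pxar.didx"}             -> wid
//   GET  dynamic_index   ?archive-name=root.pxar.didx   (previous digest list)
//   ...upload chunk data via dynamic_chunk...
//   PUT  dynamic_index   {"wid": 1, "digest-list": [..], "offset-list": [..]}
//   POST dynamic_close   {"wid": 1, "chunk-count": .., "size": .., "csum": ".."}
//   POST finish          {}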

#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
        ]),
    )
);

fn create_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = tools::required_string_param(&param, "archive-name")?.to_owned();

    let archive_name = name.clone();
    if !archive_name.ends_with(".didx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let index = env.datastore.create_dynamic_writer(&path)?;
    let wid = env.register_dynamic_writer(index, name)?;

    env.log(format!("created new dynamic index {} ({:?})", wid, path));

    Ok(json!(wid))
}

#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            ("size", false, &IntegerSchema::new("File size.")
                .minimum(1)
                .schema()
            ),
        ]),
    )
);

fn create_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    println!("PARAM: {:?}", param);

    let name = tools::required_string_param(&param, "archive-name")?.to_owned();
    let size = tools::required_integer_param(&param, "size")? as usize;

    let archive_name = name.clone();
    if !archive_name.ends_with(".fidx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let chunk_size = 4096*1024; // todo: ??

    let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
    let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;

    env.log(format!("created new fixed index {} ({:?})", wid, path));

    Ok(json!(wid))
}
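
// Sizing sketch (plain arithmetic, hypothetical numbers): with the hard-coded
// chunk_size of 4096*1024 bytes (4 MiB), a 1 GiB image is covered by
// 1073741824 / 4194304 = 256 chunks; if the size is not a multiple of 4 MiB,
// the final chunk is smaller (see the clamping in fixed_chunk_index below).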

#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            ),
        ]),
    )
);

fn dynamic_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let digest_list = tools::required_array_param(&param, "digest-list")?;
    let offset_list = tools::required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("dynamic_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = proxmox::tools::hex_to_digest(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!("successfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
    }

    Ok(Value::Null)
}
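
// Example request body for this handler (hypothetical values; each digest must
// be a 64-character hex string per CHUNK_DIGEST_SCHEMA, and only chunks that
// were uploaded earlier - and are thus known to env.lookup_chunk - may appear):
//
//   { "wid": 1,
//     "digest-list": ["f2b4...<64 hex digits in total>"],
//     "offset-list": [4194304] }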

#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            )
        ]),
    )
);

fn fixed_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let digest_list = tools::required_array_param(&param, "digest-list")?;
    let offset_list = tools::required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("fixed_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = proxmox::tools::hex_to_digest(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.fixed_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!("successfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
    }

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

fn close_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
    let size = tools::required_integer_param(&param, "size")? as u64;
    let csum_str = tools::required_string_param(&param, "csum")?;
    let csum = proxmox::tools::hex_to_digest(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.dynamic_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed dynamic index {}", wid));

    Ok(Value::Null)
}
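
// Example close request (hypothetical values):
//
//   { "wid": 1, "chunk-count": 256, "size": 1073741824, "csum": "<64 hex digits>" }
//
// Per the schema above, chunk-count and size let the server verify that it
// received every chunk and all data before the writer is closed.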

#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

fn close_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
    let size = tools::required_integer_param(&param, "size")? as u64;
    let csum_str = tools::required_string_param(&param, "csum")?;
    let csum = proxmox::tools::hex_to_digest(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.fixed_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed fixed index {}", wid));

    Ok(Value::Null)
}

fn finish_backup(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;
    env.log("successfully finished backup");

    Ok(Value::Null)
}
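
// Note: the client must call finish before the upgraded connection ends.
// Otherwise env.ensure_finished() in upgrade_to_backup_protocol returns an
// error and the unfinished backup directory is removed again.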

#[sortable]
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&dynamic_chunk_index),
    &ObjectSchema::new(
        r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);

fn dynamic_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

        if !archive_name.ends_with(".didx") {
            bail!("wrong archive extension: '{}'", archive_name);
        }

        let empty_response = {
            Response::builder()
                .status(StatusCode::OK)
                .body(Body::empty())?
        };

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => return Ok(empty_response),
        };

        let mut path = last_backup.backup_dir.relative_path();
        path.push(&archive_name);

        let index = match env.datastore.open_dynamic_reader(path) {
            Ok(index) => index,
            Err(_) => {
                env.log(format!("there is no last backup for archive '{}'", archive_name));
                return Ok(empty_response);
            }
        };

        env.log(format!("download last backup index for archive '{}'", archive_name));

        let count = index.index_count();
        for pos in 0..count {
            let (start, end, digest) = index.chunk_info(pos)?;
            let size = (end - start) as u32;
            env.register_chunk(digest, size)?;
        }

        let reader = DigestListEncoder::new(Box::new(index));

        let stream = WrappedReaderStream::new(reader);

        // fixme: set size, content type?
        let response = http::Response::builder()
            .status(200)
            .body(Body::wrap_stream(stream))?;

        Ok(response)
    }.boxed()
}
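
// Design note: registering every chunk of the previous index via
// register_chunk means the client may reference those digests in
// dynamic_append without re-uploading the data; the digest list streamed
// back tells the client exactly which chunks the server already knows.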

#[sortable]
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&fixed_chunk_index),
    &ObjectSchema::new(
        r###"
Download the fixed chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);

fn fixed_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

        if !archive_name.ends_with(".fidx") {
            bail!("wrong archive extension: '{}'", archive_name);
        }

        let empty_response = {
            Response::builder()
                .status(StatusCode::OK)
                .body(Body::empty())?
        };

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => return Ok(empty_response),
        };

        let mut path = last_backup.backup_dir.relative_path();
        path.push(&archive_name);

        let index = match env.datastore.open_fixed_reader(path) {
            Ok(index) => index,
            Err(_) => {
                env.log(format!("there is no last backup for archive '{}'", archive_name));
                return Ok(empty_response);
            }
        };

        env.log(format!("download last backup index for archive '{}'", archive_name));

        let count = index.index_count();
        let image_size = index.index_bytes();
        for pos in 0..count {
            let digest = index.index_digest(pos).unwrap();
            // Note: the last chunk can be smaller
            let start = (pos*index.chunk_size) as u64;
            let mut end = start + index.chunk_size as u64;
            if end > image_size { end = image_size; }
            let size = (end - start) as u32;
            env.register_chunk(*digest, size)?;
        }

        let reader = DigestListEncoder::new(Box::new(index));

        let stream = WrappedReaderStream::new(reader);

        // fixme: set size, content type?
        let response = http::Response::builder()
            .status(200)
            .body(Body::wrap_stream(stream))?;

        Ok(response)
    }.boxed()
}
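
// Last-chunk arithmetic from the loop above, with hypothetical numbers:
// chunk_size = 4 MiB and image_size = 10 MiB give chunks of 4, 4 and 2 MiB;
// at pos = 2, start = 8 MiB and start + chunk_size = 12 MiB exceeds
// image_size, so end is clamped and size = 2 MiB.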