]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup.rs
switch from failure to anyhow
[proxmox-backup.git] / src / api2 / backup.rs
1 use anyhow::{bail, format_err, Error};
2 use futures::*;
3 use hyper::header::{HeaderValue, UPGRADE};
4 use hyper::http::request::Parts;
5 use hyper::{Body, Response, StatusCode};
6 use serde_json::{json, Value};
7
8 use proxmox::{sortable, identity, list_subdirs_api_method};
9 use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
10 use proxmox::api::router::SubdirMap;
11 use proxmox::api::schema::*;
12
13 use crate::tools::{self, WrappedReaderStream};
14 use crate::server::{WorkerTask, H2Service};
15 use crate::backup::*;
16 use crate::api2::types::*;
17 use crate::config::acl::PRIV_DATASTORE_ALLOCATE_SPACE;
18
19 mod environment;
20 use environment::*;
21
22 mod upload_chunk;
23 use upload_chunk::*;
24
// Entry point for the backup API: the only supported operation on this
// router is the protocol upgrade that switches the connection to HTTP/2.
pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);
27
// Method definition for the backup protocol upgrade.
// Required parameters: store, backup-type, backup-id, backup-time.
// Optional: "debug" enables verbose debug logging in the worker.
// Access: requires PRIV_DATASTORE_ALLOCATE_SPACE on /datastore/{store}.
#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
).access(None, &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_ALLOCATE_SPACE, false));
42
43 fn upgrade_to_backup_protocol(
44 parts: Parts,
45 req_body: Body,
46 param: Value,
47 _info: &ApiMethod,
48 rpcenv: Box<dyn RpcEnvironment>,
49 ) -> ApiResponseFuture {
50
51 async move {
52 let debug = param["debug"].as_bool().unwrap_or(false);
53
54 let store = tools::required_string_param(&param, "store")?.to_owned();
55 let datastore = DataStore::lookup_datastore(&store)?;
56
57 let backup_type = tools::required_string_param(&param, "backup-type")?;
58 let backup_id = tools::required_string_param(&param, "backup-id")?;
59 let backup_time = tools::required_integer_param(&param, "backup-time")?;
60
61 let protocols = parts
62 .headers
63 .get("UPGRADE")
64 .ok_or_else(|| format_err!("missing Upgrade header"))?
65 .to_str()?;
66
67 if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
68 bail!("invalid protocol name");
69 }
70
71 if parts.version >= http::version::Version::HTTP_2 {
72 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
73 }
74
75 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
76
77 let username = rpcenv.get_user().unwrap();
78 let env_type = rpcenv.env_type();
79
80 let backup_group = BackupGroup::new(backup_type, backup_id);
81 let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
82 let backup_dir = BackupDir::new_with_group(backup_group, backup_time);
83
84 if let Some(last) = &last_backup {
85 if backup_dir.backup_time() <= last.backup_dir.backup_time() {
86 bail!("backup timestamp is older than last backup.");
87 }
88 // fixme: abort if last backup is still running - howto test?
89 // Idea: write upid into a file inside snapshot dir. then test if
90 // it is still running here.
91 }
92
93 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
94 if !is_new { bail!("backup directorty already exists."); }
95
96 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
97 let mut env = BackupEnvironment::new(
98 env_type, username.clone(), worker.clone(), datastore, backup_dir);
99
100 env.debug = debug;
101 env.last_backup = last_backup;
102
103 env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
104
105 let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);
106
107 let abort_future = worker.abort_future();
108
109 let env2 = env.clone();
110
111 let mut req_fut = req_body
112 .on_upgrade()
113 .map_err(Error::from)
114 .and_then(move |conn| {
115 env2.debug("protocol upgrade done");
116
117 let mut http = hyper::server::conn::Http::new();
118 http.http2_only(true);
119 // increase window size: todo - find optiomal size
120 let window_size = 32*1024*1024; // max = (1 << 31) - 2
121 http.http2_initial_stream_window_size(window_size);
122 http.http2_initial_connection_window_size(window_size);
123
124 http.serve_connection(conn, service)
125 .map_err(Error::from)
126 });
127 let mut abort_future = abort_future
128 .map(|_| Err(format_err!("task aborted")));
129
130 async move {
131 let res = select!{
132 req = req_fut => req,
133 abrt = abort_future => abrt,
134 };
135
136 match (res, env.ensure_finished()) {
137 (Ok(_), Ok(())) => {
138 env.log("backup finished sucessfully");
139 Ok(())
140 },
141 (Err(err), Ok(())) => {
142 // ignore errors after finish
143 env.log(format!("backup had errors but finished: {}", err));
144 Ok(())
145 },
146 (Ok(_), Err(err)) => {
147 env.log(format!("backup ended and finish failed: {}", err));
148 env.log("removing unfinished backup");
149 env.remove_backup()?;
150 Err(err)
151 },
152 (Err(err), Err(_)) => {
153 env.log(format!("backup failed: {}", err));
154 env.log("removing failed backup");
155 env.remove_backup()?;
156 Err(err)
157 },
158 }
159 }
160 })?;
161
162 let response = Response::builder()
163 .status(StatusCode::SWITCHING_PROTOCOLS)
164 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
165 .body(Body::empty())?;
166
167 Ok(response)
168 }.boxed()
169 }
170
// Sub-routes served over the upgraded (HTTP/2) backup connection.
// NOTE(review): entries are kept in alphabetical order by name — presumably
// required by the subdir lookup machinery; confirm before reordering.
pub const BACKUP_API_SUBDIRS: SubdirMap = &[
    (
        "blob", &Router::new()
            .upload(&API_METHOD_UPLOAD_BLOB)
    ),
    (
        "dynamic_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK)
    ),
    (
        "dynamic_close", &Router::new()
            .post(&API_METHOD_CLOSE_DYNAMIC_INDEX)
    ),
    (
        "dynamic_index", &Router::new()
            .download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND)
    ),
    (
        "finish", &Router::new()
            .post(
                &ApiMethod::new(
                    &ApiHandler::Sync(&finish_backup),
                    &ObjectSchema::new("Mark backup as finished.", &[])
                )
            )
    ),
    (
        "fixed_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_FIXED_CHUNK)
    ),
    (
        "fixed_close", &Router::new()
            .post(&API_METHOD_CLOSE_FIXED_INDEX)
    ),
    (
        "fixed_index", &Router::new()
            .download(&API_METHOD_FIXED_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND)
    ),
    (
        "speedtest", &Router::new()
            .upload(&API_METHOD_UPLOAD_SPEEDTEST)
    ),
];
218
// Router used by the per-connection H2Service after the protocol upgrade:
// GET on the root lists the subdirs, everything else dispatches into them.
pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);
222
// Method definition for creating a dynamic chunk index (".didx") writer.
// Single required parameter: the archive name. Handler: create_dynamic_index.
#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
        ]),
    )
);
233
234 fn create_dynamic_index(
235 param: Value,
236 _info: &ApiMethod,
237 rpcenv: &mut dyn RpcEnvironment,
238 ) -> Result<Value, Error> {
239
240 let env: &BackupEnvironment = rpcenv.as_ref();
241
242 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
243
244 let archive_name = name.clone();
245 if !archive_name.ends_with(".didx") {
246 bail!("wrong archive extension: '{}'", archive_name);
247 }
248
249 let mut path = env.backup_dir.relative_path();
250 path.push(archive_name);
251
252 let index = env.datastore.create_dynamic_writer(&path)?;
253 let wid = env.register_dynamic_writer(index, name)?;
254
255 env.log(format!("created new dynamic index {} ({:?})", wid, path));
256
257 Ok(json!(wid))
258 }
259
// Method definition for creating a fixed chunk index (".fidx") writer.
// Requires the archive name and the total file size (>= 1 byte).
// Handler: create_fixed_index.
#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            ("size", false, &IntegerSchema::new("File size.")
                .minimum(1)
                .schema()
            ),
        ]),
    )
);
274
275 fn create_fixed_index(
276 param: Value,
277 _info: &ApiMethod,
278 rpcenv: &mut dyn RpcEnvironment,
279 ) -> Result<Value, Error> {
280
281 let env: &BackupEnvironment = rpcenv.as_ref();
282
283 println!("PARAM: {:?}", param);
284
285 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
286 let size = tools::required_integer_param(&param, "size")? as usize;
287
288 let archive_name = name.clone();
289 if !archive_name.ends_with(".fidx") {
290 bail!("wrong archive extension: '{}'", archive_name);
291 }
292
293 let mut path = env.backup_dir.relative_path();
294 path.push(archive_name);
295
296 let chunk_size = 4096*1024; // todo: ??
297
298 let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
299 let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
300
301 env.log(format!("created new fixed index {} ({:?})", wid, path));
302
303 Ok(json!(wid))
304 }
305
// Method definition for appending chunks to a dynamic index writer.
// "digest-list" and "offset-list" are parallel arrays; "wid" selects the
// writer (1..=256). Handler: dynamic_append.
#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            ),
        ]),
    )
);
338
339 fn dynamic_append (
340 param: Value,
341 _info: &ApiMethod,
342 rpcenv: &mut dyn RpcEnvironment,
343 ) -> Result<Value, Error> {
344
345 let wid = tools::required_integer_param(&param, "wid")? as usize;
346 let digest_list = tools::required_array_param(&param, "digest-list")?;
347 let offset_list = tools::required_array_param(&param, "offset-list")?;
348
349 if offset_list.len() != digest_list.len() {
350 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
351 }
352
353 let env: &BackupEnvironment = rpcenv.as_ref();
354
355 env.debug(format!("dynamic_append {} chunks", digest_list.len()));
356
357 for (i, item) in digest_list.iter().enumerate() {
358 let digest_str = item.as_str().unwrap();
359 let digest = proxmox::tools::hex_to_digest(digest_str)?;
360 let offset = offset_list[i].as_u64().unwrap();
361 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
362
363 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
364
365 env.debug(format!("sucessfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
366 }
367
368 Ok(Value::Null)
369 }
370
// Method definition for appending chunks to a fixed index writer.
// "digest-list" and "offset-list" are parallel arrays; "wid" selects the
// writer (1..=256). Handler: fixed_append.
#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            )
        ]),
    )
);
403
404 fn fixed_append (
405 param: Value,
406 _info: &ApiMethod,
407 rpcenv: &mut dyn RpcEnvironment,
408 ) -> Result<Value, Error> {
409
410 let wid = tools::required_integer_param(&param, "wid")? as usize;
411 let digest_list = tools::required_array_param(&param, "digest-list")?;
412 let offset_list = tools::required_array_param(&param, "offset-list")?;
413
414 if offset_list.len() != digest_list.len() {
415 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
416 }
417
418 let env: &BackupEnvironment = rpcenv.as_ref();
419
420 env.debug(format!("fixed_append {} chunks", digest_list.len()));
421
422 for (i, item) in digest_list.iter().enumerate() {
423 let digest_str = item.as_str().unwrap();
424 let digest = proxmox::tools::hex_to_digest(digest_str)?;
425 let offset = offset_list[i].as_u64().unwrap();
426 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
427
428 env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
429
430 env.debug(format!("sucessfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
431 }
432
433 Ok(Value::Null)
434 }
435
// Method definition for closing a dynamic index writer. The chunk count,
// total size and digest-list checksum let the server verify it received
// everything. Handler: close_dynamic_index.
#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);
468
469 fn close_dynamic_index (
470 param: Value,
471 _info: &ApiMethod,
472 rpcenv: &mut dyn RpcEnvironment,
473 ) -> Result<Value, Error> {
474
475 let wid = tools::required_integer_param(&param, "wid")? as usize;
476 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
477 let size = tools::required_integer_param(&param, "size")? as u64;
478 let csum_str = tools::required_string_param(&param, "csum")?;
479 let csum = proxmox::tools::hex_to_digest(csum_str)?;
480
481 let env: &BackupEnvironment = rpcenv.as_ref();
482
483 env.dynamic_writer_close(wid, chunk_count, size, csum)?;
484
485 env.log(format!("sucessfully closed dynamic index {}", wid));
486
487 Ok(Value::Null)
488 }
489
// Method definition for closing a fixed index writer. The chunk count,
// total size and digest-list checksum let the server verify it received
// everything. Handler: close_fixed_index.
#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);
522
523 fn close_fixed_index (
524 param: Value,
525 _info: &ApiMethod,
526 rpcenv: &mut dyn RpcEnvironment,
527 ) -> Result<Value, Error> {
528
529 let wid = tools::required_integer_param(&param, "wid")? as usize;
530 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
531 let size = tools::required_integer_param(&param, "size")? as u64;
532 let csum_str = tools::required_string_param(&param, "csum")?;
533 let csum = proxmox::tools::hex_to_digest(csum_str)?;
534
535 let env: &BackupEnvironment = rpcenv.as_ref();
536
537 env.fixed_writer_close(wid, chunk_count, size, csum)?;
538
539 env.log(format!("sucessfully closed fixed index {}", wid));
540
541 Ok(Value::Null)
542 }
543
544 fn finish_backup (
545 _param: Value,
546 _info: &ApiMethod,
547 rpcenv: &mut dyn RpcEnvironment,
548 ) -> Result<Value, Error> {
549
550 let env: &BackupEnvironment = rpcenv.as_ref();
551
552 env.finish_backup()?;
553 env.log("sucessfully finished backup");
554
555 Ok(Value::Null)
556 }
557
// Method definition for downloading the previous backup's dynamic chunk
// index. Single required parameter: the archive name.
// Handler: dynamic_chunk_index.
#[sortable]
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&dynamic_chunk_index),
    &ObjectSchema::new(
        r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"### ,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);
571
572 fn dynamic_chunk_index(
573 _parts: Parts,
574 _req_body: Body,
575 param: Value,
576 _info: &ApiMethod,
577 rpcenv: Box<dyn RpcEnvironment>,
578 ) -> ApiResponseFuture {
579
580 async move {
581 let env: &BackupEnvironment = rpcenv.as_ref();
582
583 let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
584
585 if !archive_name.ends_with(".didx") {
586 bail!("wrong archive extension: '{}'", archive_name);
587 }
588
589 let empty_response = {
590 Response::builder()
591 .status(StatusCode::OK)
592 .body(Body::empty())?
593 };
594
595 let last_backup = match &env.last_backup {
596 Some(info) => info,
597 None => return Ok(empty_response),
598 };
599
600 let mut path = last_backup.backup_dir.relative_path();
601 path.push(&archive_name);
602
603 let index = match env.datastore.open_dynamic_reader(path) {
604 Ok(index) => index,
605 Err(_) => {
606 env.log(format!("there is no last backup for archive '{}'", archive_name));
607 return Ok(empty_response);
608 }
609 };
610
611 env.log(format!("download last backup index for archive '{}'", archive_name));
612
613 let count = index.index_count();
614 for pos in 0..count {
615 let (start, end, digest) = index.chunk_info(pos)?;
616 let size = (end - start) as u32;
617 env.register_chunk(digest, size)?;
618 }
619
620 let reader = DigestListEncoder::new(Box::new(index));
621
622 let stream = WrappedReaderStream::new(reader);
623
624 // fixme: set size, content type?
625 let response = http::Response::builder()
626 .status(200)
627 .body(Body::wrap_stream(stream))?;
628
629 Ok(response)
630 }.boxed()
631 }
632
// Method definition for downloading the previous backup's fixed chunk
// index. Single required parameter: the archive name.
// Handler: fixed_chunk_index.
#[sortable]
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&fixed_chunk_index),
    &ObjectSchema::new(
        r###"
Download the fixed chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"### ,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);
646
647 fn fixed_chunk_index(
648 _parts: Parts,
649 _req_body: Body,
650 param: Value,
651 _info: &ApiMethod,
652 rpcenv: Box<dyn RpcEnvironment>,
653 ) -> ApiResponseFuture {
654
655 async move {
656 let env: &BackupEnvironment = rpcenv.as_ref();
657
658 let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
659
660 if !archive_name.ends_with(".fidx") {
661 bail!("wrong archive extension: '{}'", archive_name);
662 }
663
664 let empty_response = {
665 Response::builder()
666 .status(StatusCode::OK)
667 .body(Body::empty())?
668 };
669
670 let last_backup = match &env.last_backup {
671 Some(info) => info,
672 None => return Ok(empty_response),
673 };
674
675 let mut path = last_backup.backup_dir.relative_path();
676 path.push(&archive_name);
677
678 let index = match env.datastore.open_fixed_reader(path) {
679 Ok(index) => index,
680 Err(_) => {
681 env.log(format!("there is no last backup for archive '{}'", archive_name));
682 return Ok(empty_response);
683 }
684 };
685
686 env.log(format!("download last backup index for archive '{}'", archive_name));
687
688 let count = index.index_count();
689 let image_size = index.index_bytes();
690 for pos in 0..count {
691 let digest = index.index_digest(pos).unwrap();
692 // Note: last chunk can be smaller
693 let start = (pos*index.chunk_size) as u64;
694 let mut end = start + index.chunk_size as u64;
695 if end > image_size { end = image_size; }
696 let size = (end - start) as u32;
697 env.register_chunk(*digest, size)?;
698 }
699
700 let reader = DigestListEncoder::new(Box::new(index));
701
702 let stream = WrappedReaderStream::new(reader);
703
704 // fixme: set size, content type?
705 let response = http::Response::builder()
706 .status(200)
707 .body(Body::wrap_stream(stream))?;
708
709 Ok(response)
710 }.boxed()
711 }