//! Backup protocol (HTTP2 upgrade)
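//!
//! A client starts a backup session by calling this endpoint with an
//! `Upgrade` header set to the backup protocol ID. After permission and
//! ownership checks the connection is switched to HTTP/2 and all further
//! requests are routed through `BACKUP_API_ROUTER` (chunk upload, index
//! create/append/close, download from the previous snapshot, finish).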

use anyhow::{bail, format_err, Error};
use futures::*;
use hex::FromHex;
use hyper::header::{HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
use serde::Deserialize;
use serde_json::{json, Value};

use proxmox_router::list_subdirs_api_method;
use proxmox_router::{
    ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment, SubdirMap,
};
use proxmox_schema::*;
use proxmox_sys::sortable;

use pbs_api_types::{
    Authid, BackupNamespace, BackupType, Operation, SnapshotVerifyState, VerifyState,
    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
    BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP,
};
use pbs_config::CachedUserInfo;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType};
use pbs_datastore::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::json::{required_array_param, required_integer_param, required_string_param};
use proxmox_rest_server::{H2Service, WorkerTask};
use proxmox_sys::fs::lock_dir_noblock_shared;

mod environment;
use environment::*;

mod upload_chunk;
use upload_chunk::*;

pub const ROUTER: Router = Router::new().upgrade(&API_METHOD_UPGRADE_BACKUP);

#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
            ("benchmark", true, &BooleanSchema::new("Job is a benchmark (do not keep data).").schema()),
        ]),
    )
).access(
    // Note: parameter 'store' is not a URI parameter, so we need to check it inside the function body
57 Some("Requires on /datastore/{store}[/{namespace}] DATASTORE_BACKUP and being the owner of the group"),
    &Permission::Anybody
);

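/// Parse the optional `ns` parameter, falling back to the root namespace if it is missing.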
pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error> {
    match param.get("ns") {
        Some(Value::String(ns)) => ns.parse(),
        None => Ok(BackupNamespace::root()),
        _ => bail!("invalid ns parameter"),
    }
}

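/// Handler for `API_METHOD_UPGRADE_BACKUP`.
///
/// Verifies DATASTORE_BACKUP privileges and group ownership, locks the backup
/// group and the new snapshot directory, spawns a worker task serving
/// `BACKUP_API_ROUTER` over the upgraded HTTP/2 connection, and replies with
/// `101 Switching Protocols`.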
fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);
        let benchmark = param["benchmark"].as_bool().unwrap_or(false);

        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;

        let store = required_string_param(&param, "store")?.to_owned();
        let backup_ns = optional_ns_param(&param)?;
        let backup_dir_arg = pbs_api_types::BackupDir::deserialize(&param)?;

        let user_info = CachedUserInfo::new()?;

        let privs = if backup_ns.is_root() {
            user_info.lookup_privs(&auth_id, &["datastore", &store])
        } else {
            user_info.lookup_privs(&auth_id, &["datastore", &store, &backup_ns.to_string()])
        };
        if privs & PRIV_DATASTORE_BACKUP == 0 {
            proxmox_router::http_bail!(FORBIDDEN, "permission check failed");
        }

        let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        if parts.version >= http::version::Version::HTTP_2 {
            bail!(
                "unexpected http version '{:?}' (expected version < 2)",
                parts.version
            );
        }

        if !datastore.namespace_path(&backup_ns).exists() {
            proxmox_router::http_bail!(NOT_FOUND, "namespace not found");
        }

        // FIXME: include namespace here?
        let worker_id = format!("{}:{}/{}", store, backup_dir_arg.ty(), backup_dir_arg.id());

        let env_type = rpcenv.env_type();

        let backup_group = datastore.backup_group(backup_ns, backup_dir_arg.group.clone());

        let worker_type = if backup_group.backup_type() == BackupType::Host
            && backup_group.backup_id() == "benchmark"
        {
            if !benchmark {
131 bail!("unable to run benchmark without --benchmark flags");
            }
            "benchmark"
        } else {
            if benchmark {
136 bail!("benchmark flags is only allowed on 'host/benchmark'");
            }
            "backup"
        };

        // lock backup group to only allow one backup per group at a time
        let (owner, _group_guard) = datastore.create_locked_backup_group(
            backup_group.backup_ns(),
            backup_group.as_ref(),
            &auth_id,
        )?;

        // permission check
        let correct_owner =
            owner == auth_id || (owner.is_token() && Authid::from(owner.user().clone()) == auth_id);
        if !correct_owner && worker_type != "benchmark" {
            // only the owner is allowed to create additional snapshots
            bail!("backup owner check failed ({} != {})", auth_id, owner);
        }

        let last_backup = {
            let info = backup_group.last_backup(true).unwrap_or(None);
            if let Some(info) = info {
                let (manifest, _) = info.backup_dir.load_manifest()?;
                let verify = manifest.unprotected["verify_state"].clone();
                match serde_json::from_value::<SnapshotVerifyState>(verify) {
                    Ok(verify) => match verify.state {
                        VerifyState::Ok => Some(info),
                        VerifyState::Failed => None,
                    },
                    Err(_) => {
                        // no verify state found, treat as valid
                        Some(info)
                    }
                }
            } else {
                None
            }
        };

        let backup_dir = backup_group.backup_dir(backup_dir_arg.time)?;

        let _last_guard = if let Some(last) = &last_backup {
            if backup_dir.backup_time() <= last.backup_dir.backup_time() {
                bail!("backup timestamp is older than last backup.");
            }

            // lock last snapshot to prevent forgetting/pruning it during backup
            let full_path = last.backup_dir.full_path();
            Some(lock_dir_noblock_shared(
                &full_path,
                "snapshot",
                "base snapshot is already locked by another operation",
            )?)
        } else {
            None
        };

        let (path, is_new, snap_guard) =
            datastore.create_locked_backup_dir(backup_dir.backup_ns(), backup_dir.as_ref())?;
        if !is_new {
            bail!("backup directory already exists.");
        }

        WorkerTask::spawn(
            worker_type,
            Some(worker_id),
            auth_id.to_string(),
            true,
            move |worker| {
                let mut env = BackupEnvironment::new(
                    env_type,
                    auth_id,
                    worker.clone(),
                    datastore,
                    backup_dir,
                );

                env.debug = debug;
                env.last_backup = last_backup;

                env.log(format!(
                    "starting new {} on datastore '{}': {:?}",
                    worker_type, store, path
                ));

                let service =
                    H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

                let abort_future = worker.abort_future();

                let env2 = env.clone();

                let mut req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
                    .map_err(Error::from)
                    .and_then(move |conn| {
                        env2.debug("protocol upgrade done");

                        let mut http = hyper::server::conn::Http::new();
                        http.http2_only(true);
                        // increase window size: todo - find optimal size
                        let window_size = 32 * 1024 * 1024; // max = (1 << 31) - 2
                        http.http2_initial_stream_window_size(window_size);
                        http.http2_initial_connection_window_size(window_size);
                        http.http2_max_frame_size(4 * 1024 * 1024);

                        let env3 = env2.clone();
                        http.serve_connection(conn, service).map(move |result| {
                            match result {
                                Err(err) => {
                                    // Avoid Transport endpoint is not connected (os error 107)
                                    // fixme: find a better way to test for that error
                                    if err.to_string().starts_with("connection error")
                                        && env3.finished()
                                    {
                                        Ok(())
                                    } else {
                                        Err(Error::from(err))
                                    }
                                }
                                Ok(()) => Ok(()),
                            }
                        })
                    });
                let mut abort_future = abort_future.map(|_| Err(format_err!("task aborted")));

                async move {
                    // keep flock until task ends
                    let _group_guard = _group_guard;
                    let snap_guard = snap_guard;
                    let _last_guard = _last_guard;

                    let res = select! {
                        req = req_fut => req,
                        abrt = abort_future => abrt,
                    };
                    if benchmark {
                        env.log("benchmark finished successfully");
                        proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                        return Ok(());
                    }

                    let verify = |env: BackupEnvironment| {
                        if let Err(err) = env.verify_after_complete(snap_guard) {
                            env.log(format!(
                                "backup finished, but starting the requested verify task failed: {}",
                                err
                            ));
                        }
                    };

                    match (res, env.ensure_finished()) {
                        (Ok(_), Ok(())) => {
                            env.log("backup finished successfully");
                            verify(env);
                            Ok(())
                        }
                        (Err(err), Ok(())) => {
                            // ignore errors after finish
                            env.log(format!("backup had errors but finished: {}", err));
                            verify(env);
                            Ok(())
                        }
                        (Ok(_), Err(err)) => {
                            env.log(format!("backup ended and finish failed: {}", err));
                            env.log("removing unfinished backup");
                            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                            Err(err)
                        }
                        (Err(err), Err(_)) => {
                            env.log(format!("backup failed: {}", err));
                            env.log("removing failed backup");
                            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                            Err(err)
                        }
                    }
                }
            },
        )?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(
                UPGRADE,
                HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()),
            )
            .body(Body::empty())?;

        Ok(response)
    }
    .boxed()
}

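/// Endpoints available to the client over the upgraded HTTP/2 backup connection.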
const BACKUP_API_SUBDIRS: SubdirMap = &[
    ("blob", &Router::new().upload(&API_METHOD_UPLOAD_BLOB)),
    (
        "dynamic_chunk",
        &Router::new().upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK),
    ),
    (
        "dynamic_close",
        &Router::new().post(&API_METHOD_CLOSE_DYNAMIC_INDEX),
    ),
    (
        "dynamic_index",
        &Router::new()
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND),
    ),
    (
        "finish",
        &Router::new().post(&ApiMethod::new(
            &ApiHandler::Sync(&finish_backup),
            &ObjectSchema::new("Mark backup as finished.", &[]),
        )),
    ),
    (
        "fixed_chunk",
        &Router::new().upload(&API_METHOD_UPLOAD_FIXED_CHUNK),
    ),
    (
        "fixed_close",
        &Router::new().post(&API_METHOD_CLOSE_FIXED_INDEX),
    ),
    (
        "fixed_index",
        &Router::new()
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND),
    ),
    (
        "previous",
        &Router::new().download(&API_METHOD_DOWNLOAD_PREVIOUS),
    ),
    (
        "previous_backup_time",
        &Router::new().get(&API_METHOD_GET_PREVIOUS_BACKUP_TIME),
    ),
    (
        "speedtest",
        &Router::new().upload(&API_METHOD_UPLOAD_SPEEDTEST),
    ),
];

pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);

#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),]),
    ),
);

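/// Create a new dynamic chunk index (`.didx`) writer for the given archive and
/// register it with the backup environment; returns the writer ID.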
fn create_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = required_string_param(&param, "archive-name")?.to_owned();

    let archive_name = name.clone();
    if !archive_name.ends_with(".didx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let index = env.datastore.create_dynamic_writer(&path)?;
    let wid = env.register_dynamic_writer(index, name)?;

    env.log(format!("created new dynamic index {} ({:?})", wid, path));

    Ok(json!(wid))
}

#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
            (
                "size",
                false,
                &IntegerSchema::new("File size.").minimum(1).schema()
            ),
            (
                "reuse-csum",
                true,
                &StringSchema::new(
                    "If set, compare last backup's \
                    csum and reuse index for incremental backup if it matches."
                )
                .schema()
            ),
        ]),
    ),
);

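/// Create a new fixed chunk index (`.fidx`) writer for the given archive and size.
///
/// If `reuse-csum` matches the checksum of the previous backup's index, that
/// index is cloned and the upload continues as an incremental backup; returns
/// the writer ID.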
fn create_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = required_string_param(&param, "archive-name")?.to_owned();
    let size = required_integer_param(&param, "size")? as usize;
    let reuse_csum = param["reuse-csum"].as_str();

    let archive_name = name.clone();
    if !archive_name.ends_with(".fidx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(&archive_name);

    let chunk_size = 4096 * 1024; // todo: ??

    // do incremental backup if csum is set
    let mut reader = None;
    let mut incremental = false;
    if let Some(csum) = reuse_csum {
        incremental = true;
        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => {
                bail!("cannot reuse index - no valid previous backup exists");
            }
        };

        let mut last_path = last_backup.backup_dir.relative_path();
        last_path.push(&archive_name);

        let index = match env.datastore.open_fixed_reader(last_path) {
            Ok(index) => index,
            Err(_) => {
                bail!("cannot reuse index - no previous backup exists for archive");
            }
        };

        let (old_csum, _) = index.compute_csum();
        let old_csum = hex::encode(&old_csum);
        if old_csum != csum {
            bail!(
                "expected csum ({}) doesn't match last backup's ({}), cannot do incremental backup",
                csum,
                old_csum
            );
        }

        reader = Some(index);
    }

    let mut writer = env.datastore.create_fixed_writer(&path, size, chunk_size)?;

    if let Some(reader) = reader {
        writer.clone_data_from(&reader)?;
    }

    let wid = env.register_fixed_writer(writer, name, size, chunk_size as u32, incremental)?;

    env.log(format!("created new fixed index {} ({:?})", wid, path));

    Ok(json!(wid))
}

#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                )
                .schema()
            ),
        ]),
    ),
);

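/// Append previously uploaded chunks (referenced by digest) at the given
/// offsets to an open dynamic index writer.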
fn dynamic_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let digest_list = required_array_param(&param, "digest-list")?;
    let offset_list = required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!(
            "offset list has wrong length ({} != {})",
            offset_list.len(),
            digest_list.len()
        );
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("dynamic_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = <[u8; 32]>::from_hex(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env
            .lookup_chunk(&digest)
            .ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!(
            "successfully added chunk {} to dynamic index {} (offset {}, size {})",
            digest_str, wid, offset, size
        ));
    }

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                )
                .schema()
            )
        ]),
    ),
);

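/// Append previously uploaded chunks (referenced by digest) at the given
/// offsets to an open fixed index writer.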
fn fixed_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let digest_list = required_array_param(&param, "digest-list")?;
    let offset_list = required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!(
            "offset list has wrong length ({} != {})",
            offset_list.len(),
            digest_list.len()
        );
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("fixed_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = <[u8; 32]>::from_hex(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env
            .lookup_chunk(&digest)
            .ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.fixed_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!(
            "successfully added chunk {} to fixed index {} (offset {}, size {})",
            digest_str, wid, offset, size
        ));
    }

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new(
                    "Chunk count. This is used to verify that the server got all chunks."
                )
                .minimum(1)
                .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new(
                    "File size. This is used to verify that the server got all data."
                )
                .minimum(1)
                .schema()
            ),
            (
                "csum",
                false,
                &StringSchema::new("Digest list checksum.").schema()
            ),
        ]),
    ),
);

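/// Close a dynamic index writer after verifying the reported chunk count,
/// size and digest-list checksum.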
fn close_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let chunk_count = required_integer_param(&param, "chunk-count")? as u64;
    let size = required_integer_param(&param, "size")? as u64;
    let csum_str = required_string_param(&param, "csum")?;
    let csum = <[u8; 32]>::from_hex(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.dynamic_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed dynamic index {}", wid));

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks. Ignored for incremental backups.")
                    .minimum(0)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data. Ignored for incremental backups.")
                    .minimum(0)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

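/// Close a fixed index writer after verifying the reported chunk count, size
/// and digest-list checksum (count and size are ignored for incremental backups).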
fn close_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let chunk_count = required_integer_param(&param, "chunk-count")? as u64;
    let size = required_integer_param(&param, "size")? as u64;
    let csum_str = required_string_param(&param, "csum")?;
    let csum = <[u8; 32]>::from_hex(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.fixed_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed fixed index {}", wid));

    Ok(Value::Null)
}

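/// Mark the running backup as successfully finished.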
fn finish_backup(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;
    env.log("successfully finished backup");

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_GET_PREVIOUS_BACKUP_TIME: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&get_previous_backup_time),
    &ObjectSchema::new("Get previous backup time.", &[]),
);

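/// Return the backup time of the previous snapshot in this group (the one used
/// as base for incremental uploads), or `null` if there is none.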
fn get_previous_backup_time(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let backup_time = env
        .last_backup
        .as_ref()
        .map(|info| info.backup_dir.backup_time());

    Ok(json!(backup_time))
}

#[sortable]
pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_previous),
    &ObjectSchema::new(
        "Download archive from previous backup.",
        &sorted!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA)]),
    ),
);

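/// Stream an archive from the previous backup snapshot to the client.
///
/// For fixed and dynamic indexes, the referenced chunks are registered as
/// known so the client may reference them without re-uploading.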
fn download_previous(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = required_string_param(&param, "archive-name")?.to_owned();

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => bail!("no valid previous backup"),
        };

        let mut path = last_backup.backup_dir.full_path();
        path.push(&archive_name);

        {
            let index: Option<Box<dyn IndexFile>> = match archive_type(&archive_name)? {
                ArchiveType::FixedIndex => {
                    let index = env.datastore.open_fixed_reader(&path)?;
                    Some(Box::new(index))
                }
                ArchiveType::DynamicIndex => {
                    let index = env.datastore.open_dynamic_reader(&path)?;
                    Some(Box::new(index))
                }
                _ => None,
            };
            if let Some(index) = index {
                env.log(format!(
                    "register chunks in '{}' from previous backup.",
                    archive_name
                ));

                for pos in 0..index.index_count() {
                    let info = index.chunk_info(pos).unwrap();
                    let size = info.range.end - info.range.start;
                    env.register_chunk(info.digest, size as u32)?;
                }
            }
        }

        env.log(format!("download '{}' from previous backup.", archive_name));
        crate::api2::helpers::create_download_response(path).await
    }
    .boxed()
}