//! Backup protocol (HTTP2 upgrade)
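//!
//! A client opens a backup session by sending an HTTP/1.1 GET request with an
//! `Upgrade` header naming the backup protocol. After validating the request,
//! the server answers with `101 Switching Protocols`, and both sides switch to
//! HTTP/2 on the upgraded connection, over which the actual backup API
//! (chunk upload, index management, finish) is served.
//!
//! A minimal sketch of the initial request, assuming the usual API mount
//! point; the exact protocol ID is defined by `PROXMOX_BACKUP_PROTOCOL_ID_V1!()`:
//!
//! ```text
//! GET /api2/json/backup?store=store1&backup-type=host&backup-id=host1&backup-time=1672531200 HTTP/1.1
//! Connection: upgrade
//! Upgrade: proxmox-backup-protocol-v1
//! ```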

use anyhow::{bail, format_err, Error};
use futures::*;
use hex::FromHex;
use hyper::header::{HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
use serde::Deserialize;
use serde_json::{json, Value};

use proxmox_router::list_subdirs_api_method;
use proxmox_router::{
    ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment, SubdirMap,
};
use proxmox_schema::*;
use proxmox_sys::sortable;

use pbs_api_types::{
    Authid, BackupNamespace, BackupType, DatastoreWithNamespace, Operation, SnapshotVerifyState,
    VerifyState, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA,
    PRIV_DATASTORE_BACKUP,
};
use pbs_config::CachedUserInfo;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType};
use pbs_datastore::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::json::{required_array_param, required_integer_param, required_string_param};
use proxmox_rest_server::{H2Service, WorkerTask};
use proxmox_sys::fs::lock_dir_noblock_shared;

mod environment;
use environment::*;

mod upload_chunk;
use upload_chunk::*;

pub const ROUTER: Router = Router::new().upgrade(&API_METHOD_UPGRADE_BACKUP);

#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
            ("benchmark", true, &BooleanSchema::new("Job is a benchmark (do not keep data).").schema()),
        ]),
    )
).access(
    // Note: 'store' is not a URI path parameter, so the permission check happens inside the function body
    Some("Requires DATASTORE_BACKUP on /datastore/{store}[/{namespace}], and the user must own the backup group"),
    &Permission::Anybody
);

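/// Parse the optional `ns` parameter, falling back to the root namespace if it is absent.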
pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error> {
    match param.get("ns") {
        Some(Value::String(ns)) => ns.parse(),
        None => Ok(BackupNamespace::root()),
        _ => bail!("invalid ns parameter"),
    }
}

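/// Handler for the backup protocol upgrade.
///
/// Checks privileges and group ownership, takes the group and base-snapshot
/// locks, spawns the worker task that serves the HTTP/2 backup API, and
/// answers the client with `101 Switching Protocols`.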
fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);
        let benchmark = param["benchmark"].as_bool().unwrap_or(false);

        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;

        let store = required_string_param(&param, "store")?.to_owned();
        let backup_ns = optional_ns_param(&param)?;
        let store_with_ns = DatastoreWithNamespace {
            store: store.clone(),
            ns: backup_ns.clone(),
        };
        let backup_dir_arg = pbs_api_types::BackupDir::deserialize(&param)?;

        let user_info = CachedUserInfo::new()?;

        let privs = user_info.lookup_privs(&auth_id, &store_with_ns.acl_path());
        if privs & PRIV_DATASTORE_BACKUP == 0 {
            proxmox_router::http_bail!(FORBIDDEN, "permission check failed");
        }

        let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        if parts.version >= http::version::Version::HTTP_2 {
            bail!(
                "unexpected http version '{:?}' (expected version < 2)",
                parts.version
            );
        }

        if !datastore.namespace_path(&backup_ns).exists() {
            proxmox_router::http_bail!(NOT_FOUND, "namespace not found");
        }

        // FIXME: include namespace here?
        let worker_id = format!("{}:{}/{}", store, backup_dir_arg.ty(), backup_dir_arg.id());

        let env_type = rpcenv.env_type();

        let backup_group = datastore.backup_group(backup_ns, backup_dir_arg.group.clone());

        let worker_type = if backup_group.backup_type() == BackupType::Host
            && backup_group.backup_id() == "benchmark"
        {
            if !benchmark {
                bail!("unable to run benchmark without the --benchmark flag");
            }
            "benchmark"
        } else {
            if benchmark {
                bail!("the benchmark flag is only allowed on 'host/benchmark'");
            }
            "backup"
        };

        // lock the backup group to allow only one backup per group at a time
        let (owner, _group_guard) = datastore.create_locked_backup_group(
            backup_group.backup_ns(),
            backup_group.as_ref(),
            &auth_id,
        )?;

        // permission check
        let correct_owner =
            owner == auth_id || (owner.is_token() && Authid::from(owner.user().clone()) == auth_id);
        if !correct_owner && worker_type != "benchmark" {
            // only the owner is allowed to create additional snapshots
            bail!("backup owner check failed ({} != {})", auth_id, owner);
        }

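        // select the previous snapshot in the group as the base for incremental
        // backups; skip it if its most recent verification failed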
        let last_backup = {
            let info = backup_group.last_backup(true).unwrap_or(None);
            if let Some(info) = info {
                let (manifest, _) = info.backup_dir.load_manifest()?;
                let verify = manifest.unprotected["verify_state"].clone();
                match serde_json::from_value::<SnapshotVerifyState>(verify) {
                    Ok(verify) => match verify.state {
                        VerifyState::Ok => Some(info),
                        VerifyState::Failed => None,
                    },
                    Err(_) => {
                        // no verify state found, treat as valid
                        Some(info)
                    }
                }
            } else {
                None
            }
        };

        let backup_dir = backup_group.backup_dir(backup_dir_arg.time)?;

        let _last_guard = if let Some(last) = &last_backup {
            if backup_dir.backup_time() <= last.backup_dir.backup_time() {
                bail!("backup timestamp is not newer than that of the last backup.");
            }

            // lock the last snapshot to prevent it from being forgotten/pruned during the backup
            let full_path = last.backup_dir.full_path();
            Some(lock_dir_noblock_shared(
                &full_path,
                "snapshot",
                "base snapshot is already locked by another operation",
            )?)
        } else {
            None
        };

        let (path, is_new, snap_guard) =
            datastore.create_locked_backup_dir(backup_dir.backup_ns(), backup_dir.as_ref())?;
        if !is_new {
            bail!("backup directory already exists.");
        }

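        // run the backup session in a worker task; the task keeps all locks
        // (group, base snapshot, new snapshot) alive until it ends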
        WorkerTask::spawn(
            worker_type,
            Some(worker_id),
            auth_id.to_string(),
            true,
            move |worker| {
                let mut env = BackupEnvironment::new(
                    env_type,
                    auth_id,
                    worker.clone(),
                    datastore,
                    backup_dir,
                );

                env.debug = debug;
                env.last_backup = last_backup;

                env.log(format!(
                    "starting new {} on datastore '{}': {:?}",
                    worker_type, store, path
                ));

                let service =
                    H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

                let abort_future = worker.abort_future();

                let env2 = env.clone();

                let mut req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
                    .map_err(Error::from)
                    .and_then(move |conn| {
                        env2.debug("protocol upgrade done");

                        let mut http = hyper::server::conn::Http::new();
                        http.http2_only(true);
                        // increase window size: todo - find optimal size
                        let window_size = 32 * 1024 * 1024; // max = (1 << 31) - 2
                        http.http2_initial_stream_window_size(window_size);
                        http.http2_initial_connection_window_size(window_size);
                        http.http2_max_frame_size(4 * 1024 * 1024);

                        let env3 = env2.clone();
                        http.serve_connection(conn, service).map(move |result| {
                            match result {
                                Err(err) => {
                                    // Avoid "Transport endpoint is not connected" (os error 107)
                                    // fixme: find a better way to test for that error
                                    if err.to_string().starts_with("connection error")
                                        && env3.finished()
                                    {
                                        Ok(())
                                    } else {
                                        Err(Error::from(err))
                                    }
                                }
                                Ok(()) => Ok(()),
                            }
                        })
                    });
                let mut abort_future = abort_future.map(|_| Err(format_err!("task aborted")));

                async move {
                    // keep flocks until the task ends
                    let _group_guard = _group_guard;
                    let snap_guard = snap_guard;
                    let _last_guard = _last_guard;

                    let res = select! {
                        req = req_fut => req,
                        abrt = abort_future => abrt,
                    };
                    if benchmark {
                        env.log("benchmark finished successfully");
                        proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                        return Ok(());
                    }

                    let verify = |env: BackupEnvironment| {
                        if let Err(err) = env.verify_after_complete(snap_guard) {
                            env.log(format!(
                                "backup finished, but starting the requested verify task failed: {}",
                                err
                            ));
                        }
                    };

                    match (res, env.ensure_finished()) {
                        (Ok(_), Ok(())) => {
                            env.log("backup finished successfully");
                            verify(env);
                            Ok(())
                        }
                        (Err(err), Ok(())) => {
                            // ignore errors after finish
                            env.log(format!("backup had errors but finished: {}", err));
                            verify(env);
                            Ok(())
                        }
                        (Ok(_), Err(err)) => {
                            env.log(format!("backup ended and finish failed: {}", err));
                            env.log("removing unfinished backup");
                            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                            Err(err)
                        }
                        (Err(err), Err(_)) => {
                            env.log(format!("backup failed: {}", err));
                            env.log("removing failed backup");
                            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                            Err(err)
                        }
                    }
                }
            },
        )?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(
                UPGRADE,
                HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()),
            )
            .body(Body::empty())?;

        Ok(response)
    }
    .boxed()
}

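// The HTTP/2 API exposed to the client after the protocol upgrade.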
const BACKUP_API_SUBDIRS: SubdirMap = &[
    ("blob", &Router::new().upload(&API_METHOD_UPLOAD_BLOB)),
    (
        "dynamic_chunk",
        &Router::new().upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK),
    ),
    (
        "dynamic_close",
        &Router::new().post(&API_METHOD_CLOSE_DYNAMIC_INDEX),
    ),
    (
        "dynamic_index",
        &Router::new()
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND),
    ),
    (
        "finish",
        &Router::new().post(&ApiMethod::new(
            &ApiHandler::Sync(&finish_backup),
            &ObjectSchema::new("Mark backup as finished.", &[]),
        )),
    ),
    (
        "fixed_chunk",
        &Router::new().upload(&API_METHOD_UPLOAD_FIXED_CHUNK),
    ),
    (
        "fixed_close",
        &Router::new().post(&API_METHOD_CLOSE_FIXED_INDEX),
    ),
    (
        "fixed_index",
        &Router::new()
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND),
    ),
    (
        "previous",
        &Router::new().download(&API_METHOD_DOWNLOAD_PREVIOUS),
    ),
    (
        "previous_backup_time",
        &Router::new().get(&API_METHOD_GET_PREVIOUS_BACKUP_TIME),
    ),
    (
        "speedtest",
        &Router::new().upload(&API_METHOD_UPLOAD_SPEEDTEST),
    ),
];

pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);

#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),]),
    ),
);

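/// Create a dynamic chunk index writer (".didx") for the given archive and
/// return its writer ID.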
fn create_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = required_string_param(&param, "archive-name")?.to_owned();

    let archive_name = name.clone();
    if !archive_name.ends_with(".didx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(archive_name);

    let index = env.datastore.create_dynamic_writer(&path)?;
    let wid = env.register_dynamic_writer(index, name)?;

    env.log(format!("created new dynamic index {} ({:?})", wid, path));

    Ok(json!(wid))
}

#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
            (
                "size",
                false,
                &IntegerSchema::new("File size.").minimum(1).schema()
            ),
            (
                "reuse-csum",
                true,
                &StringSchema::new(
                    "If set, compare last backup's \
                    csum and reuse index for incremental backup if it matches."
                )
                .schema()
            ),
        ]),
    ),
);

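/// Create a fixed chunk index writer (".fidx") for the given archive.
///
/// If `reuse-csum` matches the checksum of the previous backup's index, the
/// old index data is cloned into the new writer and the backup proceeds
/// incrementally, so unchanged chunks need not be re-uploaded.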
fn create_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let name = required_string_param(&param, "archive-name")?.to_owned();
    let size = required_integer_param(&param, "size")? as usize;
    let reuse_csum = param["reuse-csum"].as_str();

    let archive_name = name.clone();
    if !archive_name.ends_with(".fidx") {
        bail!("wrong archive extension: '{}'", archive_name);
    }

    let mut path = env.backup_dir.relative_path();
    path.push(&archive_name);

    let chunk_size = 4096 * 1024; // 4 MiB; todo: ??

    // do an incremental backup if a csum is set
    let mut reader = None;
    let mut incremental = false;
    if let Some(csum) = reuse_csum {
        incremental = true;
        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => {
                bail!("cannot reuse index - no valid previous backup exists");
            }
        };

        let mut last_path = last_backup.backup_dir.relative_path();
        last_path.push(&archive_name);

        let index = match env.datastore.open_fixed_reader(last_path) {
            Ok(index) => index,
            Err(_) => {
                bail!("cannot reuse index - no previous backup exists for archive");
            }
        };

        let (old_csum, _) = index.compute_csum();
        let old_csum = hex::encode(&old_csum);
        if old_csum != csum {
            bail!(
                "expected csum ({}) doesn't match last backup's ({}), cannot do incremental backup",
                csum,
                old_csum
            );
        }

        reader = Some(index);
    }

    let mut writer = env.datastore.create_fixed_writer(&path, size, chunk_size)?;

    if let Some(reader) = reader {
        writer.clone_data_from(&reader)?;
    }

    let wid = env.register_fixed_writer(writer, name, size, chunk_size as u32, incremental)?;

    env.log(format!("created new fixed index {} ({:?})", wid, path));

    Ok(json!(wid))
}

#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                )
                .schema()
            ),
        ]),
    ),
);

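/// Append previously uploaded chunks to a dynamic index writer.
///
/// Each digest must already be known to the server (uploaded in this session
/// or registered from the previous backup), otherwise the call fails.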
fn dynamic_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let digest_list = required_array_param(&param, "digest-list")?;
    let offset_list = required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!(
            "offset list has wrong length ({} != {})",
            offset_list.len(),
            digest_list.len()
        );
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("dynamic_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = <[u8; 32]>::from_hex(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env
            .lookup_chunk(&digest)
            .ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!(
            "successfully added chunk {} to dynamic index {} (offset {}, size {})",
            digest_str, wid, offset, size
        ));
    }

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                )
                .schema()
            )
        ]),
    ),
);

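/// Append previously uploaded chunks to a fixed index writer; the fixed-index
/// counterpart of `dynamic_append`.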
fn fixed_append(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let digest_list = required_array_param(&param, "digest-list")?;
    let offset_list = required_array_param(&param, "offset-list")?;

    if offset_list.len() != digest_list.len() {
        bail!(
            "offset list has wrong length ({} != {})",
            offset_list.len(),
            digest_list.len()
        );
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.debug(format!("fixed_append {} chunks", digest_list.len()));

    for (i, item) in digest_list.iter().enumerate() {
        let digest_str = item.as_str().unwrap();
        let digest = <[u8; 32]>::from_hex(digest_str)?;
        let offset = offset_list[i].as_u64().unwrap();
        let size = env
            .lookup_chunk(&digest)
            .ok_or_else(|| format_err!("no such chunk {}", digest_str))?;

        env.fixed_writer_append_chunk(wid, offset, size, &digest)?;

        env.debug(format!(
            "successfully added chunk {} to fixed index {} (offset {}, size {})",
            digest_str, wid, offset, size
        ));
    }

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new(
                    "Chunk count. This is used to verify that the server got all chunks."
                )
                .minimum(1)
                .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new(
                    "File size. This is used to verify that the server got all data."
                )
                .minimum(1)
                .schema()
            ),
            (
                "csum",
                false,
                &StringSchema::new("Digest list checksum.").schema()
            ),
        ]),
    ),
);

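/// Close a dynamic index writer, verifying chunk count, size, and checksum
/// against the values the client reports.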
fn close_dynamic_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let chunk_count = required_integer_param(&param, "chunk-count")? as u64;
    let size = required_integer_param(&param, "size")? as u64;
    let csum_str = required_string_param(&param, "csum")?;
    let csum = <[u8; 32]>::from_hex(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.dynamic_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed dynamic index {}", wid));

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks. Ignored for incremental backups.")
                    .minimum(0)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data. Ignored for incremental backups.")
                    .minimum(0)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);

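/// Close a fixed index writer; chunk count and size checks are skipped for
/// incremental backups (see the schema above).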
fn close_fixed_index(
    param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let wid = required_integer_param(&param, "wid")? as usize;
    let chunk_count = required_integer_param(&param, "chunk-count")? as u64;
    let size = required_integer_param(&param, "size")? as u64;
    let csum_str = required_string_param(&param, "csum")?;
    let csum = <[u8; 32]>::from_hex(csum_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.fixed_writer_close(wid, chunk_count, size, csum)?;

    env.log(format!("successfully closed fixed index {}", wid));

    Ok(Value::Null)
}

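/// Mark the backup as successfully finished; called by the client once all
/// indexes and blobs have been uploaded and closed.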
fn finish_backup(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;
    env.log("successfully finished backup");

    Ok(Value::Null)
}

#[sortable]
pub const API_METHOD_GET_PREVIOUS_BACKUP_TIME: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&get_previous_backup_time),
    &ObjectSchema::new("Get previous backup time.", &[]),
);

fn get_previous_backup_time(
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
    let env: &BackupEnvironment = rpcenv.as_ref();

    let backup_time = env
        .last_backup
        .as_ref()
        .map(|info| info.backup_dir.backup_time());

    Ok(json!(backup_time))
}

#[sortable]
pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_previous),
    &ObjectSchema::new(
        "Download archive from previous backup.",
        &sorted!([("archive-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA)]),
    ),
);

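/// Download an archive from the previous backup in the group.
///
/// For index archives, all referenced chunks are first registered as known,
/// so the client may reference them in the new backup without re-uploading.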
fn download_previous(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = required_string_param(&param, "archive-name")?.to_owned();

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => bail!("no valid previous backup"),
        };

        let mut path = last_backup.backup_dir.full_path();
        path.push(&archive_name);

        {
            let index: Option<Box<dyn IndexFile>> = match archive_type(&archive_name)? {
                ArchiveType::FixedIndex => {
                    let index = env.datastore.open_fixed_reader(&path)?;
                    Some(Box::new(index))
                }
                ArchiveType::DynamicIndex => {
                    let index = env.datastore.open_dynamic_reader(&path)?;
                    Some(Box::new(index))
                }
                _ => None,
            };
            if let Some(index) = index {
                env.log(format!(
                    "register chunks in '{}' from previous backup.",
                    archive_name
                ));

                for pos in 0..index.index_count() {
                    let info = index.chunk_info(pos).unwrap();
                    let size = info.range.end - info.range.start;
                    env.register_chunk(info.digest, size as u32)?;
                }
            }
        }

        env.log(format!("download '{}' from previous backup.", archive_name));
        crate::api2::helpers::create_download_response(path).await
    }
    .boxed()
}