//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io::{Seek, SeekFrom};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use serde_json::json;

use proxmox_router::HttpError;
use proxmox_sys::task_log;

use pbs_api_types::{
    Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote, SnapshotListItem,
};

use pbs_datastore::{BackupDir, BackupGroup, BackupInfo, DataStore, StoreProgress};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_tools::sha::sha256;
use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
use proxmox_rest_server::WorkerTask;

use crate::tools::ParallelHandler;

// fixme: implement filters
// fixme: delete vanished groups
// Todo: correctly lock backup groups

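// Overview of the call chain implemented in this module:
//
//   pull_store              - fetch the remote group list, iterate over groups
//     -> pull_group         - fetch the snapshot list of one group
//       -> pull_snapshot_from - create/lock the local snapshot dir
//         -> pull_snapshot    - compare manifests, fetch changed archives
//           -> pull_single_archive
//             -> pull_index_chunks - download missing chunks in parallel
//           -> try_client_log_download
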
pub struct PullParameters {
    remote: Remote,
    source: BackupRepository,
    store: Arc<DataStore>,
    owner: Authid,
    remove_vanished: bool,
    group_filter: Option<Vec<GroupFilter>>,
    limit: RateLimitConfig,
}

impl PullParameters {
    pub fn new(
        store: &str,
        remote: &str,
        remote_store: &str,
        owner: Authid,
        remove_vanished: Option<bool>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
    ) -> Result<Self, Error> {
        let store = DataStore::lookup_datastore(store)?;

        let (remote_config, _digest) = pbs_config::remote::config()?;
        let remote: Remote = remote_config.lookup("remote", remote)?;

        let remove_vanished = remove_vanished.unwrap_or(false);

        let source = BackupRepository::new(
            Some(remote.config.auth_id.clone()),
            Some(remote.config.host.clone()),
            remote.config.port,
            remote_store.to_string(),
        );

        Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
    }

    pub async fn client(&self) -> Result<HttpClient, Error> {
        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
    }
}

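// A minimal usage sketch (hypothetical store/remote names; assumes the caller
// already has an `Authid`, a `RateLimitConfig` and a running `WorkerTask`):
//
//     let params = PullParameters::new(
//         "local-store", "remote1", "remote-store",
//         auth_id, Some(false), None, rate_limit,
//     )?;
//     let client = params.client().await?;
//     pull_store(&worker, &client, &params).await?;

/// Download all chunks referenced by `index` that are missing from `target`.
///
/// `downloaded_chunks` is shared across the archives of one snapshot, so the
/// same chunk is never fetched twice. Up to 20 downloads run concurrently,
/// while a pool of 4 threads verifies and writes the chunks.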
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: we mark a chunk as downloaded before it is actually
                    // downloaded, to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}

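/// Download the remote manifest into `filename` and return the open, rewound
/// temporary file.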
async fn download_manifest(
    reader: &BackupReader,
    filename: &std::path::Path,
) -> Result<std::fs::File, Error> {
    let mut tmp_manifest_file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .read(true)
        .open(&filename)?;

    reader
        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
        .await?;

    tmp_manifest_file.seek(SeekFrom::Start(0))?;

    Ok(tmp_manifest_file)
}

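/// Check a downloaded archive against the size and checksum recorded in the
/// manifest.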
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

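/// Download a single archive into the snapshot directory: write it to a
/// `.tmp` file, verify it against the manifest, pull any chunks it
/// references, then rename the file into place.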
async fn pull_single_archive(
    worker: &WorkerTask,
    reader: &BackupReader,
    chunk_reader: &mut RemoteChunkReader,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    archive_info: &FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = tgt_store.base_path();
    path.push(snapshot.relative_path());
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    let mut tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    reader.download(archive_name, &mut tmpfile).await?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(
                worker,
                chunk_reader.clone(),
                tgt_store.clone(),
                index,
                downloaded_chunks,
            )
            .await?;
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            pull_index_chunks(
                worker,
                chunk_reader.clone(),
                tgt_store.clone(),
                index,
                downloaded_chunks,
            )
            .await?;
        }
        ArchiveType::Blob => {
            tmpfile.seek(SeekFrom::Start(0))?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}

// Note: The client.log.blob is uploaded after the backup, so it is
// not mentioned in the manifest.
async fn try_client_log_download(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    path: &std::path::Path,
) -> Result<(), Error> {
    let mut tmp_path = path.to_owned();
    tmp_path.set_extension("tmp");

    let tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    // Note: be silent if there is no log - only log successful download
    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
        if let Err(err) = std::fs::rename(&tmp_path, &path) {
            bail!("Atomic rename file {:?} failed - {}", path, err);
        }
        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
    }

    Ok(())
}

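/// Pull one snapshot into `tgt_store`.
///
/// Downloads the remote manifest first; if it matches the local one, only a
/// missing client log is fetched. Otherwise, every file listed in the new
/// manifest is verified against its local copy and re-downloaded if it
/// changed, and stale files are cleaned up afterwards.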
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            snapshot
                        );
                        return Ok(());
                    }
                    _ => {
                        bail!("HTTP error {} - {}", code, message);
                    }
                },
                None => {
                    return Err(err);
                }
            };
        }
    };
    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {:?} - {}", manifest_name, err)
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let mut chunk_reader = RemoteChunkReader::new(
            reader.clone(),
            None,
            item.chunk_crypt_mode(),
            HashMap::new(),
        );

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            tgt_store.clone(),
            snapshot,
            &item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;

    Ok(())
}

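/// Pull a single snapshot, creating the locked local snapshot directory if
/// needed.
///
/// A newly created snapshot directory is removed again if the pull fails;
/// an existing one is re-synced in place.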
pub async fn pull_snapshot_from(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;

    if is_new {
        task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());

        if let Err(err) = pull_snapshot(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks,
        )
        .await
        {
            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
    } else {
        task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
        pull_snapshot(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks,
        )
        .await?;
        task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
    }

    Ok(())
}

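/// Tracks snapshots that were skipped because they are older than the newest
/// local snapshot, so a single summary line can be logged at the end.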
struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
}

impl SkipInfo {
    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?
        )
    }
}

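/// Pull all snapshots of a single backup group.
///
/// Snapshots are processed oldest first. In-progress backups (no size yet)
/// and snapshots older than the last successful local backup are skipped.
/// With `remove_vanished`, local snapshots that no longer exist on the
/// remote are deleted, unless they are protected.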
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &PullParameters,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = params.store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // pre-allocate space for 64k chunk digests (enough for roughly 256 GiB
    // of data at the default 4 MiB chunk size)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut skip_info = SkipInfo {
        oldest: i64::MAX,
        newest: i64::MIN,
        count: 0,
    };

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if item.size.is_none() {
            task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
            continue;
        }

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time {
                skip_info.update(backup_time);
                continue;
            }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options =
            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());

        let new_client = HttpClient::new(
            params.source.host(),
            params.source.port(),
            params.source.auth_id(),
            options,
        )?;

        let reader = BackupReader::start(
            new_client,
            None,
            params.source.store(),
            snapshot.group().backup_type(),
            snapshot.group().backup_id(),
            backup_time,
            true,
        )
        .await?;

        let result = pull_snapshot_from(
            worker,
            reader,
            params.store.clone(),
            &snapshot,
            downloaded_chunks.clone(),
        )
        .await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        let local_list = group.list_backups(&params.store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) {
                continue;
            }
            if info.backup_dir.is_protected(params.store.base_path()) {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {:?} (protected)",
                    info.backup_dir.relative_path()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
            params.store.remove_backup_dir(&info.backup_dir, false)?;
        }
    }

    if skip_info.count > 0 {
        task_log!(worker, "{}", skip_info);
    }

    Ok(())
}

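/// Pull a complete datastore according to `params`.
///
/// Fetches the remote group list, applies the configured group filters, then
/// pulls each group in turn. Errors in single groups are logged and the sync
/// continues; the function only fails at the end if any group failed. With
/// `remove_vanished`, local groups that match the filters but no longer
/// exist on the remote are removed.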
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &PullParameters,
) -> Result<(), Error> {
    // explicitly create a shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());

    let mut result = client
        .get(&path, None)
        .await
        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    let total_count = list.len();
    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
        filters.iter().any(|filter| group.matches(filter))
    };

    let list: Vec<BackupGroup> = list
        .into_iter()
        .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
        .collect();

    let list = if let Some(ref group_filter) = &params.group_filter {
        let unfiltered_count = list.len();
        let list: Vec<BackupGroup> = list
            .into_iter()
            .filter(|group| apply_filters(&group, group_filter))
            .collect();
        task_log!(
            worker,
            "found {} groups to sync (out of {} total)",
            list.len(),
            unfiltered_count
        );
        list
    } else {
        task_log!(worker, "found {} groups to sync", total_count);
        list
    };

    let mut errors = false;

    let mut new_groups = std::collections::HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params.store.create_locked_backup_group(&group, &params.owner) {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group, err
                    );
                    errors = true; // do not stop here, instead continue
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group, params.owner, owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(worker, client, params, &group, &mut progress).await {
            task_log!(worker, "sync group {} failed - {}", &group, err);
            errors = true; // do not stop here, instead continue
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            let local_groups = BackupInfo::list_backup_groups(&params.store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) {
                    continue;
                }
                if let Some(ref group_filter) = &params.group_filter {
                    if !apply_filters(&local_group, group_filter) {
                        continue;
                    }
                }
                task_log!(
                    worker,
                    "delete vanished group '{}/{}'",
                    local_group.backup_type(),
                    local_group.backup_id()
                );
                match params.store.remove_backup_group(&local_group) {
                    Ok(true) => {}
                    Ok(false) => {
                        task_log!(
                            worker,
                            "kept some protected snapshots of group '{}'",
                            local_group
                        );
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}