]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pull.rs
split out pbs-runtime module
[proxmox-backup.git] / src / client / pull.rs
CommitLineData
07ad6470
DM
1//! Sync datastore from remote server
2
3use anyhow::{bail, format_err, Error};
4use serde_json::json;
e2956c60 5use std::collections::{HashMap, HashSet};
07ad6470 6use std::convert::TryFrom;
07ad6470 7use std::io::{Seek, SeekFrom};
54417086 8use std::sync::atomic::{AtomicUsize, Ordering};
e2956c60
FG
9use std::sync::{Arc, Mutex};
10use std::time::SystemTime;
07ad6470 11
1bc1d81a 12use crate::{
1bc1d81a 13 api2::types::*,
e2956c60 14 backup::*,
1bc1d81a 15 client::*,
e2956c60 16 server::WorkerTask,
d2354a16 17 task_log,
e2956c60 18 tools::{compute_file_csum, ParallelHandler},
1bc1d81a 19};
e2956c60 20use proxmox::api::error::{HttpError, StatusCode};
07ad6470
DM
21
22// fixme: implement filters
23// fixme: delete vanished groups
24// Todo: correctly lock backup groups
25
/// Download all chunks referenced by `index` into `target`.
///
/// Chunks already present in the local store (checked via `cond_touch_chunk`)
/// or already recorded in `downloaded_chunks` are skipped. Up to 20 chunk
/// downloads run concurrently; verification and writing happen on a separate
/// 4-thread `ParallelHandler` pool to maximize throughput.
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    // Build a stream over all chunk infos of the index, dropping digests
    // that are already tracked in `downloaded_chunks`.
    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before its actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    // Worker pool: verifies the (unencrypted) chunk digest/size and inserts
    // the chunk into the local chunk store.
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    // Total raw bytes downloaded (for the throughput log line below).
    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                // `cond_touch_chunk` is blocking (filesystem), so keep it off
                // the async executor threads.
                let chunk_exists = pbs_runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                    return Ok::<_, Error>(());
                }
                //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in a separate threads to maximize throughput
                // (channel send may block when the pool is saturated)
                pbs_runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        // at most 20 chunk downloads in flight, completion order irrelevant
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    // Close the channel so the pool can drain and finish.
    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    worker.log(format!(
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    ));

    Ok(())
}
117
118async fn download_manifest(
119 reader: &BackupReader,
120 filename: &std::path::Path,
121) -> Result<std::fs::File, Error> {
3d571d55 122 let mut tmp_manifest_file = std::fs::OpenOptions::new()
07ad6470
DM
123 .write(true)
124 .create(true)
194da6f8 125 .truncate(true)
07ad6470
DM
126 .read(true)
127 .open(&filename)?;
128
e2956c60
FG
129 reader
130 .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
131 .await?;
07ad6470
DM
132
133 tmp_manifest_file.seek(SeekFrom::Start(0))?;
134
135 Ok(tmp_manifest_file)
136}
137
e2956c60 138fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
2ce15934 139 if size != info.size {
e2956c60
FG
140 bail!(
141 "wrong size for file '{}' ({} != {})",
142 info.filename,
143 info.size,
144 size
145 );
2ce15934
FG
146 }
147
148 if csum != &info.csum {
149 bail!("wrong checksum for file '{}'", info.filename);
150 }
151
152 Ok(())
153}
154
07ad6470
DM
155async fn pull_single_archive(
156 worker: &WorkerTask,
157 reader: &BackupReader,
158 chunk_reader: &mut RemoteChunkReader,
159 tgt_store: Arc<DataStore>,
160 snapshot: &BackupDir,
2ce15934 161 archive_info: &FileInfo,
e2956c60 162 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
07ad6470 163) -> Result<(), Error> {
2ce15934 164 let archive_name = &archive_info.filename;
07ad6470
DM
165 let mut path = tgt_store.base_path();
166 path.push(snapshot.relative_path());
167 path.push(archive_name);
168
169 let mut tmp_path = path.clone();
170 tmp_path.set_extension("tmp");
171
172 worker.log(format!("sync archive {}", archive_name));
3d571d55 173 let mut tmpfile = std::fs::OpenOptions::new()
07ad6470
DM
174 .write(true)
175 .create(true)
176 .read(true)
177 .open(&tmp_path)?;
178
3d571d55 179 reader.download(archive_name, &mut tmpfile).await?;
07ad6470
DM
180
181 match archive_type(archive_name)? {
182 ArchiveType::DynamicIndex => {
e2956c60
FG
183 let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
184 format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
185 })?;
2ce15934
FG
186 let (csum, size) = index.compute_csum();
187 verify_archive(archive_info, &csum, size)?;
07ad6470 188
e2956c60
FG
189 pull_index_chunks(
190 worker,
191 chunk_reader.clone(),
192 tgt_store.clone(),
193 index,
194 downloaded_chunks,
195 )
196 .await?;
07ad6470
DM
197 }
198 ArchiveType::FixedIndex => {
e2956c60
FG
199 let index = FixedIndexReader::new(tmpfile).map_err(|err| {
200 format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
201 })?;
2ce15934
FG
202 let (csum, size) = index.compute_csum();
203 verify_archive(archive_info, &csum, size)?;
07ad6470 204
e2956c60
FG
205 pull_index_chunks(
206 worker,
207 chunk_reader.clone(),
208 tgt_store.clone(),
209 index,
210 downloaded_chunks,
211 )
212 .await?;
07ad6470 213 }
2ce15934
FG
214 ArchiveType::Blob => {
215 let (csum, size) = compute_file_csum(&mut tmpfile)?;
216 verify_archive(archive_info, &csum, size)?;
217 }
07ad6470
DM
218 }
219 if let Err(err) = std::fs::rename(&tmp_path, &path) {
220 bail!("Atomic rename file {:?} failed - {}", path, err);
221 }
222 Ok(())
223}
224
1610c45a
DM
225// Note: The client.log.blob is uploaded after the backup, so it is
226// not mentioned in the manifest.
227async fn try_client_log_download(
228 worker: &WorkerTask,
229 reader: Arc<BackupReader>,
230 path: &std::path::Path,
231) -> Result<(), Error> {
1610c45a
DM
232 let mut tmp_path = path.to_owned();
233 tmp_path.set_extension("tmp");
234
235 let tmpfile = std::fs::OpenOptions::new()
236 .write(true)
237 .create(true)
238 .read(true)
239 .open(&tmp_path)?;
240
add5861e 241 // Note: be silent if there is no log - only log successful download
3d571d55 242 if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
1610c45a
DM
243 if let Err(err) = std::fs::rename(&tmp_path, &path) {
244 bail!("Atomic rename file {:?} failed - {}", path, err);
245 }
add5861e 246 worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
1610c45a
DM
247 }
248
249 Ok(())
250}
251
07ad6470
DM
/// Sync a single snapshot from the remote into `tgt_store`.
///
/// Downloads the remote manifest first; if it is identical to the local one,
/// nothing is transferred (except a missing client log). Otherwise every
/// archive listed in the manifest that is missing or changed locally is
/// pulled, the manifest is renamed into place, and stale files are cleaned up.
/// A snapshot that vanished on the remote (HTTP 404) is skipped, not an error.
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    // snapshot was removed on the remote between listing and
                    // download - skip it instead of failing the whole sync
                    StatusCode::NOT_FOUND => {
                        worker.log(format!(
                            "skipping snapshot {} - vanished since start of sync",
                            snapshot
                        ));
                        return Ok(());
                    }
                    _ => {
                        bail!("HTTP error {} - {}", code, message);
                    }
                },
                None => {
                    return Err(err);
                }
            };
        }
    };
    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        // load the existing local manifest to compare against the download
        let manifest_blob = proxmox::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!(
                    "unable to open local manifest {:?} - {}",
                    manifest_name,
                    err
                )
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!(
                "unable to read local manifest {:?} - {}",
                manifest_name,
                err
            )
        })?;

        // identical raw manifest => snapshot unchanged, nothing to pull
        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            worker.log("no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

        // skip files that already exist locally and still verify against
        // the (new) manifest; re-download anything that changed
        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = compute_file_csum(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
            }
        }

        // unencrypted pull: crypt config is None, chunk cache starts empty
        let mut chunk_reader = RemoteChunkReader::new(
            reader.clone(),
            None,
            item.chunk_crypt_mode(),
            HashMap::new(),
        );

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            tgt_store.clone(),
            snapshot,
            &item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    // commit the new manifest only after all archives arrived successfully
    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;

    Ok(())
}
400
401pub async fn pull_snapshot_from(
402 worker: &WorkerTask,
403 reader: Arc<BackupReader>,
404 tgt_store: Arc<DataStore>,
405 snapshot: &BackupDir,
e2956c60 406 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
07ad6470 407) -> Result<(), Error> {
f23f7543 408 let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
07ad6470
DM
409
410 if is_new {
411 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
412
e2956c60
FG
413 if let Err(err) = pull_snapshot(
414 worker,
415 reader,
416 tgt_store.clone(),
417 &snapshot,
418 downloaded_chunks,
419 )
420 .await
421 {
c9756b40 422 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
07ad6470
DM
423 worker.log(format!("cleanup error - {}", cleanup_err));
424 }
425 return Err(err);
426 }
4856a218 427 worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
07ad6470
DM
428 } else {
429 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
e2956c60
FG
430 pull_snapshot(
431 worker,
432 reader,
433 tgt_store.clone(),
434 &snapshot,
435 downloaded_chunks,
436 )
437 .await?;
438 worker.log(format!(
439 "re-sync snapshot {:?} done",
440 snapshot.relative_path()
441 ));
07ad6470
DM
442 }
443
444 Ok(())
445}
446
d2354a16
DC
/// Aggregated info about snapshots skipped because they are older than the
/// newest local snapshot (reported once per group instead of per snapshot).
struct SkipInfo {
    // oldest skipped backup time (epoch seconds)
    oldest: i64,
    // newest skipped backup time (epoch seconds)
    newest: i64,
    // number of skipped snapshots
    count: u64,
}
452
453impl SkipInfo {
454 fn update(&mut self, backup_time: i64) {
455 self.count += 1;
456
457 if backup_time < self.oldest {
458 self.oldest = backup_time;
459 }
460
461 if backup_time > self.newest {
462 self.newest = backup_time;
463 }
464 }
465
466 fn affected(&self) -> Result<String, Error> {
467 match self.count {
468 0 => Ok(String::new()),
469 1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest),
470 _ => {
471 Ok(format!(
472 "{} .. {}",
473 proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?,
474 proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?,
475 ))
476 }
477 }
478 }
479}
480
481impl std::fmt::Display for SkipInfo {
482 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
483 write!(
484 f,
485 "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
486 self.count,
487 self.affected().map_err(|_| std::fmt::Error)?
488 )
489 }
490}
491
07ad6470
DM
/// Sync all snapshots of one backup group from the remote.
///
/// Lists the remote snapshots (oldest first), skips in-progress backups and
/// snapshots older than the last successful local backup, and pulls the rest
/// one by one. With `delete` set, local snapshots that vanished on the remote
/// are removed afterwards. `progress` is updated per snapshot.
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    group: &BackupGroup,
    delete: bool,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    // pull oldest first so partial syncs still leave a consistent history
    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = tgt_store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // start with 16384 chunks (up to 65GB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut skip_info = SkipInfo {
        oldest: i64::MAX,
        newest: i64::MIN,
        count: 0,
    };

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if item.size.is_none() {
            worker.log(format!(
                "skipping snapshot {} - in-progress backup",
                snapshot
            ));
            continue;
        }

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        if let Some(last_sync_time) = last_sync {
            // older than our newest local snapshot - only count it for the
            // summary log line emitted after the loop
            if last_sync_time > backup_time {
                skip_info.update(backup_time);
                continue;
            }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());

        // a fresh client per snapshot, since the reader connection is consumed
        let new_client = HttpClient::new(
            src_repo.host(),
            src_repo.port(),
            src_repo.auth_id(),
            options,
        )?;

        let reader = BackupReader::start(
            new_client,
            None,
            src_repo.store(),
            snapshot.group().backup_type(),
            snapshot.group().backup_id(),
            backup_time,
            true,
        )
        .await?;

        let result = pull_snapshot_from(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks.clone(),
        )
        .await;

        // update progress before propagating a possible error
        progress.done_snapshots = pos as u64 + 1;
        worker.log(format!("percentage done: {}", progress));

        result?; // stop on error
    }

    if delete {
        let local_list = group.list_backups(&tgt_store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) {
                continue;
            }
            worker.log(format!(
                "delete vanished snapshot {:?}",
                info.backup_dir.relative_path()
            ));
            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
        }
    }

    if skip_info.count > 0 {
        task_log!(worker, "{}", skip_info);
    }

    Ok(())
}
614
/// Sync an entire remote datastore into `tgt_store`.
///
/// Lists all remote backup groups, syncs them in (type, id) order, and -
/// with `delete` set - removes local groups that no longer exist remotely.
/// Per-group failures (lock, ownership, pull) are logged and accumulated;
/// the function only bails at the end if any group failed.
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    delete: bool,
    auth_id: Authid,
) -> Result<(), Error> {
    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());

    let mut result = client
        .get(&path, None)
        .await
        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    worker.log(format!("found {} groups to sync", list.len()));

    // deterministic order: by backup type, then by backup id
    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    let mut errors = false;

    // remember the remote group set for the vanished-group cleanup below
    let mut new_groups = std::collections::HashSet::new();
    for item in list.iter() {
        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    for (done, item) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let group = BackupGroup::new(&item.backup_type, &item.backup_id);

        let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
            Ok(result) => result,
            Err(err) => {
                worker.log(format!(
                    "sync group {}/{} failed - group lock failed: {}",
                    item.backup_type, item.backup_id, err
                ));
                errors = true; // do not stop here, instead continue
                continue;
            }
        };

        // permission check
        if auth_id != owner {
            // only the owner is allowed to create additional snapshots
            worker.log(format!(
                "sync group {}/{} failed - owner check failed ({} != {})",
                item.backup_type, item.backup_id, auth_id, owner
            ));
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(
            worker,
            client,
            src_repo,
            tgt_store.clone(),
            &group,
            delete,
            &mut progress,
        )
        .await
        {
            worker.log(format!(
                "sync group {}/{} failed - {}",
                item.backup_type, item.backup_id, err,
            ));
            errors = true; // do not stop here, instead continue
        }
    }

    if delete {
        // remove local groups that are gone on the remote; individual removal
        // failures are logged and accumulated, not fatal
        let result: Result<(), Error> = proxmox::try_block!({
            let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) {
                    continue;
                }
                worker.log(format!(
                    "delete vanished group '{}/{}'",
                    local_group.backup_type(),
                    local_group.backup_id()
                ));
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                    worker.log(err.to_string());
                    errors = true;
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            worker.log(format!("error during cleanup: {}", err));
            errors = true;
        };
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}