]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/pull.rs
move chunk_store to pbs-datastore
[proxmox-backup.git] / src / client / pull.rs
CommitLineData
07ad6470
DM
1//! Sync datastore from remote server
2
e2956c60 3use std::collections::{HashMap, HashSet};
07ad6470 4use std::convert::TryFrom;
07ad6470 5use std::io::{Seek, SeekFrom};
54417086 6use std::sync::atomic::{AtomicUsize, Ordering};
e2956c60
FG
7use std::sync::{Arc, Mutex};
8use std::time::SystemTime;
07ad6470 9
c23192d3
WB
10use anyhow::{bail, format_err, Error};
11use serde_json::json;
12
13use proxmox::api::error::{HttpError, StatusCode};
14
15use pbs_datastore::task_log;
16
1bc1d81a 17use crate::{
1bc1d81a 18 api2::types::*,
e2956c60 19 backup::*,
1bc1d81a 20 client::*,
e2956c60
FG
21 server::WorkerTask,
22 tools::{compute_file_csum, ParallelHandler},
1bc1d81a 23};
07ad6470
DM
24
25// fixme: implement filters
26// fixme: delete vanished groups
27// Todo: correctly lock backup groups
28
/// Pull all chunks referenced by `index` from the remote into the local
/// `target` datastore.
///
/// `downloaded_chunks` is a digest set shared across the whole sync run:
/// digests already in it are skipped, so several archives of one snapshot do
/// not fetch the same chunk twice. Chunks already present in the target store
/// are only touched (cond_touch_chunk), not re-downloaded.
///
/// Up to 20 chunk downloads run concurrently; decoding, verification and
/// insertion happen on a 4-thread `ParallelHandler` pool to overlap network
/// and CPU work.
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    // Lazily walk the index, dropping digests this sync run already handled.
    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before its actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    // Worker pool: verifies each received chunk against its digest/size and
    // writes it into the target chunk store.
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    // Total raw bytes downloaded (for the throughput log line below).
    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                // Touch instead of download if the chunk already exists
                // locally (also protects it from concurrent GC).
                let chunk_exists = pbs_runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                    return Ok::<_, Error>(());
                }
                //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in a separate threads to maximize throughput
                pbs_runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    // Close the channel so the pool threads can drain and finish ...
    drop(verify_and_write_channel);

    // ... then join them, propagating any verify/insert error.
    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    worker.log(format!(
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    ));

    Ok(())
}
120
121async fn download_manifest(
122 reader: &BackupReader,
123 filename: &std::path::Path,
124) -> Result<std::fs::File, Error> {
3d571d55 125 let mut tmp_manifest_file = std::fs::OpenOptions::new()
07ad6470
DM
126 .write(true)
127 .create(true)
194da6f8 128 .truncate(true)
07ad6470
DM
129 .read(true)
130 .open(&filename)?;
131
e2956c60
FG
132 reader
133 .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
134 .await?;
07ad6470
DM
135
136 tmp_manifest_file.seek(SeekFrom::Start(0))?;
137
138 Ok(tmp_manifest_file)
139}
140
e2956c60 141fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
2ce15934 142 if size != info.size {
e2956c60
FG
143 bail!(
144 "wrong size for file '{}' ({} != {})",
145 info.filename,
146 info.size,
147 size
148 );
2ce15934
FG
149 }
150
151 if csum != &info.csum {
152 bail!("wrong checksum for file '{}'", info.filename);
153 }
154
155 Ok(())
156}
157
07ad6470
DM
158async fn pull_single_archive(
159 worker: &WorkerTask,
160 reader: &BackupReader,
161 chunk_reader: &mut RemoteChunkReader,
162 tgt_store: Arc<DataStore>,
163 snapshot: &BackupDir,
2ce15934 164 archive_info: &FileInfo,
e2956c60 165 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
07ad6470 166) -> Result<(), Error> {
2ce15934 167 let archive_name = &archive_info.filename;
07ad6470
DM
168 let mut path = tgt_store.base_path();
169 path.push(snapshot.relative_path());
170 path.push(archive_name);
171
172 let mut tmp_path = path.clone();
173 tmp_path.set_extension("tmp");
174
175 worker.log(format!("sync archive {}", archive_name));
3d571d55 176 let mut tmpfile = std::fs::OpenOptions::new()
07ad6470
DM
177 .write(true)
178 .create(true)
179 .read(true)
180 .open(&tmp_path)?;
181
3d571d55 182 reader.download(archive_name, &mut tmpfile).await?;
07ad6470
DM
183
184 match archive_type(archive_name)? {
185 ArchiveType::DynamicIndex => {
e2956c60
FG
186 let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
187 format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
188 })?;
2ce15934
FG
189 let (csum, size) = index.compute_csum();
190 verify_archive(archive_info, &csum, size)?;
07ad6470 191
e2956c60
FG
192 pull_index_chunks(
193 worker,
194 chunk_reader.clone(),
195 tgt_store.clone(),
196 index,
197 downloaded_chunks,
198 )
199 .await?;
07ad6470
DM
200 }
201 ArchiveType::FixedIndex => {
e2956c60
FG
202 let index = FixedIndexReader::new(tmpfile).map_err(|err| {
203 format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
204 })?;
2ce15934
FG
205 let (csum, size) = index.compute_csum();
206 verify_archive(archive_info, &csum, size)?;
07ad6470 207
e2956c60
FG
208 pull_index_chunks(
209 worker,
210 chunk_reader.clone(),
211 tgt_store.clone(),
212 index,
213 downloaded_chunks,
214 )
215 .await?;
07ad6470 216 }
2ce15934
FG
217 ArchiveType::Blob => {
218 let (csum, size) = compute_file_csum(&mut tmpfile)?;
219 verify_archive(archive_info, &csum, size)?;
220 }
07ad6470
DM
221 }
222 if let Err(err) = std::fs::rename(&tmp_path, &path) {
223 bail!("Atomic rename file {:?} failed - {}", path, err);
224 }
225 Ok(())
226}
227
1610c45a
DM
228// Note: The client.log.blob is uploaded after the backup, so it is
229// not mentioned in the manifest.
230async fn try_client_log_download(
231 worker: &WorkerTask,
232 reader: Arc<BackupReader>,
233 path: &std::path::Path,
234) -> Result<(), Error> {
1610c45a
DM
235 let mut tmp_path = path.to_owned();
236 tmp_path.set_extension("tmp");
237
238 let tmpfile = std::fs::OpenOptions::new()
239 .write(true)
240 .create(true)
241 .read(true)
242 .open(&tmp_path)?;
243
add5861e 244 // Note: be silent if there is no log - only log successful download
3d571d55 245 if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
1610c45a
DM
246 if let Err(err) = std::fs::rename(&tmp_path, &path) {
247 bail!("Atomic rename file {:?} failed - {}", path, err);
248 }
add5861e 249 worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
1610c45a
DM
250 }
251
252 Ok(())
253}
254
07ad6470
DM
/// Sync one snapshot from the remote `reader` into `tgt_store`.
///
/// Downloads the remote manifest to a `.tmp` file first. If a local manifest
/// exists and its raw data is identical, nothing changed and only a missing
/// client log is fetched. Otherwise every manifest file whose local
/// checksum/size no longer verifies is re-pulled, the manifest is renamed
/// into place, and stale files are cleaned up.
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            // A 404 means the snapshot vanished on the source since the sync
            // started - skip it instead of failing the whole job.
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        worker.log(format!(
                            "skipping snapshot {} - vanished since start of sync",
                            snapshot
                        ));
                        return Ok(());
                    }
                    _ => {
                        bail!("HTTP error {} - {}", code, message);
                    }
                },
                None => {
                    return Err(err);
                }
            };
        }
    };
    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        let manifest_blob = proxmox::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!(
                    "unable to open local manifest {:?} - {}",
                    manifest_name,
                    err
                )
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!(
                "unable to read local manifest {:?} - {}",
                manifest_name,
                err
            )
        })?;

        // Identical raw manifest data => snapshot unchanged; only try to
        // fetch a not-yet-present client log, then bail out early.
        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            worker.log("no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

        // Skip files that already exist locally and still verify against the
        // new manifest; re-pull anything changed or missing.
        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = compute_file_csum(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
            }
        }

        // Fresh chunk reader per file: crypt mode may differ between files.
        let mut chunk_reader = RemoteChunkReader::new(
            reader.clone(),
            None,
            item.chunk_crypt_mode(),
            HashMap::new(),
        );

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            tgt_store.clone(),
            snapshot,
            &item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    // Activate the new manifest only after all files were pulled successfully.
    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;

    Ok(())
}
403
404pub async fn pull_snapshot_from(
405 worker: &WorkerTask,
406 reader: Arc<BackupReader>,
407 tgt_store: Arc<DataStore>,
408 snapshot: &BackupDir,
e2956c60 409 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
07ad6470 410) -> Result<(), Error> {
f23f7543 411 let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
07ad6470
DM
412
413 if is_new {
414 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
415
e2956c60
FG
416 if let Err(err) = pull_snapshot(
417 worker,
418 reader,
419 tgt_store.clone(),
420 &snapshot,
421 downloaded_chunks,
422 )
423 .await
424 {
c9756b40 425 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
07ad6470
DM
426 worker.log(format!("cleanup error - {}", cleanup_err));
427 }
428 return Err(err);
429 }
4856a218 430 worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
07ad6470
DM
431 } else {
432 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
e2956c60
FG
433 pull_snapshot(
434 worker,
435 reader,
436 tgt_store.clone(),
437 &snapshot,
438 downloaded_chunks,
439 )
440 .await?;
441 worker.log(format!(
442 "re-sync snapshot {:?} done",
443 snapshot.relative_path()
444 ));
07ad6470
DM
445 }
446
447 Ok(())
448}
449
d2354a16
DC
450struct SkipInfo {
451 oldest: i64,
452 newest: i64,
453 count: u64,
454}
455
456impl SkipInfo {
457 fn update(&mut self, backup_time: i64) {
458 self.count += 1;
459
460 if backup_time < self.oldest {
461 self.oldest = backup_time;
462 }
463
464 if backup_time > self.newest {
465 self.newest = backup_time;
466 }
467 }
468
469 fn affected(&self) -> Result<String, Error> {
470 match self.count {
471 0 => Ok(String::new()),
472 1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest),
473 _ => {
474 Ok(format!(
475 "{} .. {}",
476 proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?,
477 proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?,
478 ))
479 }
480 }
481 }
482}
483
484impl std::fmt::Display for SkipInfo {
485 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486 write!(
487 f,
488 "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
489 self.count,
490 self.affected().map_err(|_| std::fmt::Error)?
491 )
492 }
493}
494
07ad6470
DM
/// Sync all snapshots of one backup group from the remote repository.
///
/// Lists remote snapshots (oldest first), skips in-progress backups and
/// snapshots older than the last successful local backup, and pulls the rest
/// one by one (each with a fresh, non-interactive client/reader). With
/// `delete` set, local snapshots that vanished on the remote are removed
/// afterwards. Skipped-as-too-old snapshots are summarized at the end.
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    group: &BackupGroup,
    delete: bool,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    // sync oldest first, so progress/last_successful_backup stay meaningful
    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = tgt_store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // shared digest cache across all snapshots of this group; preallocate
    // 1024 * 64 = 65536 entries to avoid early rehashing
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut skip_info = SkipInfo {
        oldest: i64::MAX,
        newest: i64::MIN,
        count: 0,
    };

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if item.size.is_none() {
            worker.log(format!(
                "skipping snapshot {} - in-progress backup",
                snapshot
            ));
            continue;
        }

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        // never pull snapshots older than the last successful local one
        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time {
                skip_info.update(backup_time);
                continue;
            }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());

        // dedicated client per snapshot - the reader session consumes it
        let new_client = HttpClient::new(
            src_repo.host(),
            src_repo.port(),
            src_repo.auth_id(),
            options,
        )?;

        let reader = BackupReader::start(
            new_client,
            None,
            src_repo.store(),
            snapshot.group().backup_type(),
            snapshot.group().backup_id(),
            backup_time,
            true,
        )
        .await?;

        let result = pull_snapshot_from(
            worker,
            reader,
            tgt_store.clone(),
            &snapshot,
            downloaded_chunks.clone(),
        )
        .await;

        // update progress before error propagation so it reflects this attempt
        progress.done_snapshots = pos as u64 + 1;
        worker.log(format!("percentage done: {}", progress));

        result?; // stop on error
    }

    if delete {
        let local_list = group.list_backups(&tgt_store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) {
                continue;
            }
            worker.log(format!(
                "delete vanished snapshot {:?}",
                info.backup_dir.relative_path()
            ));
            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
        }
    }

    if skip_info.count > 0 {
        task_log!(worker, "{}", skip_info);
    }

    Ok(())
}
617
/// Sync all backup groups of a remote datastore into `tgt_store`.
///
/// Holds a shared chunk-store lock for the whole run so GC cannot remove
/// newly written chunks. Groups are processed in a stable (type, id) order;
/// per-group failures (lock, ownership, pull errors) are logged and collected
/// instead of aborting, and the function fails at the end if any occurred.
/// With `delete` set, local groups that vanished on the remote are removed.
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    delete: bool,
    auth_id: Authid,
) -> Result<(), Error> {
    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());

    let mut result = client
        .get(&path, None)
        .await
        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    worker.log(format!("found {} groups to sync", list.len()));

    // deterministic order: by backup type, then by backup id
    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    let mut errors = false;

    // remember which groups still exist remotely (for the delete pass below)
    let mut new_groups = std::collections::HashSet::new();
    for item in list.iter() {
        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    for (done, item) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let group = BackupGroup::new(&item.backup_type, &item.backup_id);

        // take (or create) the group lock; failure skips just this group
        let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
            Ok(result) => result,
            Err(err) => {
                worker.log(format!(
                    "sync group {}/{} failed - group lock failed: {}",
                    item.backup_type, item.backup_id, err
                ));
                errors = true; // do not stop here, instead continue
                continue;
            }
        };

        // permission check
        if auth_id != owner {
            // only the owner is allowed to create additional snapshots
            worker.log(format!(
                "sync group {}/{} failed - owner check failed ({} != {})",
                item.backup_type, item.backup_id, auth_id, owner
            ));
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(
            worker,
            client,
            src_repo,
            tgt_store.clone(),
            &group,
            delete,
            &mut progress,
        )
        .await
        {
            worker.log(format!(
                "sync group {}/{} failed - {}",
                item.backup_type, item.backup_id, err,
            ));
            errors = true; // do not stop here, instead continue
        }
    }

    if delete {
        // best-effort removal of local groups no longer present remotely
        let result: Result<(), Error> = proxmox::try_block!({
            let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) {
                    continue;
                }
                worker.log(format!(
                    "delete vanished group '{}/{}'",
                    local_group.backup_type(),
                    local_group.backup_id()
                ));
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                    worker.log(err.to_string());
                    errors = true;
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            worker.log(format!("error during cleanup: {}", err));
            errors = true;
        };
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}