1 //! Sync datastore from remote server
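//!
//! Call-chain overview: [`pull_store`] iterates the namespaces reported by the
//! source and calls [`pull_ns`] for each of them; [`pull_ns`] iterates the
//! (filtered) groups and calls [`pull_group`], which pulls the selected snapshots
//! via [`pull_snapshot_from`]/[`pull_snapshot`]. Individual archives are fetched
//! by [`pull_single_archive`] and their referenced chunks by [`pull_index_chunks`].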
2
3 use std::collections::{HashMap, HashSet};
4 use std::io::{Seek, Write};
5 use std::path::{Path, PathBuf};
6 use std::sync::atomic::{AtomicUsize, Ordering};
7 use std::sync::{Arc, Mutex};
8 use std::time::{Duration, SystemTime};
9
10 use anyhow::{bail, format_err, Error};
11 use http::StatusCode;
12 use proxmox_human_byte::HumanByte;
13 use proxmox_rest_server::WorkerTask;
14 use proxmox_router::HttpError;
15 use proxmox_sys::{task_log, task_warn};
16 use serde_json::json;
17
18 use pbs_api_types::{
19 print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
20 GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
21 PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
22 };
23 use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
24 use pbs_config::CachedUserInfo;
25 use pbs_datastore::data_blob::DataBlob;
26 use pbs_datastore::dynamic_index::DynamicIndexReader;
27 use pbs_datastore::fixed_index::FixedIndexReader;
28 use pbs_datastore::index::IndexFile;
29 use pbs_datastore::manifest::{
30 archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
31 };
32 use pbs_datastore::read_chunk::AsyncReadChunk;
33 use pbs_datastore::{
34 check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
35 };
36 use pbs_tools::sha::sha256;
37
38 use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
39 use crate::tools::parallel_handler::ParallelHandler;
40
41 struct RemoteReader {
42 backup_reader: Arc<BackupReader>,
43 dir: BackupDir,
44 }
45
46 struct LocalReader {
47 _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
48 path: PathBuf,
49 datastore: Arc<DataStore>,
50 }
51
52 pub(crate) struct PullTarget {
53 store: Arc<DataStore>,
54 ns: BackupNamespace,
55 }
56
57 pub(crate) struct RemoteSource {
58 repo: BackupRepository,
59 ns: BackupNamespace,
60 client: HttpClient,
61 }
62
63 pub(crate) struct LocalSource {
64 store: Arc<DataStore>,
65 ns: BackupNamespace,
66 }
67
68 #[derive(Default)]
69 pub(crate) struct RemovedVanishedStats {
70 pub(crate) groups: usize,
71 pub(crate) snapshots: usize,
72 pub(crate) namespaces: usize,
73 }
74
75 impl RemovedVanishedStats {
76 fn add(&mut self, rhs: RemovedVanishedStats) {
77 self.groups += rhs.groups;
78 self.snapshots += rhs.snapshots;
79 self.namespaces += rhs.namespaces;
80 }
81 }
82
83 #[derive(Default)]
84 pub(crate) struct PullStats {
85 pub(crate) chunk_count: usize,
86 pub(crate) bytes: usize,
87 pub(crate) elapsed: Duration,
88 pub(crate) removed: Option<RemovedVanishedStats>,
89 }
90
91 impl From<RemovedVanishedStats> for PullStats {
92 fn from(removed: RemovedVanishedStats) -> Self {
93 Self {
94 removed: Some(removed),
95 ..Default::default()
96 }
97 }
98 }
99
100 impl PullStats {
101 fn add(&mut self, rhs: PullStats) {
102 self.chunk_count += rhs.chunk_count;
103 self.bytes += rhs.bytes;
104 self.elapsed += rhs.elapsed;
105
106 if let Some(rhs_removed) = rhs.removed {
107 if let Some(ref mut removed) = self.removed {
108 removed.add(rhs_removed);
109 } else {
110 self.removed = Some(rhs_removed);
111 }
112 }
113 }
114 }
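// Illustrative test sketch for the aggregation logic above: exercises
// `PullStats::add` and the `From<RemovedVanishedStats>` conversion.
#[cfg(test)]
mod pull_stats_example_tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn add_accumulates_counters_and_merges_removed_stats() {
        let mut total = PullStats::default();
        // plain transfer stats, nothing removed
        total.add(PullStats {
            chunk_count: 10,
            bytes: 4096,
            elapsed: Duration::from_secs(1),
            removed: None,
        });
        // stats from a remove-vanished pass, converted via `From`
        total.add(PullStats::from(RemovedVanishedStats {
            groups: 1,
            snapshots: 3,
            namespaces: 0,
        }));
        assert_eq!(total.chunk_count, 10);
        assert_eq!(total.bytes, 4096);
        assert_eq!(total.removed.as_ref().map(|r| r.snapshots), Some(3));
    }
}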
115
116 #[async_trait::async_trait]
117 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
118 /// The trait includes methods for listing namespaces, groups, and backup directories,
119 /// as well as retrieving a reader for reading data from the source.
120 trait PullSource: Send + Sync {
121 /// Lists namespaces from the source.
122 async fn list_namespaces(
123 &self,
124 max_depth: &mut Option<usize>,
125 worker: &WorkerTask,
126 ) -> Result<Vec<BackupNamespace>, Error>;
127
128 /// Lists groups within a specific namespace from the source.
129 async fn list_groups(
130 &self,
131 namespace: &BackupNamespace,
132 owner: &Authid,
133 ) -> Result<Vec<BackupGroup>, Error>;
134
135 /// Lists backup directories for a specific group within a specific namespace from the source.
136 async fn list_backup_dirs(
137 &self,
138 namespace: &BackupNamespace,
139 group: &BackupGroup,
140 worker: &WorkerTask,
141 ) -> Result<Vec<BackupDir>, Error>;
142 fn get_ns(&self) -> BackupNamespace;
143 fn get_store(&self) -> &str;
144
145 /// Returns a reader for reading data from a specific backup directory.
146 async fn reader(
147 &self,
148 ns: &BackupNamespace,
149 dir: &BackupDir,
150 ) -> Result<Arc<dyn PullReader>, Error>;
151 }
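// Illustrative (commented) sketch of how a `PullSource` is driven; `source`,
// `max_depth`, `worker` and `owner` are assumed to be in scope. The real flow
// lives in `pull_store`/`pull_ns`/`pull_group` below.
//
//     let namespaces = source.list_namespaces(&mut max_depth, worker).await?;
//     for ns in namespaces {
//         for group in source.list_groups(&ns, &owner).await? {
//             for dir in source.list_backup_dirs(&ns, &group, worker).await? {
//                 let reader = source.reader(&ns, &dir).await?;
//                 // ... pull manifest, archives and chunks through the reader
//             }
//         }
//     }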
152
153 #[async_trait::async_trait]
154 impl PullSource for RemoteSource {
155 async fn list_namespaces(
156 &self,
157 max_depth: &mut Option<usize>,
158 worker: &WorkerTask,
159 ) -> Result<Vec<BackupNamespace>, Error> {
160 if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
161 return Ok(vec![self.ns.clone()]);
162 }
163
164 let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
165 let mut data = json!({});
166 if let Some(max_depth) = max_depth {
167 data["max-depth"] = json!(max_depth);
168 }
169
170 if !self.ns.is_root() {
171 data["parent"] = json!(self.ns);
172 }
173 self.client.login().await?;
174
175 let mut result = match self.client.get(&path, Some(data)).await {
176 Ok(res) => res,
177 Err(err) => match err.downcast_ref::<HttpError>() {
178 Some(HttpError { code, message }) => match code {
179 &StatusCode::NOT_FOUND => {
180 if self.ns.is_root() && max_depth.is_none() {
181 task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
182 task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
183 max_depth.replace(0);
184 } else {
185 bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
186 }
187
188 return Ok(vec![self.ns.clone()]);
189 }
190 _ => {
191 bail!("Querying namespaces failed - HTTP error {code} - {message}");
192 }
193 },
194 None => {
195 bail!("Querying namespaces failed - {err}");
196 }
197 },
198 };
199
200 let list: Vec<BackupNamespace> =
201 serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
202 .into_iter()
203 .map(|list_item| list_item.ns)
204 .collect();
205
206 Ok(list)
207 }
208
209 async fn list_groups(
210 &self,
211 namespace: &BackupNamespace,
212 _owner: &Authid,
213 ) -> Result<Vec<BackupGroup>, Error> {
214 let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
215
216 let args = if !namespace.is_root() {
217 Some(json!({ "ns": namespace.clone() }))
218 } else {
219 None
220 };
221
222 self.client.login().await?;
223 let mut result =
224 self.client.get(&path, args).await.map_err(|err| {
225 format_err!("Failed to retrieve backup groups from remote - {}", err)
226 })?;
227
228 Ok(
229 serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
230 .map_err(Error::from)?
231 .into_iter()
232 .map(|item| item.backup)
233 .collect::<Vec<BackupGroup>>(),
234 )
235 }
236
237 async fn list_backup_dirs(
238 &self,
239 namespace: &BackupNamespace,
240 group: &BackupGroup,
241 worker: &WorkerTask,
242 ) -> Result<Vec<BackupDir>, Error> {
243 let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
244
245 let mut args = json!({
246 "backup-type": group.ty,
247 "backup-id": group.id,
248 });
249
250 if !namespace.is_root() {
251 args["ns"] = serde_json::to_value(namespace)?;
252 }
253
254 self.client.login().await?;
255
256 let mut result = self.client.get(&path, Some(args)).await?;
257 let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
258 Ok(snapshot_list
259 .into_iter()
260 .filter_map(|item: SnapshotListItem| {
261 let snapshot = item.backup;
262 // in-progress backups can't be synced
263 if item.size.is_none() {
264 task_log!(
265 worker,
266 "skipping snapshot {} - in-progress backup",
267 snapshot
268 );
269 return None;
270 }
271
272 Some(snapshot)
273 })
274 .collect::<Vec<BackupDir>>())
275 }
276
277 fn get_ns(&self) -> BackupNamespace {
278 self.ns.clone()
279 }
280
281 fn get_store(&self) -> &str {
282 self.repo.store()
283 }
284
285 async fn reader(
286 &self,
287 ns: &BackupNamespace,
288 dir: &BackupDir,
289 ) -> Result<Arc<dyn PullReader>, Error> {
290 let backup_reader =
291 BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
292 Ok(Arc::new(RemoteReader {
293 backup_reader,
294 dir: dir.clone(),
295 }))
296 }
297 }
298
299 #[async_trait::async_trait]
300 impl PullSource for LocalSource {
301 async fn list_namespaces(
302 &self,
303 max_depth: &mut Option<usize>,
304 _worker: &WorkerTask,
305 ) -> Result<Vec<BackupNamespace>, Error> {
306 ListNamespacesRecursive::new_max_depth(
307 self.store.clone(),
308 self.ns.clone(),
309 max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
310 )?
311 .collect()
312 }
313
314 async fn list_groups(
315 &self,
316 namespace: &BackupNamespace,
317 owner: &Authid,
318 ) -> Result<Vec<BackupGroup>, Error> {
319 Ok(ListAccessibleBackupGroups::new_with_privs(
320 &self.store,
321 namespace.clone(),
322 0,
323 Some(PRIV_DATASTORE_READ),
324 Some(PRIV_DATASTORE_BACKUP),
325 Some(owner),
326 )?
327 .filter_map(Result::ok)
328 .map(|backup_group| backup_group.group().clone())
329 .collect::<Vec<pbs_api_types::BackupGroup>>())
330 }
331
332 async fn list_backup_dirs(
333 &self,
334 namespace: &BackupNamespace,
335 group: &BackupGroup,
336 _worker: &WorkerTask,
337 ) -> Result<Vec<BackupDir>, Error> {
338 Ok(self
339 .store
340 .backup_group(namespace.clone(), group.clone())
341 .iter_snapshots()?
342 .filter_map(Result::ok)
343 .map(|snapshot| snapshot.dir().to_owned())
344 .collect::<Vec<BackupDir>>())
345 }
346
347 fn get_ns(&self) -> BackupNamespace {
348 self.ns.clone()
349 }
350
351 fn get_store(&self) -> &str {
352 self.store.name()
353 }
354
355 async fn reader(
356 &self,
357 ns: &BackupNamespace,
358 dir: &BackupDir,
359 ) -> Result<Arc<dyn PullReader>, Error> {
360 let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
361 let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
362 &dir.full_path(),
363 "snapshot",
364 "locked by another operation",
365 )?;
366 Ok(Arc::new(LocalReader {
367 _dir_lock: Arc::new(Mutex::new(dir_lock)),
368 path: dir.full_path(),
369 datastore: dir.datastore().clone(),
370 }))
371 }
372 }
373
374 #[async_trait::async_trait]
375 /// `PullReader` is a trait that provides an interface for reading data from a source.
376 /// The trait includes methods for getting a chunk reader, loading a file, downloading the client log, and checking whether chunk sync should be skipped.
377 trait PullReader: Send + Sync {
378 /// Returns a chunk reader with the specified encryption mode.
379 fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
380
381 /// Asynchronously loads a file from the source into a local file.
382 /// `filename` is the name of the file to load from the source.
383 /// `into` is the path of the local file to load the source file into.
384 async fn load_file_into(
385 &self,
386 filename: &str,
387 into: &Path,
388 worker: &WorkerTask,
389 ) -> Result<Option<DataBlob>, Error>;
390
391 /// Tries to download the client log from the source and save it into a local file.
392 async fn try_download_client_log(
393 &self,
394 to_path: &Path,
395 worker: &WorkerTask,
396 ) -> Result<(), Error>;
397
398 fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
399 }
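// Commented sketch of how a `PullReader` is consumed per snapshot (see
// `pull_snapshot` and `pull_single_archive` below for the real implementation);
// `reader`, `tmp_path`, `archive_info`, `client_log_path` and `worker` are
// assumed to be in scope.
//
//     let manifest = reader.load_file_into(MANIFEST_BLOB_NAME, &tmp_path, worker).await?;
//     // for every index archive referenced by the manifest:
//     let chunk_reader = reader.chunk_reader(archive_info.crypt_mode);
//     // ... pull referenced chunks, unless reader.skip_chunk_sync(target_store_name) is true
//     reader.try_download_client_log(&client_log_path, worker).await?;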
400
401 #[async_trait::async_trait]
402 impl PullReader for RemoteReader {
403 fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
404 Arc::new(RemoteChunkReader::new(
405 self.backup_reader.clone(),
406 None,
407 crypt_mode,
408 HashMap::new(),
409 ))
410 }
411
412 async fn load_file_into(
413 &self,
414 filename: &str,
415 into: &Path,
416 worker: &WorkerTask,
417 ) -> Result<Option<DataBlob>, Error> {
418 let mut tmp_file = std::fs::OpenOptions::new()
419 .write(true)
420 .create(true)
421 .truncate(true)
422 .read(true)
423 .open(into)?;
424 let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
425 if let Err(err) = download_result {
426 match err.downcast_ref::<HttpError>() {
427 Some(HttpError { code, message }) => match *code {
428 StatusCode::NOT_FOUND => {
429 task_log!(
430 worker,
431 "skipping snapshot {} - vanished since start of sync",
432 &self.dir,
433 );
434 return Ok(None);
435 }
436 _ => {
437 bail!("HTTP error {code} - {message}");
438 }
439 },
440 None => {
441 return Err(err);
442 }
443 };
444 };
445 tmp_file.rewind()?;
446 Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
447 }
448
449 async fn try_download_client_log(
450 &self,
451 to_path: &Path,
452 worker: &WorkerTask,
453 ) -> Result<(), Error> {
454 let mut tmp_path = to_path.to_owned();
455 tmp_path.set_extension("tmp");
456
457 let tmpfile = std::fs::OpenOptions::new()
458 .write(true)
459 .create(true)
460 .read(true)
461 .open(&tmp_path)?;
462
463 // Note: be silent if there is no log - only log successful download
464 if let Ok(()) = self
465 .backup_reader
466 .download(CLIENT_LOG_BLOB_NAME, tmpfile)
467 .await
468 {
469 if let Err(err) = std::fs::rename(&tmp_path, to_path) {
470 bail!("Atomic rename file {:?} failed - {}", to_path, err);
471 }
472 task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
473 }
474
475 Ok(())
476 }
477
478 fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
479 false
480 }
481 }
482
483 #[async_trait::async_trait]
484 impl PullReader for LocalReader {
485 fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
486 Arc::new(LocalChunkReader::new(
487 self.datastore.clone(),
488 None,
489 crypt_mode,
490 ))
491 }
492
493 async fn load_file_into(
494 &self,
495 filename: &str,
496 into: &Path,
497 _worker: &WorkerTask,
498 ) -> Result<Option<DataBlob>, Error> {
499 let mut tmp_file = std::fs::OpenOptions::new()
500 .write(true)
501 .create(true)
502 .truncate(true)
503 .read(true)
504 .open(into)?;
505 let mut from_path = self.path.clone();
506 from_path.push(filename);
507 tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
508 tmp_file.rewind()?;
509 Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
510 }
511
512 async fn try_download_client_log(
513 &self,
514 _to_path: &Path,
515 _worker: &WorkerTask,
516 ) -> Result<(), Error> {
517 Ok(())
518 }
519
520 fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
521 self.datastore.name() == target_store_name
522 }
523 }
524
525 /// Parameters for a pull operation.
526 pub(crate) struct PullParameters {
527 /// Where data is pulled from
528 source: Arc<dyn PullSource>,
529 /// Where data should be pulled into
530 target: PullTarget,
531 /// Owner of synced groups (needs to match local owner of pre-existing groups)
532 owner: Authid,
533 /// Whether to remove groups which exist locally, but not on the remote end
534 remove_vanished: bool,
535 /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
536 max_depth: Option<usize>,
537 /// Filters for reducing the pull scope
538 group_filter: Vec<GroupFilter>,
539 /// How many snapshots should be transferred at most (taking the newest N snapshots)
540 transfer_last: Option<usize>,
541 }
542
543 impl PullParameters {
544 /// Creates a new instance of `PullParameters`.
545 pub(crate) fn new(
546 store: &str,
547 ns: BackupNamespace,
548 remote: Option<&str>,
549 remote_store: &str,
550 remote_ns: BackupNamespace,
551 owner: Authid,
552 remove_vanished: Option<bool>,
553 max_depth: Option<usize>,
554 group_filter: Option<Vec<GroupFilter>>,
555 limit: RateLimitConfig,
556 transfer_last: Option<usize>,
557 ) -> Result<Self, Error> {
558 if let Some(max_depth) = max_depth {
559 ns.check_max_depth(max_depth)?;
560 remote_ns.check_max_depth(max_depth)?;
561 };
562 let remove_vanished = remove_vanished.unwrap_or(false);
563
564 let source: Arc<dyn PullSource> = if let Some(remote) = remote {
565 let (remote_config, _digest) = pbs_config::remote::config()?;
566 let remote: Remote = remote_config.lookup("remote", remote)?;
567
568 let repo = BackupRepository::new(
569 Some(remote.config.auth_id.clone()),
570 Some(remote.config.host.clone()),
571 remote.config.port,
572 remote_store.to_string(),
573 );
574 let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
575 Arc::new(RemoteSource {
576 repo,
577 ns: remote_ns,
578 client,
579 })
580 } else {
581 Arc::new(LocalSource {
582 store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
583 ns: remote_ns,
584 })
585 };
586 let target = PullTarget {
587 store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
588 ns,
589 };
590
591 let group_filter = group_filter.unwrap_or_default();
592
593 Ok(Self {
594 source,
595 target,
596 owner,
597 remove_vanished,
598 max_depth,
599 group_filter,
600 transfer_last,
601 })
602 }
603 }
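// Commented example of assembling `PullParameters` for a remote sync and running
// it (illustrative only; `auth_id`, `worker` and the store/remote names are
// placeholders - the API and sync-job code paths construct this from their
// verified parameters):
//
//     let params = PullParameters::new(
//         "local-store",
//         BackupNamespace::root(),
//         Some("my-remote"),          // None would configure a local source
//         "remote-store",
//         BackupNamespace::root(),
//         auth_id,
//         Some(true),                 // remove_vanished
//         None,                       // max_depth: full recursion
//         None,                       // group_filter
//         RateLimitConfig::default(),
//         None,                       // transfer_last
//     )?;
//     let stats = pull_store(worker, params).await?;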
604
605 async fn pull_index_chunks<I: IndexFile>(
606 worker: &WorkerTask,
607 chunk_reader: Arc<dyn AsyncReadChunk>,
608 target: Arc<DataStore>,
609 index: I,
610 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
611 ) -> Result<PullStats, Error> {
612 use futures::stream::{self, StreamExt, TryStreamExt};
613
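// Pipeline overview: iterate all chunks referenced by the index, skip digests
// already recorded in the shared `downloaded_chunks` set, fetch up to 20 chunks
// concurrently, and hand each raw chunk to a 4-thread worker pool that verifies
// it and inserts it into the target chunk store.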
614 let start_time = SystemTime::now();
615
616 let stream = stream::iter(
617 (0..index.index_count())
618 .map(|pos| index.chunk_info(pos).unwrap())
619 .filter(|info| {
620 let mut guard = downloaded_chunks.lock().unwrap();
621 let done = guard.contains(&info.digest);
622 if !done {
623 // Note: We mark a chunk as downloaded before it's actually downloaded
624 // to avoid duplicate downloads.
625 guard.insert(info.digest);
626 }
627 !done
628 }),
629 );
630
631 let target2 = target.clone();
632 let verify_pool = ParallelHandler::new(
633 "sync chunk writer",
634 4,
635 move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
636 // println!("verify and write {}", hex::encode(&digest));
637 chunk.verify_unencrypted(size as usize, &digest)?;
638 target2.insert_chunk(&chunk, &digest)?;
639 Ok(())
640 },
641 );
642
643 let verify_and_write_channel = verify_pool.channel();
644
645 let bytes = Arc::new(AtomicUsize::new(0));
646 let chunk_count = Arc::new(AtomicUsize::new(0));
647
648 stream
649 .map(|info| {
650 let target = Arc::clone(&target);
651 let chunk_reader = chunk_reader.clone();
652 let bytes = Arc::clone(&bytes);
653 let chunk_count = Arc::clone(&chunk_count);
654 let verify_and_write_channel = verify_and_write_channel.clone();
655
656 Ok::<_, Error>(async move {
657 let chunk_exists = proxmox_async::runtime::block_in_place(|| {
658 target.cond_touch_chunk(&info.digest, false)
659 })?;
660 if chunk_exists {
661 //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
662 return Ok::<_, Error>(());
663 }
664 //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
665 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
666 let raw_size = chunk.raw_size() as usize;
667
668 // decode, verify and write in separate threads to maximize throughput
669 proxmox_async::runtime::block_in_place(|| {
670 verify_and_write_channel.send((chunk, info.digest, info.size()))
671 })?;
672
673 bytes.fetch_add(raw_size, Ordering::SeqCst);
674 chunk_count.fetch_add(1, Ordering::SeqCst);
675
676 Ok(())
677 })
678 })
679 .try_buffer_unordered(20)
680 .try_for_each(|_res| futures::future::ok(()))
681 .await?;
682
683 drop(verify_and_write_channel);
684
685 verify_pool.complete()?;
686
687 let elapsed = start_time.elapsed()?;
688
689 let bytes = bytes.load(Ordering::SeqCst);
690 let chunk_count = chunk_count.load(Ordering::SeqCst);
691
692 task_log!(
693 worker,
694 "downloaded {} ({}/s)",
695 HumanByte::from(bytes),
696 HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
697 );
698
699 Ok(PullStats {
700 chunk_count,
701 bytes,
702 elapsed,
703 removed: None,
704 })
705 }
706
707 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
708 if size != info.size {
709 bail!(
710 "wrong size for file '{}' ({} != {})",
711 info.filename,
712 info.size,
713 size
714 );
715 }
716
717 if csum != &info.csum {
718 bail!("wrong checksum for file '{}'", info.filename);
719 }
720
721 Ok(())
722 }
723
724 /// Pulls a single file referenced by a manifest.
725 ///
726 /// Pulling an archive consists of the following steps:
727 /// - Load archive file into tmp file
728 /// -- Load file into tmp file
729 /// -- Verify tmp file checksum
730 /// - If the archive is an index, pull the referenced chunks
731 /// - Rename tmp file into real path
732 async fn pull_single_archive<'a>(
733 worker: &'a WorkerTask,
734 reader: Arc<dyn PullReader + 'a>,
735 snapshot: &'a pbs_datastore::BackupDir,
736 archive_info: &'a FileInfo,
737 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
738 ) -> Result<PullStats, Error> {
739 let archive_name = &archive_info.filename;
740 let mut path = snapshot.full_path();
741 path.push(archive_name);
742
743 let mut tmp_path = path.clone();
744 tmp_path.set_extension("tmp");
745
746 let mut pull_stats = PullStats::default();
747
748 task_log!(worker, "sync archive {}", archive_name);
749
750 reader
751 .load_file_into(archive_name, &tmp_path, worker)
752 .await?;
753
754 let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
755
756 match archive_type(archive_name)? {
757 ArchiveType::DynamicIndex => {
758 let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
759 format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
760 })?;
761 let (csum, size) = index.compute_csum();
762 verify_archive(archive_info, &csum, size)?;
763
764 if reader.skip_chunk_sync(snapshot.datastore().name()) {
765 task_log!(worker, "skipping chunk sync for same datastore");
766 } else {
767 let stats = pull_index_chunks(
768 worker,
769 reader.chunk_reader(archive_info.crypt_mode),
770 snapshot.datastore().clone(),
771 index,
772 downloaded_chunks,
773 )
774 .await?;
775 pull_stats.add(stats);
776 }
777 }
778 ArchiveType::FixedIndex => {
779 let index = FixedIndexReader::new(tmpfile).map_err(|err| {
780 format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
781 })?;
782 let (csum, size) = index.compute_csum();
783 verify_archive(archive_info, &csum, size)?;
784
785 if reader.skip_chunk_sync(snapshot.datastore().name()) {
786 task_log!(worker, "skipping chunk sync for same datastore");
787 } else {
788 let stats = pull_index_chunks(
789 worker,
790 reader.chunk_reader(archive_info.crypt_mode),
791 snapshot.datastore().clone(),
792 index,
793 downloaded_chunks,
794 )
795 .await?;
796 pull_stats.add(stats);
797 }
798 }
799 ArchiveType::Blob => {
800 tmpfile.rewind()?;
801 let (csum, size) = sha256(&mut tmpfile)?;
802 verify_archive(archive_info, &csum, size)?;
803 }
804 }
805 if let Err(err) = std::fs::rename(&tmp_path, &path) {
806 bail!("Atomic rename file {:?} failed - {}", path, err);
807 }
808 Ok(pull_stats)
809 }
810
811 /// Actual implementation of pulling a snapshot.
812 ///
813 /// Pulling a snapshot consists of the following steps:
814 /// - (Re)download the manifest
815 /// -- if it matches, only download log and treat snapshot as already synced
816 /// - Iterate over referenced files
817 /// -- if file already exists, verify contents
818 /// -- if not, pull it from the remote
819 /// - Download log if not already existing
820 async fn pull_snapshot<'a>(
821 worker: &'a WorkerTask,
822 reader: Arc<dyn PullReader + 'a>,
823 snapshot: &'a pbs_datastore::BackupDir,
824 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
825 ) -> Result<PullStats, Error> {
826 let mut pull_stats = PullStats::default();
827 let mut manifest_name = snapshot.full_path();
828 manifest_name.push(MANIFEST_BLOB_NAME);
829
830 let mut client_log_name = snapshot.full_path();
831 client_log_name.push(CLIENT_LOG_BLOB_NAME);
832
833 let mut tmp_manifest_name = manifest_name.clone();
834 tmp_manifest_name.set_extension("tmp");
835 let tmp_manifest_blob;
836 if let Some(data) = reader
837 .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
838 .await?
839 {
840 tmp_manifest_blob = data;
841 } else {
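// the source could not provide the manifest (e.g. the snapshot vanished since
// the sync started), so there is nothing to pull for this snapshot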
842 return Ok(pull_stats);
843 }
844
845 if manifest_name.exists() {
846 let manifest_blob = proxmox_lang::try_block!({
847 let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
848 format_err!("unable to open local manifest {manifest_name:?} - {err}")
849 })?;
850
851 let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
852 Ok(manifest_blob)
853 })
854 .map_err(|err: Error| {
855 format_err!("unable to read local manifest {manifest_name:?} - {err}")
856 })?;
857
858 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
859 if !client_log_name.exists() {
860 reader
861 .try_download_client_log(&client_log_name, worker)
862 .await?;
863 };
864 task_log!(worker, "no data changes");
865 let _ = std::fs::remove_file(&tmp_manifest_name);
866 return Ok(pull_stats); // nothing changed
867 }
868 }
869
870 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
871
872 for item in manifest.files() {
873 let mut path = snapshot.full_path();
874 path.push(&item.filename);
875
876 if path.exists() {
877 match archive_type(&item.filename)? {
878 ArchiveType::DynamicIndex => {
879 let index = DynamicIndexReader::open(&path)?;
880 let (csum, size) = index.compute_csum();
881 match manifest.verify_file(&item.filename, &csum, size) {
882 Ok(_) => continue,
883 Err(err) => {
884 task_log!(worker, "detected changed file {:?} - {}", path, err);
885 }
886 }
887 }
888 ArchiveType::FixedIndex => {
889 let index = FixedIndexReader::open(&path)?;
890 let (csum, size) = index.compute_csum();
891 match manifest.verify_file(&item.filename, &csum, size) {
892 Ok(_) => continue,
893 Err(err) => {
894 task_log!(worker, "detected changed file {:?} - {}", path, err);
895 }
896 }
897 }
898 ArchiveType::Blob => {
899 let mut tmpfile = std::fs::File::open(&path)?;
900 let (csum, size) = sha256(&mut tmpfile)?;
901 match manifest.verify_file(&item.filename, &csum, size) {
902 Ok(_) => continue,
903 Err(err) => {
904 task_log!(worker, "detected changed file {:?} - {}", path, err);
905 }
906 }
907 }
908 }
909 }
910
911 let stats = pull_single_archive(
912 worker,
913 reader.clone(),
914 snapshot,
915 item,
916 downloaded_chunks.clone(),
917 )
918 .await?;
919 pull_stats.add(stats);
920 }
921
922 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
923 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
924 }
925
926 if !client_log_name.exists() {
927 reader
928 .try_download_client_log(&client_log_name, worker)
929 .await?;
930 };
931 snapshot
932 .cleanup_unreferenced_files(&manifest)
933 .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
934
935 Ok(pull_stats)
936 }
937
938 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
939 ///
940 /// The `reader` is configured to read from the source backup directory, while the
941 /// `snapshot` is pointing to the local datastore and target namespace.
942 async fn pull_snapshot_from<'a>(
943 worker: &'a WorkerTask,
944 reader: Arc<dyn PullReader + 'a>,
945 snapshot: &'a pbs_datastore::BackupDir,
946 downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
947 ) -> Result<PullStats, Error> {
948 let (_path, is_new, _snap_lock) = snapshot
949 .datastore()
950 .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
951
952 let pull_stats = if is_new {
953 task_log!(worker, "sync snapshot {}", snapshot.dir());
954
955 match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
956 Err(err) => {
957 if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
958 snapshot.backup_ns(),
959 snapshot.as_ref(),
960 true,
961 ) {
962 task_log!(worker, "cleanup error - {}", cleanup_err);
963 }
964 return Err(err);
965 }
966 Ok(pull_stats) => {
967 task_log!(worker, "sync snapshot {} done", snapshot.dir());
968 pull_stats
969 }
970 }
971 } else {
972 task_log!(worker, "re-sync snapshot {}", snapshot.dir());
973 pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
974 };
975
976 Ok(pull_stats)
977 }
978
979 #[derive(PartialEq, Eq)]
980 enum SkipReason {
981 AlreadySynced,
982 TransferLast,
983 }
984
985 impl std::fmt::Display for SkipReason {
986 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
987 write!(
988 f,
989 "{}",
990 match self {
991 SkipReason::AlreadySynced => "older than the newest local snapshot",
992 SkipReason::TransferLast => "due to transfer-last",
993 }
994 )
995 }
996 }
997
998 struct SkipInfo {
999 oldest: i64,
1000 newest: i64,
1001 count: u64,
1002 skip_reason: SkipReason,
1003 }
1004
1005 impl SkipInfo {
1006 fn new(skip_reason: SkipReason) -> Self {
1007 SkipInfo {
1008 oldest: i64::MAX,
1009 newest: i64::MIN,
1010 count: 0,
1011 skip_reason,
1012 }
1013 }
1014
1015 fn reset(&mut self) {
1016 self.count = 0;
1017 self.oldest = i64::MAX;
1018 self.newest = i64::MIN;
1019 }
1020
1021 fn update(&mut self, backup_time: i64) {
1022 self.count += 1;
1023
1024 if backup_time < self.oldest {
1025 self.oldest = backup_time;
1026 }
1027
1028 if backup_time > self.newest {
1029 self.newest = backup_time;
1030 }
1031 }
1032
1033 fn affected(&self) -> Result<String, Error> {
1034 match self.count {
1035 0 => Ok(String::new()),
1036 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
1037 _ => Ok(format!(
1038 "{} .. {}",
1039 proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
1040 proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
1041 )),
1042 }
1043 }
1044 }
1045
1046 impl std::fmt::Display for SkipInfo {
1047 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1048 write!(
1049 f,
1050 "skipped: {} snapshot(s) ({}) - {}",
1051 self.count,
1052 self.affected().map_err(|_| std::fmt::Error)?,
1053 self.skip_reason,
1054 )
1055 }
1056 }
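// Illustrative test sketch for the skip bookkeeping above: the oldest/newest
// tracking feeds the time range printed by the `Display` implementation.
#[cfg(test)]
mod skip_info_example_tests {
    use super::*;

    #[test]
    fn update_tracks_oldest_and_newest_backup_time() {
        let mut info = SkipInfo::new(SkipReason::AlreadySynced);
        for backup_time in [100, 50, 200] {
            info.update(backup_time);
        }
        assert_eq!(info.count, 3);
        assert_eq!(info.oldest, 50);
        assert_eq!(info.newest, 200);

        info.reset();
        assert_eq!(info.count, 0);
    }
}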
1057
1058 /// Pulls a group according to `params`.
1059 ///
1060 /// Pulling a group consists of the following steps:
1061 /// - Query the list of snapshots available for this group in the source namespace on the remote
1062 /// - Sort by snapshot time
1063 /// - Get last snapshot timestamp on local datastore
1064 /// - Iterate over list of snapshots
1065 /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
1066 /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
1067 ///
1068 /// Backwards-compat: if `source_namespace` is the root namespace, only the group type and ID will be
1069 /// sent to the remote when querying snapshots. This allows us to interact with old remotes that don't
1070 /// have namespace support yet.
1071 ///
1072 /// Permission checks:
1073 /// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
1074 /// - local group owner is already checked by pull_store
1075 async fn pull_group(
1076 worker: &WorkerTask,
1077 params: &PullParameters,
1078 source_namespace: &BackupNamespace,
1079 group: &BackupGroup,
1080 progress: &mut StoreProgress,
1081 ) -> Result<PullStats, Error> {
1082 let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
1083 let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
1084
1085 let mut raw_list: Vec<BackupDir> = params
1086 .source
1087 .list_backup_dirs(source_namespace, group, worker)
1088 .await?;
1089 raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
1090
1091 let total_amount = raw_list.len();
1092
1093 let cutoff = params
1094 .transfer_last
1095 .map(|count| total_amount.saturating_sub(count))
1096 .unwrap_or_default();
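// Worked example: with 10 snapshots in `raw_list` and `transfer_last` set to 2,
// `cutoff` is 8, so only the two newest snapshots (positions 8 and 9) are
// transferred; earlier ones are skipped below unless they match the last
// locally synced snapshot.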
1097
1098 let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
1099
1100 let mut source_snapshots = HashSet::new();
1101 let last_sync_time = params
1102 .target
1103 .store
1104 .last_successful_backup(&target_ns, group)?
1105 .unwrap_or(i64::MIN);
1106
1107 let list: Vec<BackupDir> = raw_list
1108 .into_iter()
1109 .enumerate()
1110 .filter(|&(pos, ref dir)| {
1111 source_snapshots.insert(dir.time);
1112 if last_sync_time > dir.time {
1113 already_synced_skip_info.update(dir.time);
1114 return false;
1115 } else if already_synced_skip_info.count > 0 {
1116 task_log!(worker, "{}", already_synced_skip_info);
1117 already_synced_skip_info.reset();
1118 return true;
1119 }
1120
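// snapshots before the cutoff are skipped, except for the most recently synced
// one (`last_sync_time == dir.time`), which is re-synced so that changes such as
// a newly uploaded client log are still picked up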
1121 if pos < cutoff && last_sync_time != dir.time {
1122 transfer_last_skip_info.update(dir.time);
1123 return false;
1124 } else if transfer_last_skip_info.count > 0 {
1125 task_log!(worker, "{}", transfer_last_skip_info);
1126 transfer_last_skip_info.reset();
1127 }
1128 true
1129 })
1130 .map(|(_, dir)| dir)
1131 .collect();
1132
1133 // start with 65536 chunks (up to 256 GiB)
1134 let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
1135
1136 progress.group_snapshots = list.len() as u64;
1137
1138 let mut pull_stats = PullStats::default();
1139
1140 for (pos, from_snapshot) in list.into_iter().enumerate() {
1141 let to_snapshot = params
1142 .target
1143 .store
1144 .backup_dir(target_ns.clone(), from_snapshot.clone())?;
1145
1146 let reader = params
1147 .source
1148 .reader(source_namespace, &from_snapshot)
1149 .await?;
1150 let result =
1151 pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
1152
1153 progress.done_snapshots = pos as u64 + 1;
1154 task_log!(worker, "percentage done: {}", progress);
1155
1156 let stats = result?; // stop on error
1157 pull_stats.add(stats);
1158 }
1159
1160 if params.remove_vanished {
1161 let group = params
1162 .target
1163 .store
1164 .backup_group(target_ns.clone(), group.clone());
1165 let local_list = group.list_backups()?;
1166 for info in local_list {
1167 let snapshot = info.backup_dir;
1168 if source_snapshots.contains(&snapshot.backup_time()) {
1169 continue;
1170 }
1171 if snapshot.is_protected() {
1172 task_log!(
1173 worker,
1174 "don't delete vanished snapshot {} (protected)",
1175 snapshot.dir()
1176 );
1177 continue;
1178 }
1179 task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
1180 params
1181 .target
1182 .store
1183 .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
1184 pull_stats.add(PullStats::from(RemovedVanishedStats {
1185 snapshots: 1,
1186 groups: 0,
1187 namespaces: 0,
1188 }));
1189 }
1190 }
1191
1192 Ok(pull_stats)
1193 }
1194
1195 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
1196 let mut created = false;
1197 let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
1198
1199 if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
1200 check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
1201 .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
1202
1203 let name = match ns.components().last() {
1204 Some(name) => name.to_owned(),
1205 None => {
1206 bail!("Failed to determine last component of namespace.");
1207 }
1208 };
1209
1210 if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
1211 bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
1212 }
1213 created = true;
1214 }
1215
1216 check_ns_privs(
1217 params.target.store.name(),
1218 ns,
1219 &params.owner,
1220 PRIV_DATASTORE_BACKUP,
1221 )
1222 .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;
1223
1224 Ok(created)
1225 }
1226
1227 fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
1228 check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
1229 .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
1230
1231 params
1232 .target
1233 .store
1234 .remove_namespace_recursive(local_ns, true)
1235 }
1236
1237 fn check_and_remove_vanished_ns(
1238 worker: &WorkerTask,
1239 params: &PullParameters,
1240 synced_ns: HashSet<BackupNamespace>,
1241 ) -> Result<(bool, RemovedVanishedStats), Error> {
1242 let mut errors = false;
1243 let mut removed_stats = RemovedVanishedStats::default();
1244 let user_info = CachedUserInfo::new()?;
1245
1246 // clamp like remote does so that we don't list more than we can ever have synced.
1247 let max_depth = params
1248 .max_depth
1249 .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
1250
1251 let mut local_ns_list: Vec<BackupNamespace> = params
1252 .target
1253 .store
1254 .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
1255 .filter(|ns| {
1256 let user_privs =
1257 user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
1258 user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
1259 })
1260 .collect();
1261
1262 // children first!
1263 local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
1264
1265 for local_ns in local_ns_list {
1266 if local_ns == params.target.ns {
1267 continue;
1268 }
1269
1270 if synced_ns.contains(&local_ns) {
1271 continue;
1272 }
1273
1274 if local_ns.is_root() {
1275 continue;
1276 }
1277 match check_and_remove_ns(params, &local_ns) {
1278 Ok(true) => {
1279 task_log!(worker, "Removed namespace {local_ns}");
1280 removed_stats.namespaces += 1;
1281 }
1282 Ok(false) => task_log!(
1283 worker,
1284 "Did not remove namespace {} - protected snapshots remain",
1285 local_ns
1286 ),
1287 Err(err) => {
1288 task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
1289 errors = true;
1290 }
1291 }
1292 }
1293
1294 Ok((errors, removed_stats))
1295 }
1296
1297 /// Pulls a store according to `params`.
1298 ///
1299 /// Pulling a store consists of the following steps:
1300 /// - Query list of namespaces on the remote
1301 /// - Iterate list
1302 /// -- create sub-NS if needed (and allowed)
1303 /// -- attempt to pull each NS in turn
1304 /// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
1305 ///
1306 /// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
1307 /// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
1308 ///
1309 /// Permission checks:
1310 /// - access to local datastore, namespace anchor and remote entry need to be checked at call site
1311 /// - remote namespaces are filtered by remote
1312 /// - creation and removal of sub-NS checked here
1313 /// - access to sub-NS checked here
1314 pub(crate) async fn pull_store(
1315 worker: &WorkerTask,
1316 mut params: PullParameters,
1317 ) -> Result<PullStats, Error> {
1318 // explicitly create a shared lock to prevent GC on newly created chunks
1319 let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
1320 let mut errors = false;
1321
1322 let old_max_depth = params.max_depth;
1323 let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
1324 vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
1325 } else {
1326 params
1327 .source
1328 .list_namespaces(&mut params.max_depth, worker)
1329 .await?
1330 };
1331
1332 let ns_layers_to_be_pulled = namespaces
1333 .iter()
1334 .map(BackupNamespace::depth)
1335 .max()
1336 .map_or(0, |v| v - params.source.get_ns().depth());
1337 let target_depth = params.target.ns.depth();
1338
1339 if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
1340 bail!(
1341 "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
1342 ns_layers_to_be_pulled,
1343 target_depth,
1344 MAX_NAMESPACE_DEPTH
1345 );
1346 }
1347
1348 errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
1349 namespaces.sort_unstable_by_key(|a| a.name_len());
1350
1351 let (mut groups, mut snapshots) = (0, 0);
1352 let mut synced_ns = HashSet::with_capacity(namespaces.len());
1353 let mut pull_stats = PullStats::default();
1354
1355 for namespace in namespaces {
1356 let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
1357
1358 let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
1359 let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
1360
1361 task_log!(worker, "----");
1362 task_log!(
1363 worker,
1364 "Syncing {} into {}",
1365 source_store_ns_str,
1366 target_store_ns_str
1367 );
1368
1369 synced_ns.insert(target_ns.clone());
1370
1371 match check_and_create_ns(&params, &target_ns) {
1372 Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
1373 Ok(false) => {}
1374 Err(err) => {
1375 task_log!(
1376 worker,
1377 "Cannot sync {} into {} - {}",
1378 source_store_ns_str,
1379 target_store_ns_str,
1380 err,
1381 );
1382 errors = true;
1383 continue;
1384 }
1385 }
1386
1387 match pull_ns(worker, &namespace, &mut params).await {
1388 Ok((ns_progress, ns_pull_stats, ns_errors)) => {
1389 errors |= ns_errors;
1390
1391 pull_stats.add(ns_pull_stats);
1392
1393 if params.max_depth != Some(0) {
1394 groups += ns_progress.done_groups;
1395 snapshots += ns_progress.done_snapshots;
1396 task_log!(
1397 worker,
1398 "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
1399 namespace,
1400 groups,
1401 snapshots,
1402 );
1403 }
1404 }
1405 Err(err) => {
1406 errors = true;
1407 task_log!(
1408 worker,
1409 "Encountered errors while syncing namespace {} - {}",
1410 &namespace,
1411 err,
1412 );
1413 }
1414 };
1415 }
1416
1417 if params.remove_vanished {
1418 let (has_errors, stats) = check_and_remove_vanished_ns(worker, &params, synced_ns)?;
1419 errors |= has_errors;
1420 pull_stats.add(PullStats::from(stats));
1421 }
1422
1423 if errors {
1424 bail!("sync failed with some errors.");
1425 }
1426
1427 Ok(pull_stats)
1428 }
1429
1430 /// Pulls a namespace according to `params`.
1431 ///
1432 /// Pulling a namespace consists of the following steps:
1433 /// - Query list of groups on the remote (in `source_ns`)
1434 /// - Filter list according to configured group filters
1435 /// - Iterate list and attempt to pull each group in turn
1436 /// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
1437 /// not or no longer available on the remote
1438 ///
1439 /// Permission checks:
1440 /// - remote namespaces are filtered by remote
1441 /// - owner check for vanished groups done here
1442 pub(crate) async fn pull_ns(
1443 worker: &WorkerTask,
1444 namespace: &BackupNamespace,
1445 params: &mut PullParameters,
1446 ) -> Result<(StoreProgress, PullStats, bool), Error> {
1447 let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
1448
1449 list.sort_unstable_by(|a, b| {
1450 let type_order = a.ty.cmp(&b.ty);
1451 if type_order == std::cmp::Ordering::Equal {
1452 a.id.cmp(&b.id)
1453 } else {
1454 type_order
1455 }
1456 });
1457
1458 let unfiltered_count = list.len();
1459 let list: Vec<BackupGroup> = list
1460 .into_iter()
1461 .filter(|group| group.apply_filters(&params.group_filter))
1462 .collect();
1463 task_log!(
1464 worker,
1465 "found {} groups to sync (out of {} total)",
1466 list.len(),
1467 unfiltered_count
1468 );
1469
1470 let mut errors = false;
1471
1472 let mut new_groups = HashSet::new();
1473 for group in list.iter() {
1474 new_groups.insert(group.clone());
1475 }
1476
1477 let mut progress = StoreProgress::new(list.len() as u64);
1478 let mut pull_stats = PullStats::default();
1479
1480 let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
1481
1482 for (done, group) in list.into_iter().enumerate() {
1483 progress.done_groups = done as u64;
1484 progress.done_snapshots = 0;
1485 progress.group_snapshots = 0;
1486
1487 let (owner, _lock_guard) =
1488 match params
1489 .target
1490 .store
1491 .create_locked_backup_group(&target_ns, &group, &params.owner)
1492 {
1493 Ok(result) => result,
1494 Err(err) => {
1495 task_log!(
1496 worker,
1497 "sync group {} failed - group lock failed: {}",
1498 &group,
1499 err
1500 );
1501 errors = true;
1502 // do not stop here, instead continue
1503 task_log!(worker, "create_locked_backup_group failed");
1504 continue;
1505 }
1506 };
1507
1508 // permission check
1509 if params.owner != owner {
1510 // only the owner is allowed to create additional snapshots
1511 task_log!(
1512 worker,
1513 "sync group {} failed - owner check failed ({} != {})",
1514 &group,
1515 params.owner,
1516 owner
1517 );
1518 errors = true; // do not stop here, instead continue
1519 } else {
1520 match pull_group(worker, params, namespace, &group, &mut progress).await {
1521 Ok(stats) => pull_stats.add(stats),
1522 Err(err) => {
1523 task_log!(worker, "sync group {} failed - {}", &group, err,);
1524 errors = true; // do not stop here, instead continue
1525 }
1526 }
1527 }
1528 }
1529
1530 if params.remove_vanished {
1531 let result: Result<(), Error> = proxmox_lang::try_block!({
1532 for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
1533 let local_group = local_group?;
1534 let local_group = local_group.group();
1535 if new_groups.contains(local_group) {
1536 continue;
1537 }
1538 let owner = params.target.store.get_owner(&target_ns, local_group)?;
1539 if check_backup_owner(&owner, &params.owner).is_err() {
1540 continue;
1541 }
1542 if !local_group.apply_filters(&params.group_filter) {
1543 continue;
1544 }
1545 task_log!(worker, "delete vanished group '{local_group}'",);
1546 let delete_stats_result = params
1547 .target
1548 .store
1549 .remove_backup_group(&target_ns, local_group);
1550
1551 match delete_stats_result {
1552 Ok(stats) => {
1553 if !stats.all_removed() {
1554 task_log!(
1555 worker,
1556 "kept some protected snapshots of group '{local_group}'",
1557 );
1558 pull_stats.add(PullStats::from(RemovedVanishedStats {
1559 snapshots: stats.removed_snapshots(),
1560 groups: 0,
1561 namespaces: 0,
1562 }));
1563 } else {
1564 pull_stats.add(PullStats::from(RemovedVanishedStats {
1565 snapshots: stats.removed_snapshots(),
1566 groups: 1,
1567 namespaces: 0,
1568 }));
1569 }
1570 }
1571 Err(err) => {
1572 task_log!(worker, "{}", err);
1573 errors = true;
1574 }
1575 }
1576 }
1577 Ok(())
1578 });
1579 if let Err(err) = result {
1580 task_log!(worker, "error during cleanup: {}", err);
1581 errors = true;
1582 };
1583 }
1584
1585 Ok((progress, pull_stats, errors))
1586 }