]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/server/pull.rs
split the namespace out of BackupGroup/Dir api types
[proxmox-backup.git] / src / server / pull.rs
index 5214a218f1524cd980bd521fbf88cdd0118007be..0ad9aac6f01e3dd294b77b56e21a8724897cfb60 100644 (file)
@@ -8,31 +8,95 @@ use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
+use http::StatusCode;
 use serde_json::json;
 
-use proxmox::api::error::{HttpError, StatusCode};
+use proxmox_router::HttpError;
+use proxmox_sys::task_log;
 
-use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_api_types::{
+    Authid, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote,
+    SnapshotListItem,
+};
+
+use pbs_client::{
+    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
+};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
-    CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
+    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
 use pbs_tools::sha::sha256;
-use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
+use proxmox_rest_server::WorkerTask;
+
+use crate::tools::parallel_handler::ParallelHandler;
+
+/// Parameters for a pull operation.
+pub struct PullParameters {
+    /// Remote that is pulled from
+    remote: Remote,
+    /// Full specification of remote datastore
+    source: BackupRepository,
+    /// Local store that is pulled into
+    store: Arc<DataStore>,
+    /// Owner of synced groups (needs to match local owner of pre-existing groups)
+    owner: Authid,
+    /// Whether to remove groups which exist locally, but not on the remote end
+    remove_vanished: bool,
+    /// Filters for reducing the pull scope
+    group_filter: Option<Vec<GroupFilter>>,
+    /// Rate limits for all transfers from `remote`
+    limit: RateLimitConfig,
+}
 
-use crate::{
-    backup::DataStore,
-    server::WorkerTask,
-    tools::ParallelHandler,
-};
+impl PullParameters {
+    /// Creates a new instance of `PullParameters`.
+    ///
+    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
+    /// [BackupRepository] with `remote_store`.
+    pub fn new(
+        store: &str,
+        remote: &str,
+        remote_store: &str,
+        owner: Authid,
+        remove_vanished: Option<bool>,
+        group_filter: Option<Vec<GroupFilter>>,
+        limit: RateLimitConfig,
+    ) -> Result<Self, Error> {
+        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
+
+        let (remote_config, _digest) = pbs_config::remote::config()?;
+        let remote: Remote = remote_config.lookup("remote", remote)?;
+
+        let remove_vanished = remove_vanished.unwrap_or(false);
+
+        let source = BackupRepository::new(
+            Some(remote.config.auth_id.clone()),
+            Some(remote.config.host.clone()),
+            remote.config.port,
+            remote_store.to_string(),
+        );
 
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
+        Ok(Self {
+            remote,
+            source,
+            store,
+            owner,
+            remove_vanished,
+            group_filter,
+            limit,
+        })
+    }
+
+    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
+    pub async fn client(&self) -> Result<HttpClient, Error> {
+        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+    }
+}
 
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
@@ -65,7 +129,7 @@ async fn pull_index_chunks<I: IndexFile>(
         "sync chunk writer",
         4,
         move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
-            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
+            // println!("verify and write {}", hex::encode(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
             target2.insert_chunk(&chunk, &digest)?;
             Ok(())
@@ -84,19 +148,19 @@ async fn pull_index_chunks<I: IndexFile>(
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
-                let chunk_exists = pbs_runtime::block_in_place(|| {
+                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                     target.cond_touch_chunk(&info.digest, false)
                 })?;
                 if chunk_exists {
-                    //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
+                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                     return Ok::<_, Error>(());
                 }
-                //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
+                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                 let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in a separate threads to maximize throughput
-                pbs_runtime::block_in_place(|| {
+                proxmox_async::runtime::block_in_place(|| {
                     verify_and_write_channel.send((chunk, info.digest, info.size()))
                 })?;
 
@@ -117,11 +181,12 @@ async fn pull_index_chunks<I: IndexFile>(
 
     let bytes = bytes.load(Ordering::SeqCst);
 
-    worker.log(format!(
+    task_log!(
+        worker,
         "downloaded {} bytes ({:.2} MiB/s)",
         bytes,
         (bytes as f64) / (1024.0 * 1024.0 * elapsed)
-    ));
+    );
 
     Ok(())
 }
@@ -163,24 +228,33 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
     Ok(())
 }
 
+/// Pulls a single file referenced by a manifest.
+///
+/// Pulling an archive consists of the following steps:
+/// - Create tmp file for archive
+/// - Download archive file into tmp file
+/// - Verify tmp file checksum
+/// - if archive is an index, pull referenced chunks
+/// - Rename tmp file into real path
 async fn pull_single_archive(
     worker: &WorkerTask,
     reader: &BackupReader,
     chunk_reader: &mut RemoteChunkReader,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     archive_info: &FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let archive_name = &archive_info.filename;
     let mut path = tgt_store.base_path();
-    path.push(snapshot.relative_path());
+    path.push(snapshot.to_string());
     path.push(archive_name);
 
     let mut tmp_path = path.clone();
     tmp_path.set_extension("tmp");
 
-    worker.log(format!("sync archive {}", archive_name));
+    task_log!(worker, "sync archive {}", archive_name);
+
     let mut tmpfile = std::fs::OpenOptions::new()
         .write(true)
         .create(true)
@@ -255,25 +329,36 @@ async fn try_client_log_download(
         if let Err(err) = std::fs::rename(&tmp_path, &path) {
             bail!("Atomic rename file {:?} failed - {}", path, err);
         }
-        worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
+        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
     }
 
     Ok(())
 }
 
+/// Actual implementation of pulling a snapshot.
+///
+/// Pulling a snapshot consists of the following steps:
+/// - (Re)download the manifest
+/// -- if it matches, only download log and treat snapshot as already synced
+/// - Iterate over referenced files
+/// -- if file already exists, verify contents
+/// -- if not, pull it from the remote
+/// - Download log if not already existing
 async fn pull_snapshot(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
+    let snapshot_relative_path = snapshot.to_string();
+
     let mut manifest_name = tgt_store.base_path();
-    manifest_name.push(snapshot.relative_path());
+    manifest_name.push(&snapshot_relative_path);
     manifest_name.push(MANIFEST_BLOB_NAME);
 
     let mut client_log_name = tgt_store.base_path();
-    client_log_name.push(snapshot.relative_path());
+    client_log_name.push(&snapshot_relative_path);
     client_log_name.push(CLIENT_LOG_BLOB_NAME);
 
     let mut tmp_manifest_name = manifest_name.clone();
@@ -286,10 +371,11 @@ async fn pull_snapshot(
             match err.downcast_ref::<HttpError>() {
                 Some(HttpError { code, message }) => match *code {
                     StatusCode::NOT_FOUND => {
-                        worker.log(format!(
+                        task_log!(
+                            worker,
                             "skipping snapshot {} - vanished since start of sync",
                             snapshot
-                        ));
+                        );
                         return Ok(());
                     }
                     _ => {
@@ -305,7 +391,7 @@ async fn pull_snapshot(
     let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
 
     if manifest_name.exists() {
-        let manifest_blob = proxmox::try_block!({
+        let manifest_blob = proxmox_lang::try_block!({
             let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                 format_err!(
                     "unable to open local manifest {:?} - {}",
@@ -329,7 +415,7 @@ async fn pull_snapshot(
             if !client_log_name.exists() {
                 try_client_log_download(worker, reader, &client_log_name).await?;
             }
-            worker.log("no data changes");
+            task_log!(worker, "no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(()); // nothing changed
         }
@@ -339,7 +425,7 @@ async fn pull_snapshot(
 
     for item in manifest.files() {
         let mut path = tgt_store.base_path();
-        path.push(snapshot.relative_path());
+        path.push(&snapshot_relative_path);
         path.push(&item.filename);
 
         if path.exists() {
@@ -350,7 +436,7 @@ async fn pull_snapshot(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            worker.log(format!("detected changed file {:?} - {}", path, err));
+                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                         }
                     }
                 }
@@ -360,7 +446,7 @@ async fn pull_snapshot(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            worker.log(format!("detected changed file {:?} - {}", path, err));
+                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                         }
                     }
                 }
@@ -370,7 +456,7 @@ async fn pull_snapshot(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            worker.log(format!("detected changed file {:?} - {}", path, err));
+                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                         }
                     }
                 }
@@ -390,7 +476,7 @@ async fn pull_snapshot(
             &mut chunk_reader,
             tgt_store.clone(),
             snapshot,
-            &item,
+            item,
             downloaded_chunks.clone(),
         )
         .await?;
@@ -410,47 +496,49 @@ async fn pull_snapshot(
     Ok(())
 }
 
-pub async fn pull_snapshot_from(
+/// Pulls a `snapshot` into `tgt_store`, differentiating between new snapshots (removed on error)
+/// and existing ones (kept even on error).
+async fn pull_snapshot_from(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
+    // FIXME: Namespace support requires source AND target namespace
+    let ns = BackupNamespace::root();
+    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?;
 
+    let snapshot_path = snapshot.to_string();
     if is_new {
-        worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
+        task_log!(worker, "sync snapshot {:?}", snapshot_path);
 
         if let Err(err) = pull_snapshot(
             worker,
             reader,
             tgt_store.clone(),
-            &snapshot,
+            snapshot,
             downloaded_chunks,
         )
         .await
         {
-            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
-                worker.log(format!("cleanup error - {}", cleanup_err));
+            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&ns, snapshot, true) {
+                task_log!(worker, "cleanup error - {}", cleanup_err);
             }
             return Err(err);
         }
-        worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
+        task_log!(worker, "sync snapshot {:?} done", snapshot_path);
     } else {
-        worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
+        task_log!(worker, "re-sync snapshot {:?}", snapshot_path);
         pull_snapshot(
             worker,
             reader,
             tgt_store.clone(),
-            &snapshot,
+            snapshot,
             downloaded_chunks,
         )
         .await?;
-        worker.log(format!(
-            "re-sync snapshot {:?} done",
-            snapshot.relative_path()
-        ));
+        task_log!(worker, "re-sync snapshot {:?} done", snapshot_path);
     }
 
     Ok(())
@@ -478,14 +566,12 @@ impl SkipInfo {
     fn affected(&self) -> Result<String, Error> {
         match self.count {
             0 => Ok(String::new()),
-            1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest),
-            _ => {
-                Ok(format!(
-                    "{} .. {}",
-                    proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?,
-                    proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?,
-                ))
-            }
+            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
+            _ => Ok(format!(
+                "{} .. {}",
+                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
+                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
+            )),
         }
     }
 }
@@ -501,36 +587,53 @@ impl std::fmt::Display for SkipInfo {
     }
 }
 
-pub async fn pull_group(
+/// Pulls a group according to `params`.
+///
+/// Pulling a group consists of the following steps:
+/// - Query the list of snapshots available for this group on the remote, sort by snapshot time
+/// - Get last snapshot timestamp on local datastore
+/// - Iterate over list of snapshots
+/// -- Recreate client/BackupReader
+/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
+/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
+///
+/// Permission checks:
+/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
+/// - local group owner is already checked by pull_store
+async fn pull_group(
     worker: &WorkerTask,
     client: &HttpClient,
-    src_repo: &BackupRepository,
-    tgt_store: Arc<DataStore>,
-    group: &BackupGroup,
-    delete: bool,
+    params: &PullParameters,
+    group: &pbs_api_types::BackupGroup,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
+    // FIXME: Namespace support
+    let ns = BackupNamespace::root();
+
+    let path = format!(
+        "api2/json/admin/datastore/{}/snapshots",
+        params.source.store()
+    );
 
     let args = json!({
-        "backup-type": group.backup_type(),
-        "backup-id": group.backup_id(),
+        "backup-type": group.ty,
+        "backup-id": group.id,
     });
 
     let mut result = client.get(&path, Some(args)).await?;
     let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
 
-    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
+    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
 
     client.login().await?; // make sure auth is complete
 
     let fingerprint = client.fingerprint();
 
-    let last_sync = tgt_store.last_successful_backup(group)?;
+    let last_sync = params.store.last_successful_backup(&ns, group)?;
 
     let mut remote_snapshots = std::collections::HashSet::new();
 
-    // start with 16384 chunks (up to 65GB)
+    // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
     progress.group_snapshots = list.len() as u64;
@@ -542,24 +645,23 @@ pub async fn pull_group(
     };
 
     for (pos, item) in list.into_iter().enumerate() {
-        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
+        let snapshot = item.backup;
 
         // in-progress backups can't be synced
         if item.size.is_none() {
-            worker.log(format!(
+            task_log!(
+                worker,
                 "skipping snapshot {} - in-progress backup",
                 snapshot
-            ));
+            );
             continue;
         }
 
-        let backup_time = snapshot.backup_time();
-
-        remote_snapshots.insert(backup_time);
+        remote_snapshots.insert(snapshot.time);
 
         if let Some(last_sync_time) = last_sync {
-            if last_sync_time > backup_time {
-                skip_info.update(backup_time);
+            if last_sync_time > snapshot.time {
+                skip_info.update(snapshot.time);
                 continue;
             }
         }
@@ -567,22 +669,23 @@ pub async fn pull_group(
         // get updated auth_info (new tickets)
         let auth_info = client.login().await?;
 
-        let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
+        let options =
+            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
+                .rate_limit(params.limit.clone());
 
         let new_client = HttpClient::new(
-            src_repo.host(),
-            src_repo.port(),
-            src_repo.auth_id(),
+            params.source.host(),
+            params.source.port(),
+            params.source.auth_id(),
             options,
         )?;
 
         let reader = BackupReader::start(
             new_client,
             None,
-            src_repo.store(),
-            snapshot.group().backup_type(),
-            snapshot.group().backup_id(),
-            backup_time,
+            params.source.store(),
+            &ns,
+            &snapshot,
             true,
         )
         .await?;
@@ -590,30 +693,42 @@ pub async fn pull_group(
         let result = pull_snapshot_from(
             worker,
             reader,
-            tgt_store.clone(),
+            params.store.clone(),
             &snapshot,
             downloaded_chunks.clone(),
         )
         .await;
 
         progress.done_snapshots = pos as u64 + 1;
-        worker.log(format!("percentage done: {}", progress));
+        task_log!(worker, "percentage done: {}", progress);
 
         result?; // stop on error
     }
 
-    if delete {
-        let local_list = group.list_backups(&tgt_store.base_path())?;
+    if params.remove_vanished {
+        let group = params.store.backup_group(ns.clone(), group.clone());
+        let local_list = group.list_backups()?;
         for info in local_list {
             let backup_time = info.backup_dir.backup_time();
             if remote_snapshots.contains(&backup_time) {
                 continue;
             }
-            worker.log(format!(
+            if info.backup_dir.is_protected() {
+                task_log!(
+                    worker,
+                    "don't delete vanished snapshot {:?} (protected)",
+                    info.backup_dir.relative_path()
+                );
+                continue;
+            }
+            task_log!(
+                worker,
                 "delete vanished snapshot {:?}",
                 info.backup_dir.relative_path()
-            ));
-            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
+            );
+            params
+                .store
+                .remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?;
         }
     }
 
@@ -624,18 +739,34 @@ pub async fn pull_group(
     Ok(())
 }
 
+/// Pulls a store according to `params`.
+///
+/// Pulling a store consists of the following steps:
+/// - Query list of groups on the remote
+/// - Filter list according to configured group filters
+/// - Iterate list and attempt to pull each group in turn
+/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
+///   not or no longer available on the remote
+///
+/// Permission checks:
+/// - access to local datastore and remote entry need to be checked at call site
+/// - remote groups are filtered by remote
+/// - owner check for vanished groups done here
 pub async fn pull_store(
     worker: &WorkerTask,
     client: &HttpClient,
-    src_repo: &BackupRepository,
-    tgt_store: Arc<DataStore>,
-    delete: bool,
-    auth_id: Authid,
+    params: &PullParameters,
 ) -> Result<(), Error> {
+    // FIXME: Namespace support requires source AND target namespace
+    let ns = BackupNamespace::root();
+    let local_ns = BackupNamespace::root();
+
     // explicit create shared lock to prevent GC on newly created chunks
-    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
+    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
 
-    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
+    // FIXME: Namespaces! AND: If we make this API call recurse down namespaces we need to do the
+    // same down in the `remove_vanished` case!
+    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
 
     let mut result = client
         .get(&path, None)
@@ -644,93 +775,131 @@ pub async fn pull_store(
 
     let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
 
-    worker.log(format!("found {} groups to sync", list.len()));
-
+    let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-        let type_order = a.backup_type.cmp(&b.backup_type);
+        let type_order = a.backup.ty.cmp(&b.backup.ty);
         if type_order == std::cmp::Ordering::Equal {
-            a.backup_id.cmp(&b.backup_id)
+            a.backup.id.cmp(&b.backup.id)
         } else {
             type_order
         }
     });
 
+    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+        filters.iter().any(|filter| group.matches(filter))
+    };
+
+    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
+
+    let list = if let Some(ref group_filter) = &params.group_filter {
+        let unfiltered_count = list.len();
+        let list: Vec<pbs_api_types::BackupGroup> = list
+            .into_iter()
+            .filter(|group| apply_filters(group, group_filter))
+            .collect();
+        task_log!(
+            worker,
+            "found {} groups to sync (out of {} total)",
+            list.len(),
+            unfiltered_count
+        );
+        list
+    } else {
+        task_log!(worker, "found {} groups to sync", total_count);
+        list
+    };
+
     let mut errors = false;
 
     let mut new_groups = std::collections::HashSet::new();
-    for item in list.iter() {
-        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
+    for group in list.iter() {
+        new_groups.insert(group.clone());
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
 
-    for (done, item) in list.into_iter().enumerate() {
+    for (done, group) in list.into_iter().enumerate() {
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
 
-        let group = BackupGroup::new(&item.backup_type, &item.backup_id);
-
-        let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
-            Ok(result) => result,
-            Err(err) => {
-                worker.log(format!(
-                    "sync group {}/{} failed - group lock failed: {}",
-                    item.backup_type, item.backup_id, err
-                ));
-                errors = true; // do not stop here, instead continue
-                continue;
-            }
-        };
+        let (owner, _lock_guard) =
+            match params
+                .store
+                .create_locked_backup_group(&ns, &group, &params.owner)
+            {
+                Ok(result) => result,
+                Err(err) => {
+                    task_log!(
+                        worker,
+                        "sync group {} failed - group lock failed: {}",
+                        &group,
+                        err
+                    );
+                    errors = true; // do not stop here, instead continue
+                    continue;
+                }
+            };
 
         // permission check
-        if auth_id != owner {
+        if params.owner != owner {
             // only the owner is allowed to create additional snapshots
-            worker.log(format!(
-                "sync group {}/{} failed - owner check failed ({} != {})",
-                item.backup_type, item.backup_id, auth_id, owner
-            ));
+            task_log!(
+                worker,
+                "sync group {} failed - owner check failed ({} != {})",
+                &group,
+                params.owner,
+                owner
+            );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(
-            worker,
-            client,
-            src_repo,
-            tgt_store.clone(),
-            &group,
-            delete,
-            &mut progress,
-        )
-        .await
-        {
-            worker.log(format!(
-                "sync group {}/{} failed - {}",
-                item.backup_type, item.backup_id, err,
-            ));
+        } else if let Err(err) = pull_group(worker, client, params, &group, &mut progress).await {
+            task_log!(worker, "sync group {} failed - {}", &group, err,);
             errors = true; // do not stop here, instead continue
         }
     }
 
-    if delete {
-        let result: Result<(), Error> = proxmox::try_block!({
-            let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
-            for local_group in local_groups {
-                if new_groups.contains(&local_group) {
+    if params.remove_vanished {
+        let result: Result<(), Error> = proxmox_lang::try_block!({
+            // FIXME: See above comment about namespaces & recursion
+            for local_group in params.store.iter_backup_groups(Default::default())? {
+                let local_group = local_group?;
+                if new_groups.contains(local_group.as_ref()) {
+                    continue;
+                }
+                let owner = params.store.get_owner(&local_ns, &local_group.group())?;
+                if check_backup_owner(&owner, &params.owner).is_err() {
                     continue;
                 }
-                worker.log(format!(
+                if let Some(ref group_filter) = &params.group_filter {
+                    if !apply_filters(local_group.as_ref(), group_filter) {
+                        continue;
+                    }
+                }
+                task_log!(
+                    worker,
                     "delete vanished group '{}/{}'",
                     local_group.backup_type(),
                     local_group.backup_id()
-                ));
-                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
-                    worker.log(err.to_string());
-                    errors = true;
+                );
+                match params.store.remove_backup_group(&ns, local_group.as_ref()) {
+                    Ok(true) => {}
+                    Ok(false) => {
+                        task_log!(
+                            worker,
+                            "kept some protected snapshots of group '{}'",
+                            local_group
+                        );
+                    }
+                    Err(err) => {
+                        task_log!(worker, "{}", err);
+                        errors = true;
+                    }
                 }
             }
             Ok(())
         });
         if let Err(err) = result {
-            worker.log(format!("error during cleanup: {}", err));
+            task_log!(worker, "error during cleanup: {}", err);
             errors = true;
         };
     }