split the namespace out of BackupGroup/Dir api types
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 3b6f912d1f08be86ba99c24016e358b74f0b594e..0ad9aac6f01e3dd294b77b56e21a8724897cfb60 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -15,7 +15,8 @@ use proxmox_router::HttpError;
 use proxmox_sys::task_log;
 
 use pbs_api_types::{
-    Authid, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem,
+    Authid, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote,
+    SnapshotListItem,
 };
 
 use pbs_client::{
@@ -28,27 +29,35 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
-use pbs_datastore::{BackupDir, BackupGroup, BackupInfo, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
 use pbs_tools::sha::sha256;
 use proxmox_rest_server::WorkerTask;
 
 use crate::tools::parallel_handler::ParallelHandler;
 
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
-
+/// Parameters for a pull operation.
 pub struct PullParameters {
+    /// Remote that is pulled from
     remote: Remote,
+    /// Full specification of remote datastore
     source: BackupRepository,
+    /// Local store that is pulled into
     store: Arc<DataStore>,
+    /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
+    /// Whether to remove groups which exist locally, but not on the remote end
     remove_vanished: bool,
+    /// Filters for reducing the pull scope
     group_filter: Option<Vec<GroupFilter>>,
+    /// Rate limits for all transfers from `remote`
     limit: RateLimitConfig,
 }
 
 impl PullParameters {
+    /// Creates a new instance of `PullParameters`.
+    ///
+    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
+    /// [BackupRepository] with `remote_store`.
     pub fn new(
         store: &str,
         remote: &str,
@@ -83,6 +92,7 @@ impl PullParameters {
         })
     }
 
+    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
     pub async fn client(&self) -> Result<HttpClient, Error> {
         crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
     }
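Editorial note: taken together, the two documented methods form the usual setup sequence for a pull. A minimal sketch follows; the parameter order past `remote` is an assumption inferred from the struct fields above (the hunk elides the rest of `new`'s signature), the string values are invented, and `RateLimitConfig` is assumed to implement `Default`.

```rust
use anyhow::Error;
use pbs_api_types::{Authid, RateLimitConfig};

// Hypothetical call site; everything past `remote` is inferred from the
// struct fields above, since this hunk cuts off after `remote: &str`.
async fn example_pull_setup() -> Result<(), Error> {
    let params = PullParameters::new(
        "local-store",                  // store: target datastore name
        "my-remote",                    // remote: remote.cfg entry
        "remote-store",                 // remote_store: datastore on the remote
        Authid::root_auth_id().clone(), // owner of synced groups
        Some(true),                     // remove_vanished
        None,                           // group_filter
        RateLimitConfig::default(),     // limit (assumes a Default impl)
    )?;
    let _client = params.client().await?; // rate-limited HttpClient for `remote`
    Ok(())
}
```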
@@ -218,18 +228,26 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
     Ok(())
 }
 
+/// Pulls a single file referenced by a manifest.
+///
+/// Pulling an archive consists of the following steps:
+/// - Create tmp file for archive
+/// - Download archive file into tmp file
+/// - Verify tmp file checksum
+/// - If the archive is an index, pull the referenced chunks
+/// - Rename the tmp file to its final path
 async fn pull_single_archive(
     worker: &WorkerTask,
     reader: &BackupReader,
     chunk_reader: &mut RemoteChunkReader,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     archive_info: &FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let archive_name = &archive_info.filename;
     let mut path = tgt_store.base_path();
-    path.push(snapshot.relative_path());
+    path.push(snapshot.to_string());
     path.push(archive_name);
 
     let mut tmp_path = path.clone();
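Editorial note: the tmp-file steps listed in the doc comment end in an atomic rename. A condensed sketch of that pattern follows; the `.tmp` extension is an assumption, since the hunk ends before the extension is set.

```rust
// Sketch of the download-verify-rename pattern described above.
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp"); // assumed suffix for in-flight downloads
// 1. download the archive into tmp_path via the BackupReader
// 2. verify_archive(archive_info, &csum, size)? against the temporary file
// 3. if it is an index (fixed/dynamic), pull the chunks it references
std::fs::rename(&tmp_path, &path)?; // 4. move into place only after success
```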
@@ -317,19 +335,30 @@ async fn try_client_log_download(
     Ok(())
 }
 
+/// Actual implementation of pulling a snapshot.
+///
+/// Pulling a snapshot consists of the following steps:
+/// - (Re)download the manifest
+/// -- if it matches, only download log and treat snapshot as already synced
+/// - Iterate over referenced files
+/// -- if file already exists, verify contents
+/// -- if not, pull it from the remote
+/// - Download the log if it is not already present
 async fn pull_snapshot(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
+    let snapshot_relative_path = snapshot.to_string();
+
     let mut manifest_name = tgt_store.base_path();
-    manifest_name.push(snapshot.relative_path());
+    manifest_name.push(&snapshot_relative_path);
     manifest_name.push(MANIFEST_BLOB_NAME);
 
     let mut client_log_name = tgt_store.base_path();
-    client_log_name.push(snapshot.relative_path());
+    client_log_name.push(&snapshot_relative_path);
     client_log_name.push(CLIENT_LOG_BLOB_NAME);
 
     let mut tmp_manifest_name = manifest_name.clone();
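Editorial note: `snapshot.to_string()` works as a path segment here because the `pbs_api_types::BackupDir` Display impl renders the same `type/id/timestamp` relative path that the removed `pbs_datastore::BackupDir::relative_path()` produced. A hedged illustration; the field layout follows this diff's usage of `group.ty`, `group.id` and `snapshot.time`, while the concrete values are invented.

```rust
use pbs_api_types::{BackupDir, BackupGroup, BackupType};

// Illustrative snapshot; Display should render something like
// "vm/100/2022-05-01T10:00:00Z" (RFC 3339 UTC timestamp).
let snapshot = BackupDir {
    group: BackupGroup {
        ty: BackupType::Vm,
        id: "100".to_string(),
    },
    time: 1651399200,
};
let snapshot_relative_path = snapshot.to_string();
```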
@@ -396,7 +425,7 @@ async fn pull_snapshot(
 
     for item in manifest.files() {
         let mut path = tgt_store.base_path();
-        path.push(snapshot.relative_path());
+        path.push(&snapshot_relative_path);
         path.push(&item.filename);
 
         if path.exists() {
@@ -467,17 +496,22 @@ async fn pull_snapshot(
     Ok(())
 }
 
-pub async fn pull_snapshot_from(
+/// Pulls a `snapshot` into `tgt_store`, differentiating between new snapshots (removed on error)
+/// and existing ones (kept even on error).
+async fn pull_snapshot_from(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(snapshot)?;
+    // FIXME: Namespace support requires source AND target namespace
+    let ns = BackupNamespace::root();
+    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?;
 
+    let snapshot_path = snapshot.to_string();
     if is_new {
-        task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());
+        task_log!(worker, "sync snapshot {:?}", snapshot_path);
 
         if let Err(err) = pull_snapshot(
             worker,
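Editorial note: condensed, the locking step plus the hard-coded namespace from this hunk look like this (no new behavior, just the hunk gathered in one place; `BackupNamespace::root()` denotes the empty namespace until the FIXME is resolved).

```rust
// Everything is pulled into the root namespace for now (see FIXME above);
// the returned lock guard protects the snapshot dir for the whole pull.
let ns = BackupNamespace::root();
let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?;
// `is_new` selects the cleanup policy from the doc comment: a directory this
// pull created is removed again on error, a pre-existing one is kept.
```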
@@ -488,14 +522,14 @@ pub async fn pull_snapshot_from(
         )
         .await
         {
-            if let Err(cleanup_err) = tgt_store.remove_backup_dir(snapshot, true) {
+            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&ns, snapshot, true) {
                 task_log!(worker, "cleanup error - {}", cleanup_err);
             }
             return Err(err);
         }
-        task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
+        task_log!(worker, "sync snapshot {:?} done", snapshot_path);
     } else {
-        task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
+        task_log!(worker, "re-sync snapshot {:?}", snapshot_path);
         pull_snapshot(
             worker,
             reader,
@@ -504,11 +538,7 @@ pub async fn pull_snapshot_from(
             downloaded_chunks,
         )
         .await?;
-        task_log!(
-            worker,
-            "re-sync snapshot {:?} done",
-            snapshot.relative_path()
-        );
+        task_log!(worker, "re-sync snapshot {:?} done", snapshot_path);
     }
 
     Ok(())
@@ -557,37 +587,53 @@ impl std::fmt::Display for SkipInfo {
     }
 }
 
-pub async fn pull_group(
+/// Pulls a group according to `params`.
+///
+/// Pulling a group consists of the following steps:
+/// - Query the list of snapshots available for this group on the remote, sort by snapshot time
+/// - Get last snapshot timestamp on local datastore
+/// - Iterate over list of snapshots
+/// -- Recreate client/BackupReader
+/// -- Pull the snapshot, unless it is still unfinished or older than the last local snapshot
+/// - (remove_vanished) List all local snapshots and remove those that don't exist on the remote
+///
+/// Permission checks:
+/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
+/// - local group owner is already checked by pull_store
+async fn pull_group(
     worker: &WorkerTask,
     client: &HttpClient,
     params: &PullParameters,
-    group: &BackupGroup,
+    group: &pbs_api_types::BackupGroup,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
+    // FIXME: Namespace support
+    let ns = BackupNamespace::root();
+
     let path = format!(
         "api2/json/admin/datastore/{}/snapshots",
         params.source.store()
     );
 
     let args = json!({
-        "backup-type": group.backup_type(),
-        "backup-id": group.backup_id(),
+        "backup-type": group.ty,
+        "backup-id": group.id,
     });
 
     let mut result = client.get(&path, Some(args)).await?;
     let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
 
-    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
+    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
 
     client.login().await?; // make sure auth is complete
 
     let fingerprint = client.fingerprint();
 
-    let last_sync = params.store.last_successful_backup(group)?;
+    let last_sync = params.store.last_successful_backup(&ns, group)?;
 
     let mut remote_snapshots = std::collections::HashSet::new();
 
-    // start with 16384 chunks (up to 65GB)
+    // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
     progress.group_snapshots = list.len() as u64;
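Editorial note: the corrected capacity comment checks out arithmetically. A small sketch; the 4 MiB figure is the Proxmox Backup default chunk size, which is not stated in this hunk.

```rust
// 64 * 1024 chunk digests at the default 4 MiB per chunk:
let chunks: u64 = 1024 * 64; // 65536, matching with_capacity above
let covered_bytes = chunks * 4 * 1024 * 1024;
assert_eq!(covered_bytes, 256u64 * 1024 * 1024 * 1024); // 256 GiB
```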
@@ -599,7 +645,7 @@ pub async fn pull_group(
     };
 
     for (pos, item) in list.into_iter().enumerate() {
-        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
+        let snapshot = item.backup;
 
         // in-progress backups can't be synced
         if item.size.is_none() {
@@ -611,13 +657,11 @@ pub async fn pull_group(
             continue;
         }
 
-        let backup_time = snapshot.backup_time();
-
-        remote_snapshots.insert(backup_time);
+        remote_snapshots.insert(snapshot.time);
 
         if let Some(last_sync_time) = last_sync {
-            if last_sync_time > backup_time {
-                skip_info.update(backup_time);
+            if last_sync_time > snapshot.time {
+                skip_info.update(snapshot.time);
                 continue;
             }
         }
@@ -640,9 +684,8 @@ pub async fn pull_group(
             new_client,
             None,
             params.source.store(),
-            snapshot.group().backup_type(),
-            snapshot.group().backup_id(),
-            backup_time,
+            &ns,
+            &snapshot,
             true,
         )
         .await?;
@@ -663,13 +706,14 @@ pub async fn pull_group(
     }
 
     if params.remove_vanished {
-        let local_list = group.list_backups(&params.store.base_path())?;
+        let group = params.store.backup_group(ns.clone(), group.clone());
+        let local_list = group.list_backups()?;
         for info in local_list {
             let backup_time = info.backup_dir.backup_time();
             if remote_snapshots.contains(&backup_time) {
                 continue;
             }
-            if info.backup_dir.is_protected(params.store.base_path()) {
+            if info.backup_dir.is_protected() {
                 task_log!(
                     worker,
                     "don't delete vanished snapshot {:?} (protected)",
@@ -682,7 +726,9 @@ pub async fn pull_group(
                 "delete vanished snapshot {:?}",
                 info.backup_dir.relative_path()
             );
-            params.store.remove_backup_dir(&info.backup_dir, false)?;
+            params
+                .store
+                .remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?;
         }
     }
 
@@ -693,14 +739,33 @@ pub async fn pull_group(
     Ok(())
 }
 
+/// Pulls a store according to `params`.
+///
+/// Pulling a store consists of the following steps:
+/// - Query list of groups on the remote
+/// - Filter list according to configured group filters
+/// - Iterate list and attempt to pull each group in turn
+/// - (remove_vanished) Remove local groups that have a matching owner and match the configured
+///   group filters, but are not (or no longer) available on the remote
+///
+/// Permission checks:
+/// - access to local datastore and remote entry need to be checked at call site
+/// - remote groups are filtered by remote
+/// - owner check for vanished groups done here
 pub async fn pull_store(
     worker: &WorkerTask,
     client: &HttpClient,
     params: &PullParameters,
 ) -> Result<(), Error> {
+    // FIXME: Namespace support requires source AND target namespace
+    let ns = BackupNamespace::root();
+    let local_ns = BackupNamespace::root();
+
     // explicitly create a shared lock to prevent GC on newly created chunks
     let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
 
+    // FIXME: Namespaces! AND: If we make this API call recurse down namespaces we need to do the
+    // same down in the `remove_vanished` case!
     let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
 
     let mut result = client
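Editorial note: the group listing queried here deserializes into `GroupListItem`s, of which the following hunks keep only the flattened `backup` field. A hedged sketch of the exchange; the deserialization step is elided by the hunk boundary and reconstructed here from the surrounding usage, and the JSON values are invented.

```rust
// Hypothetical response shape for api2/json/admin/datastore/{store}/groups:
// {"data":[{"backup-type":"vm","backup-id":"100",
//           "last-backup":1651399200,"backup-count":2,"files":[]}]}
let mut result = client.get(&path, None).await?;
let list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
```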
@@ -712,26 +777,23 @@ pub async fn pull_store(
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-        let type_order = a.backup_type.cmp(&b.backup_type);
+        let type_order = a.backup.ty.cmp(&b.backup.ty);
         if type_order == std::cmp::Ordering::Equal {
-            a.backup_id.cmp(&b.backup_id)
+            a.backup.id.cmp(&b.backup.id)
         } else {
             type_order
         }
     });
 
-    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
+    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
         filters.iter().any(|filter| group.matches(filter))
     };
 
-    let list: Vec<BackupGroup> = list
-        .into_iter()
-        .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
-        .collect();
+    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
 
     let list = if let Some(ref group_filter) = &params.group_filter {
         let unfiltered_count = list.len();
-        let list: Vec<BackupGroup> = list
+        let list: Vec<pbs_api_types::BackupGroup> = list
             .into_iter()
             .filter(|group| apply_filters(group, group_filter))
             .collect();
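Editorial note: `group.matches(filter)` carries the whole filter semantics; for context, `GroupFilter` values are typically parsed from strings via the `FromStr` impl in `pbs_api_types`. A hedged sketch with invented patterns.

```rust
use pbs_api_types::GroupFilter;

// Invented example patterns; syntax: "type:<ty>", "group:<ty>/<id>", "regex:<re>".
let filters: Vec<GroupFilter> = vec![
    "type:vm".parse()?,
    "group:vm/100".parse()?,
    "regex:^ct/1\\d+$".parse()?,
];
// pull keeps a group if it matches *any* configured filter:
let keep = filters.iter().any(|filter| group.matches(filter));
```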
@@ -761,22 +823,23 @@ pub async fn pull_store(
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
 
-        let (owner, _lock_guard) = match params
-            .store
-            .create_locked_backup_group(&group, &params.owner)
-        {
-            Ok(result) => result,
-            Err(err) => {
-                task_log!(
-                    worker,
-                    "sync group {} failed - group lock failed: {}",
-                    &group,
-                    err
-                );
-                errors = true; // do not stop here, instead continue
-                continue;
-            }
-        };
+        let (owner, _lock_guard) =
+            match params
+                .store
+                .create_locked_backup_group(&ns, &group, &params.owner)
+            {
+                Ok(result) => result,
+                Err(err) => {
+                    task_log!(
+                        worker,
+                        "sync group {} failed - group lock failed: {}",
+                        &group,
+                        err
+                    );
+                    errors = true; // do not stop here, instead continue
+                    continue;
+                }
+            };
 
         // permission check
         if params.owner != owner {
@@ -797,13 +860,18 @@ pub async fn pull_store(
 
     if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            let local_groups = BackupInfo::list_backup_groups(&params.store.base_path())?;
-            for local_group in local_groups {
-                if new_groups.contains(&local_group) {
+            // FIXME: See above comment about namespaces & recursion
+            for local_group in params.store.iter_backup_groups(Default::default())? {
+                let local_group = local_group?;
+                if new_groups.contains(local_group.as_ref()) {
+                    continue;
+                }
+                let owner = params.store.get_owner(&local_ns, &local_group.group())?;
+                if check_backup_owner(&owner, &params.owner).is_err() {
                     continue;
                 }
                 if let Some(ref group_filter) = &params.group_filter {
-                    if !apply_filters(&local_group, group_filter) {
+                    if !apply_filters(local_group.as_ref(), group_filter) {
                         continue;
                     }
                 }
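Editorial note: `check_backup_owner` comes from `pbs_datastore` (see the import hunk at the top) and is semantically an ownership comparison that errors on mismatch. A simplified sketch, not the verbatim implementation; the real helper may additionally treat an API token and its owning user as equivalent.

```rust
use anyhow::{bail, Error};
use pbs_api_types::Authid;

// Simplified sketch: vanished-group removal above only proceeds
// when this check passes (Ok).
fn check_backup_owner_sketch(owner: &Authid, auth_id: &Authid) -> Result<(), Error> {
    if owner == auth_id {
        Ok(())
    } else {
        bail!("backup owner check failed ({} != {})", auth_id, owner);
    }
}
```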
@@ -813,7 +881,7 @@ pub async fn pull_store(
                     local_group.backup_type(),
                     local_group.backup_id()
                 );
-                match params.store.remove_backup_group(&local_group) {
+                match params.store.remove_backup_group(&ns, local_group.as_ref()) {
                     Ok(true) => {}
                     Ok(false) => {
                         task_log!(