git.proxmox.com Git - proxmox-backup.git/blobdiff - src/server/pull.rs
split the namespace out of BackupGroup/Dir api types
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 97eee1e6d24866ada223612adfdbf0a298ae6a60..0ad9aac6f01e3dd294b77b56e21a8724897cfb60 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -8,42 +8,56 @@ use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
-use serde_json::json;
 use http::StatusCode;
+use serde_json::json;
 
 use proxmox_router::HttpError;
 use proxmox_sys::task_log;
 
-use pbs_api_types::{Authid, GroupFilter, GroupListItem, Remote, SnapshotListItem};
+use pbs_api_types::{
+    Authid, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote,
+    SnapshotListItem,
+};
 
-use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
+use pbs_client::{
+    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
+};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
-    CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
+    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
 use pbs_tools::sha::sha256;
-use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
 use proxmox_rest_server::WorkerTask;
 
-use crate::tools::ParallelHandler;
-
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
+use crate::tools::parallel_handler::ParallelHandler;
 
+/// Parameters for a pull operation.
 pub struct PullParameters {
+    /// Remote that is pulled from
     remote: Remote,
+    /// Full specification of remote datastore
     source: BackupRepository,
+    /// Local store that is pulled into
     store: Arc<DataStore>,
+    /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
+    /// Whether to remove groups which exist locally, but not on the remote end
     remove_vanished: bool,
+    /// Filters for reducing the pull scope
     group_filter: Option<Vec<GroupFilter>>,
+    /// Rate limits for all transfers from `remote`
+    limit: RateLimitConfig,
 }
 
 impl PullParameters {
+    /// Creates a new instance of `PullParameters`.
+    ///
+    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
+    /// [BackupRepository] with `remote_store`.
     pub fn new(
         store: &str,
         remote: &str,
@@ -51,8 +65,9 @@ impl PullParameters {
         owner: Authid,
         remove_vanished: Option<bool>,
         group_filter: Option<Vec<GroupFilter>>,
+        limit: RateLimitConfig,
     ) -> Result<Self, Error> {
-        let store = DataStore::lookup_datastore(store)?;
+        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
 
         let (remote_config, _digest) = pbs_config::remote::config()?;
         let remote: Remote = remote_config.lookup("remote", remote)?;
@@ -66,11 +81,20 @@ impl PullParameters {
             remote_store.to_string(),
         );
 
-        Ok(Self { remote, source, store, owner, remove_vanished, group_filter })
+        Ok(Self {
+            remote,
+            source,
+            store,
+            owner,
+            remove_vanished,
+            group_filter,
+            limit,
+        })
     }
 
+    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
     pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote).await
+        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
     }
 }
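
For orientation, here is a minimal, hedged sketch of how the public pieces above fit together from a caller's point of view. The store/remote names and the root owner are placeholders, `RateLimitConfig::default()` is assumed to mean "no limit", and the surrounding worker-task setup is omitted:

    // Hypothetical caller (not part of this commit); all identifiers below are illustrative.
    async fn run_pull(worker: &WorkerTask) -> Result<(), Error> {
        let params = PullParameters::new(
            "local-store",                  // target datastore, opened for writing
            "remote1",                      // remote.cfg entry to pull from
            "remote-store",                 // datastore on the remote side
            Authid::root_auth_id().clone(), // owner of the synced groups
            Some(false),                    // remove_vanished
            None,                           // group_filter
            RateLimitConfig::default(),     // rate limits for all transfers
        )?;
        let client = params.client().await?; // HttpClient honoring `limit`
        pull_store(worker, &client, &params).await
    }
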
 
@@ -105,7 +129,7 @@ async fn pull_index_chunks<I: IndexFile>(
         "sync chunk writer",
         4,
         move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
-            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
+            // println!("verify and write {}", hex::encode(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
             target2.insert_chunk(&chunk, &digest)?;
             Ok(())
@@ -128,10 +152,10 @@ async fn pull_index_chunks<I: IndexFile>(
                     target.cond_touch_chunk(&info.digest, false)
                 })?;
                 if chunk_exists {
-                    //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest));
+                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                     return Ok::<_, Error>(());
                 }
-                //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest));
+                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                 let raw_size = chunk.raw_size() as usize;
 
@@ -158,7 +182,7 @@ async fn pull_index_chunks<I: IndexFile>(
     let bytes = bytes.load(Ordering::SeqCst);
 
     task_log!(
-        worker, 
+        worker,
         "downloaded {} bytes ({:.2} MiB/s)",
         bytes,
         (bytes as f64) / (1024.0 * 1024.0 * elapsed)
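
The three hunks above all touch pull_index_chunks. As a rough, hedged sketch (the real function drives this through a stream plus the "sync chunk writer" ParallelHandler shown earlier; names not visible in the diff, such as `verify_pool`, are assumptions), the per-chunk logic amounts to:

    // Sketch only: skip chunks already fetched for this snapshot or already present
    // locally, download the rest, and hand them to the verify-and-write pool.
    for pos in 0..index.index_count() {
        let info = index.chunk_info(pos).unwrap();

        if downloaded_chunks.lock().unwrap().contains(&info.digest) {
            continue; // fetched earlier for another archive of this snapshot
        }
        if target.cond_touch_chunk(&info.digest, false)? {
            continue; // already in the local chunk store (touch keeps it for GC)
        }

        let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
        verify_pool.send((chunk, info.digest, info.size()))?; // verify digest, then insert
        downloaded_chunks.lock().unwrap().insert(info.digest);
    }
    verify_pool.complete()?;
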
@@ -204,18 +228,26 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
     Ok(())
 }
 
+/// Pulls a single file referenced by a manifest.
+///
+/// Pulling an archive consists of the following steps:
+/// - Create tmp file for archive
+/// - Download archive file into tmp file
+/// - Verify tmp file checksum
+/// - if archive is an index, pull referenced chunks
+/// - Rename tmp file into real path
 async fn pull_single_archive(
     worker: &WorkerTask,
     reader: &BackupReader,
     chunk_reader: &mut RemoteChunkReader,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     archive_info: &FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let archive_name = &archive_info.filename;
     let mut path = tgt_store.base_path();
-    path.push(snapshot.relative_path());
+    path.push(snapshot.to_string());
     path.push(archive_name);
 
     let mut tmp_path = path.clone();
@@ -303,19 +335,30 @@ async fn try_client_log_download(
     Ok(())
 }
 
+/// Actual implementation of pulling a snapshot.
+///
+/// Pulling a snapshot consists of the following steps:
+/// - (Re)download the manifest
+/// -- if it matches, only download log and treat snapshot as already synced
+/// - Iterate over referenced files
+/// -- if file already exists, verify contents
+/// -- if not, pull it from the remote
+/// - Download log if not already existing
 async fn pull_snapshot(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
+    let snapshot_relative_path = snapshot.to_string();
+
     let mut manifest_name = tgt_store.base_path();
-    manifest_name.push(snapshot.relative_path());
+    manifest_name.push(&snapshot_relative_path);
     manifest_name.push(MANIFEST_BLOB_NAME);
 
     let mut client_log_name = tgt_store.base_path();
-    client_log_name.push(snapshot.relative_path());
+    client_log_name.push(&snapshot_relative_path);
     client_log_name.push(CLIENT_LOG_BLOB_NAME);
 
     let mut tmp_manifest_name = manifest_name.clone();
@@ -382,7 +425,7 @@ async fn pull_snapshot(
 
     for item in manifest.files() {
         let mut path = tgt_store.base_path();
-        path.push(snapshot.relative_path());
+        path.push(&snapshot_relative_path);
         path.push(&item.filename);
 
         if path.exists() {
@@ -433,7 +476,7 @@ async fn pull_snapshot(
             &mut chunk_reader,
             tgt_store.clone(),
             snapshot,
-            &item,
+            item,
             downloaded_chunks.clone(),
         )
         .await?;
@@ -453,44 +496,49 @@ async fn pull_snapshot(
     Ok(())
 }
 
-pub async fn pull_snapshot_from(
+/// Pulls a `snapshot` into `tgt_store`, differentiating between new snapshots (removed on error)
+/// and existing ones (kept even on error).
+async fn pull_snapshot_from(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
+    snapshot: &pbs_api_types::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
+    // FIXME: Namespace support requires source AND target namespace
+    let ns = BackupNamespace::root();
+    let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?;
 
+    let snapshot_path = snapshot.to_string();
     if is_new {
-        task_log!(worker, "sync snapshot {:?}", snapshot.relative_path());
+        task_log!(worker, "sync snapshot {:?}", snapshot_path);
 
         if let Err(err) = pull_snapshot(
             worker,
             reader,
             tgt_store.clone(),
-            &snapshot,
+            snapshot,
             downloaded_chunks,
         )
         .await
         {
-            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
+            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&ns, snapshot, true) {
                 task_log!(worker, "cleanup error - {}", cleanup_err);
             }
             return Err(err);
         }
-        task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path());
+        task_log!(worker, "sync snapshot {:?} done", snapshot_path);
     } else {
-        task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path());
+        task_log!(worker, "re-sync snapshot {:?}", snapshot_path);
         pull_snapshot(
             worker,
             reader,
             tgt_store.clone(),
-            &snapshot,
+            snapshot,
             downloaded_chunks,
         )
         .await?;
-        task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path());
+        task_log!(worker, "re-sync snapshot {:?} done", snapshot_path);
     }
 
     Ok(())
@@ -519,13 +567,11 @@ impl SkipInfo {
         match self.count {
             0 => Ok(String::new()),
             1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
-            _ => {
-                Ok(format!(
-                    "{} .. {}",
-                    proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
-                    proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
-                ))
-            }
+            _ => Ok(format!(
+                "{} .. {}",
+                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
+                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
+            )),
         }
     }
 }
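
To illustrate the three arms above (timestamps invented for the example), SkipInfo::affected() yields:

    // count == 0  ->  ""                                              (nothing skipped)
    // count == 1  ->  "2022-05-01T08:00:00Z"                          (one snapshot)
    // count  > 1  ->  "2022-05-01T08:00:00Z .. 2022-05-07T08:00:00Z"  (oldest .. newest)
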
@@ -541,34 +587,53 @@ impl std::fmt::Display for SkipInfo {
     }
 }
 
-pub async fn pull_group(
+/// Pulls a group according to `params`.
+///
+/// Pulling a group consists of the following steps:
+/// - Query the list of snapshots available for this group on the remote, sort by snapshot time
+/// - Get last snapshot timestamp on local datastore
+/// - Iterate over list of snapshots
+/// -- Recreate client/BackupReader
+/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
+/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
+///
+/// Permission checks:
+/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
+/// - local group owner is already checked by pull_store
+async fn pull_group(
     worker: &WorkerTask,
     client: &HttpClient,
     params: &PullParameters,
-    group: &BackupGroup,
+    group: &pbs_api_types::BackupGroup,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());
+    // FIXME: Namespace support
+    let ns = BackupNamespace::root();
+
+    let path = format!(
+        "api2/json/admin/datastore/{}/snapshots",
+        params.source.store()
+    );
 
     let args = json!({
-        "backup-type": group.backup_type(),
-        "backup-id": group.backup_id(),
+        "backup-type": group.ty,
+        "backup-id": group.id,
     });
 
     let mut result = client.get(&path, Some(args)).await?;
     let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
 
-    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
+    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
 
     client.login().await?; // make sure auth is complete
 
     let fingerprint = client.fingerprint();
 
-    let last_sync = params.store.last_successful_backup(group)?;
+    let last_sync = params.store.last_successful_backup(&ns, group)?;
 
     let mut remote_snapshots = std::collections::HashSet::new();
 
-    // start with 16384 chunks (up to 65GB)
+    // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
     progress.group_snapshots = list.len() as u64;
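
The mechanical change in this hunk is that the type/id/time triple now lives in a nested pbs_api_types::BackupDir under the `backup` field of SnapshotListItem, so this call site no longer reads flat backup_type/backup_id/backup_time fields. A hedged illustration of working with the new shape (the print loop is illustrative, not part of this commit):

    // `list` is the Vec<SnapshotListItem> parsed above.
    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
    for item in &list {
        let snapshot: &pbs_api_types::BackupDir = &item.backup; // group + time
        // the api type implements Display, so it can be logged directly
        println!("{} ({} bytes)", snapshot, item.size.unwrap_or(0));
    }
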
@@ -580,21 +645,23 @@ pub async fn pull_group(
     };
 
     for (pos, item) in list.into_iter().enumerate() {
-        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
+        let snapshot = item.backup;
 
         // in-progress backups can't be synced
         if item.size.is_none() {
-            task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot);
+            task_log!(
+                worker,
+                "skipping snapshot {} - in-progress backup",
+                snapshot
+            );
             continue;
         }
 
-        let backup_time = snapshot.backup_time();
-
-        remote_snapshots.insert(backup_time);
+        remote_snapshots.insert(snapshot.time);
 
         if let Some(last_sync_time) = last_sync {
-            if last_sync_time > backup_time {
-                skip_info.update(backup_time);
+            if last_sync_time > snapshot.time {
+                skip_info.update(snapshot.time);
                 continue;
             }
         }
@@ -602,7 +669,9 @@ pub async fn pull_group(
         // get updated auth_info (new tickets)
         let auth_info = client.login().await?;
 
-        let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
+        let options =
+            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
+                .rate_limit(params.limit.clone());
 
         let new_client = HttpClient::new(
             params.source.host(),
@@ -615,9 +684,8 @@ pub async fn pull_group(
             new_client,
             None,
             params.source.store(),
-            snapshot.group().backup_type(),
-            snapshot.group().backup_id(),
-            backup_time,
+            &ns,
+            &snapshot,
             true,
         )
         .await?;
@@ -638,13 +706,14 @@ pub async fn pull_group(
     }
 
     if params.remove_vanished {
-        let local_list = group.list_backups(&params.store.base_path())?;
+        let group = params.store.backup_group(ns.clone(), group.clone());
+        let local_list = group.list_backups()?;
         for info in local_list {
             let backup_time = info.backup_dir.backup_time();
             if remote_snapshots.contains(&backup_time) {
                 continue;
             }
-            if info.backup_dir.is_protected(params.store.base_path()) {
+            if info.backup_dir.is_protected() {
                 task_log!(
                     worker,
                     "don't delete vanished snapshot {:?} (protected)",
@@ -652,8 +721,14 @@ pub async fn pull_group(
                 );
                 continue;
             }
-            task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
-            params.store.remove_backup_dir(&info.backup_dir, false)?;
+            task_log!(
+                worker,
+                "delete vanished snapshot {:?}",
+                info.backup_dir.relative_path()
+            );
+            params
+                .store
+                .remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?;
         }
     }
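
A short note on the datastore-bound handle used in this block: params.store.backup_group(..) ties the group to the datastore (and namespace), so listing and the protection check no longer need a base path, and as_ref() recovers the plain pbs_api_types::BackupDir for calls that take the api type. Condensed from the wrapped lines above, the cleanup loop is roughly:

    // Condensed restatement of the hunk above (logging and error context trimmed).
    let local_group = params.store.backup_group(ns.clone(), group.clone());
    for info in local_group.list_backups()? {
        if remote_snapshots.contains(&info.backup_dir.backup_time()) {
            continue; // still present on the remote
        }
        if info.backup_dir.is_protected() {
            continue; // never delete protected snapshots
        }
        params
            .store
            .remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?;
    }
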
 
@@ -664,14 +739,33 @@ pub async fn pull_group(
     Ok(())
 }
 
+/// Pulls a store according to `params`.
+///
+/// Pulling a store consists of the following steps:
+/// - Query list of groups on the remote
+/// - Filter list according to configured group filters
+/// - Iterate list and attempt to pull each group in turn
+/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
+///   not or no longer available on the remote
+///
+/// Permission checks:
+/// - access to local datastore and remote entry need to be checked at call site
+/// - remote groups are filtered by remote
+/// - owner check for vanished groups done here
 pub async fn pull_store(
     worker: &WorkerTask,
     client: &HttpClient,
     params: &PullParameters,
 ) -> Result<(), Error> {
+    // FIXME: Namespace support requires source AND target namespace
+    let ns = BackupNamespace::root();
+    let local_ns = BackupNamespace::root();
+
     // explicit create shared lock to prevent GC on newly created chunks
     let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
 
+    // FIXME: Namespaces! AND: If we make this API call recurse down namespaces we need to do the
+    // same down in the `remove_vanished` case!
     let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
 
     let mut result = client
@@ -683,34 +777,32 @@ pub async fn pull_store(
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-        let type_order = a.backup_type.cmp(&b.backup_type);
+        let type_order = a.backup.ty.cmp(&b.backup.ty);
         if type_order == std::cmp::Ordering::Equal {
-            a.backup_id.cmp(&b.backup_id)
+            a.backup.id.cmp(&b.backup.id)
         } else {
             type_order
         }
     });
 
-    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
-        filters
-            .iter()
-            .any(|filter| group.matches(filter))
+    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+        filters.iter().any(|filter| group.matches(filter))
     };
 
-    let list:Vec<BackupGroup> = list
-        .into_iter()
-        .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
-        .collect();
+    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
 
     let list = if let Some(ref group_filter) = &params.group_filter {
         let unfiltered_count = list.len();
-        let list:Vec<BackupGroup> = list
+        let list: Vec<pbs_api_types::BackupGroup> = list
             .into_iter()
-            .filter(|group| {
-                apply_filters(&group, group_filter)
-            })
+            .filter(|group| apply_filters(group, group_filter))
             .collect();
-        task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
+        task_log!(
+            worker,
+            "found {} groups to sync (out of {} total)",
+            list.len(),
+            unfiltered_count
+        );
         list
     } else {
         task_log!(worker, "found {} groups to sync", total_count);
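
For context on the matches() check inside apply_filters, group filters come from the pull parameters (i.e. the sync job config). A hedged example, assuming the GroupFilter string syntax ("type:", "group:", "regex:" prefixes) used for the group-filter option:

    // Hedged example: the filter strings below are made up.
    let group_filter: Vec<GroupFilter> = vec![
        "type:vm".parse()?,              // every VM group
        "group:ct/100".parse()?,         // one specific container group
        "regex:^host/.*-prod$".parse()?, // regex over the full "type/id" name
    ];
    // pull_store keeps a remote group when any single filter matches it:
    // apply_filters(&group, &group_filter) == group_filter.iter().any(|f| group.matches(f))
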
@@ -731,18 +823,23 @@ pub async fn pull_store(
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
 
-        let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, &params.owner) {
-            Ok(result) => result,
-            Err(err) => {
-                task_log!(
-                    worker,
-                    "sync group {} failed - group lock failed: {}",
-                    &group, err
-                );
-                errors = true; // do not stop here, instead continue
-                continue;
-            }
-        };
+        let (owner, _lock_guard) =
+            match params
+                .store
+                .create_locked_backup_group(&ns, &group, &params.owner)
+            {
+                Ok(result) => result,
+                Err(err) => {
+                    task_log!(
+                        worker,
+                        "sync group {} failed - group lock failed: {}",
+                        &group,
+                        err
+                    );
+                    errors = true; // do not stop here, instead continue
+                    continue;
+                }
+            };
 
         // permission check
         if params.owner != owner {
@@ -750,36 +847,31 @@ pub async fn pull_store(
             task_log!(
                 worker,
                 "sync group {} failed - owner check failed ({} != {})",
-                &group, params.owner, owner
+                &group,
+                params.owner,
+                owner
             );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(
-            worker,
-            client,
-            params,
-            &group,
-            &mut progress,
-        )
-        .await
-        {
-            task_log!(
-                worker,
-                "sync group {} failed - {}",
-                &group, err,
-            );
+        } else if let Err(err) = pull_group(worker, client, params, &group, &mut progress).await {
+            task_log!(worker, "sync group {} failed - {}", &group, err,);
             errors = true; // do not stop here, instead continue
         }
     }
 
     if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            let local_groups = BackupInfo::list_backup_groups(&params.store.base_path())?;
-            for local_group in local_groups {
-                if new_groups.contains(&local_group) {
+            // FIXME: See above comment about namespaces & recursion
+            for local_group in params.store.iter_backup_groups(Default::default())? {
+                let local_group = local_group?;
+                if new_groups.contains(local_group.as_ref()) {
+                    continue;
+                }
+                let owner = params.store.get_owner(&local_ns, &local_group.group())?;
+                if check_backup_owner(&owner, &params.owner).is_err() {
                     continue;
                 }
                 if let Some(ref group_filter) = &params.group_filter {
-                    if !apply_filters(&local_group, group_filter) {
+                    if !apply_filters(local_group.as_ref(), group_filter) {
                         continue;
                     }
                 }
@@ -789,11 +881,15 @@ pub async fn pull_store(
                     local_group.backup_type(),
                     local_group.backup_id()
                 );
-                match params.store.remove_backup_group(&local_group) {
-                    Ok(true) => {},
+                match params.store.remove_backup_group(&ns, local_group.as_ref()) {
+                    Ok(true) => {}
                     Ok(false) => {
-                        task_log!(worker, "kept some protected snapshots of group '{}'", local_group);
-                    },
+                        task_log!(
+                            worker,
+                            "kept some protected snapshots of group '{}'",
+                            local_group
+                        );
+                    }
                     Err(err) => {
                         task_log!(worker, "{}", err);
                         errors = true;