git.proxmox.com Git - proxmox-backup.git/blobdiff - src/server/pull.rs
use RateLimitConfig for HttpClient and pull
[proxmox-backup.git] / src / server / pull.rs
index 555f0a942a57abee410061830f7f97ddc3a79db8..acf2c265fee1c802cee810205a71d05b35029ee8 100644 (file)
@@ -12,9 +12,14 @@ use serde_json::json;
 use http::StatusCode;
 
 use proxmox_router::HttpError;
+use proxmox_sys::task_log;
 
-use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_api_types::{
+    Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote,
+    SnapshotListItem,
+};
+
+use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -23,7 +28,6 @@ use pbs_datastore::manifest::{
     CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
 };
 use pbs_tools::sha::sha256;
-use pbs_tools::task_log;
 use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
 use proxmox_rest_server::WorkerTask;
 
@@ -33,6 +37,48 @@ use crate::tools::ParallelHandler;
 // fixme: delete vanished groups
 // Todo: correctly lock backup groups
 
/// Parameters for a pull (sync) job: the remote source to pull from, the
/// local target datastore, and options controlling filtering, ownership,
/// pruning of vanished snapshots and transfer rate limiting.
pub struct PullParameters {
    /// Remote configuration entry the snapshots are pulled from.
    remote: Remote,
    /// Source repository (auth-id, host, port, datastore) built from `remote`.
    source: BackupRepository,
    /// Local target datastore the pulled snapshots are written into.
    store: Arc<DataStore>,
    /// Authid used as owner when creating/locking local backup groups.
    owner: Authid,
    /// If set, locally delete snapshots/groups that vanished on the source.
    remove_vanished: bool,
    /// Optional filters restricting which backup groups get synced
    /// (also limits which vanished groups may be removed).
    group_filter: Option<Vec<GroupFilter>>,
    /// Rate limit applied when creating the HTTP client for the remote.
    limit: RateLimitConfig,
}
+
+impl PullParameters {
+    pub fn new(
+        store: &str,
+        remote: &str,
+        remote_store: &str,
+        owner: Authid,
+        remove_vanished: Option<bool>,
+        group_filter: Option<Vec<GroupFilter>>,
+        limit: RateLimitConfig,
+    ) -> Result<Self, Error> {
+        let store = DataStore::lookup_datastore(store)?;
+
+        let (remote_config, _digest) = pbs_config::remote::config()?;
+        let remote: Remote = remote_config.lookup("remote", remote)?;
+
+        let remove_vanished = remove_vanished.unwrap_or(false);
+
+        let source = BackupRepository::new(
+            Some(remote.config.auth_id.clone()),
+            Some(remote.config.host.clone()),
+            remote.config.port,
+            remote_store.to_string(),
+        );
+
+        Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
+    }
+
+    pub async fn client(&self) -> Result<HttpClient, Error> {
+        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+    }
+}
+
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
     chunk_reader: RemoteChunkReader,
@@ -83,7 +129,7 @@ async fn pull_index_chunks<I: IndexFile>(
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
-                let chunk_exists = pbs_runtime::block_in_place(|| {
+                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                     target.cond_touch_chunk(&info.digest, false)
                 })?;
                 if chunk_exists {
@@ -95,7 +141,7 @@ async fn pull_index_chunks<I: IndexFile>(
                 let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in a separate threads to maximize throughput
-                pbs_runtime::block_in_place(|| {
+                proxmox_async::runtime::block_in_place(|| {
                     verify_and_write_channel.send((chunk, info.digest, info.size()))
                 })?;
 
@@ -503,13 +549,11 @@ impl std::fmt::Display for SkipInfo {
 pub async fn pull_group(
     worker: &WorkerTask,
     client: &HttpClient,
-    src_repo: &BackupRepository,
-    tgt_store: Arc<DataStore>,
+    params: &PullParameters,
     group: &BackupGroup,
-    delete: bool,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
+    let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());
 
     let args = json!({
         "backup-type": group.backup_type(),
@@ -525,7 +569,7 @@ pub async fn pull_group(
 
     let fingerprint = client.fingerprint();
 
-    let last_sync = tgt_store.last_successful_backup(group)?;
+    let last_sync = params.store.last_successful_backup(group)?;
 
     let mut remote_snapshots = std::collections::HashSet::new();
 
@@ -566,16 +610,16 @@ pub async fn pull_group(
         let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
 
         let new_client = HttpClient::new(
-            src_repo.host(),
-            src_repo.port(),
-            src_repo.auth_id(),
+            params.source.host(),
+            params.source.port(),
+            params.source.auth_id(),
             options,
         )?;
 
         let reader = BackupReader::start(
             new_client,
             None,
-            src_repo.store(),
+            params.source.store(),
             snapshot.group().backup_type(),
             snapshot.group().backup_id(),
             backup_time,
@@ -586,7 +630,7 @@ pub async fn pull_group(
         let result = pull_snapshot_from(
             worker,
             reader,
-            tgt_store.clone(),
+            params.store.clone(),
             &snapshot,
             downloaded_chunks.clone(),
         )
@@ -598,14 +642,14 @@ pub async fn pull_group(
         result?; // stop on error
     }
 
-    if delete {
-        let local_list = group.list_backups(&tgt_store.base_path())?;
+    if params.remove_vanished {
+        let local_list = group.list_backups(&params.store.base_path())?;
         for info in local_list {
             let backup_time = info.backup_dir.backup_time();
             if remote_snapshots.contains(&backup_time) {
                 continue;
             }
-            if info.backup_dir.is_protected(tgt_store.base_path()) {
+            if info.backup_dir.is_protected(params.store.base_path()) {
                 task_log!(
                     worker,
                     "don't delete vanished snapshot {:?} (protected)",
@@ -614,7 +658,7 @@ pub async fn pull_group(
                 continue;
             }
             task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
-            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
+            params.store.remove_backup_dir(&info.backup_dir, false)?;
         }
     }
 
@@ -628,15 +672,12 @@ pub async fn pull_group(
 pub async fn pull_store(
     worker: &WorkerTask,
     client: &HttpClient,
-    src_repo: &BackupRepository,
-    tgt_store: Arc<DataStore>,
-    delete: bool,
-    auth_id: Authid,
+    params: &PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
-    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
+    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
 
-    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
+    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
 
     let mut result = client
         .get(&path, None)
@@ -645,8 +686,7 @@ pub async fn pull_store(
 
     let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
 
-    task_log!(worker, "found {} groups to sync", list.len());
-
+    let total_count = list.len();
     list.sort_unstable_by(|a, b| {
         let type_order = a.backup_type.cmp(&b.backup_type);
         if type_order == std::cmp::Ordering::Equal {
@@ -656,29 +696,53 @@ pub async fn pull_store(
         }
     });
 
+    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
+        filters
+            .iter()
+            .any(|filter| group.matches(filter))
+    };
+
+    let list:Vec<BackupGroup> = list
+        .into_iter()
+        .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
+        .collect();
+
+    let list = if let Some(ref group_filter) = &params.group_filter {
+        let unfiltered_count = list.len();
+        let list:Vec<BackupGroup> = list
+            .into_iter()
+            .filter(|group| {
+                apply_filters(&group, group_filter)
+            })
+            .collect();
+        task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
+        list
+    } else {
+        task_log!(worker, "found {} groups to sync", total_count);
+        list
+    };
+
     let mut errors = false;
 
     let mut new_groups = std::collections::HashSet::new();
-    for item in list.iter() {
-        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
+    for group in list.iter() {
+        new_groups.insert(group.clone());
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
 
-    for (done, item) in list.into_iter().enumerate() {
+    for (done, group) in list.into_iter().enumerate() {
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
 
-        let group = BackupGroup::new(&item.backup_type, &item.backup_id);
-
-        let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
+        let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, &params.owner) {
             Ok(result) => result,
             Err(err) => {
                 task_log!(
                     worker,
-                    "sync group {}/{} failed - group lock failed: {}",
-                    item.backup_type, item.backup_id, err
+                    "sync group {} failed - group lock failed: {}",
+                    &group, err
                 );
                 errors = true; // do not stop here, instead continue
                 continue;
@@ -686,48 +750,51 @@ pub async fn pull_store(
         };
 
         // permission check
-        if auth_id != owner {
+        if params.owner != owner {
             // only the owner is allowed to create additional snapshots
             task_log!(
                 worker,
-                "sync group {}/{} failed - owner check failed ({} != {})",
-                item.backup_type, item.backup_id, auth_id, owner
+                "sync group {} failed - owner check failed ({} != {})",
+                &group, params.owner, owner
             );
             errors = true; // do not stop here, instead continue
         } else if let Err(err) = pull_group(
             worker,
             client,
-            src_repo,
-            tgt_store.clone(),
+            params,
             &group,
-            delete,
             &mut progress,
         )
         .await
         {
             task_log!(
                 worker,
-                "sync group {}/{} failed - {}",
-                item.backup_type, item.backup_id, err,
+                "sync group {} failed - {}",
+                &group, err,
             );
             errors = true; // do not stop here, instead continue
         }
     }
 
-    if delete {
+    if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
+            let local_groups = BackupInfo::list_backup_groups(&params.store.base_path())?;
             for local_group in local_groups {
                 if new_groups.contains(&local_group) {
                     continue;
                 }
+                if let Some(ref group_filter) = &params.group_filter {
+                    if !apply_filters(&local_group, group_filter) {
+                        continue;
+                    }
+                }
                 task_log!(
                     worker,
                     "delete vanished group '{}/{}'",
                     local_group.backup_type(),
                     local_group.backup_id()
                 );
-                match tgt_store.remove_backup_group(&local_group) {
+                match params.store.remove_backup_group(&local_group) {
                     Ok(true) => {},
                     Ok(false) => {
                         task_log!(worker, "kept some protected snapshots of group '{}'", local_group);