use RateLimitConfig for HttpClient and pull
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 2c454e2d649ad1fe6f1b87fa5a865782439eea26..acf2c265fee1c802cee810205a71d05b35029ee8 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -12,9 +12,14 @@ use serde_json::json;
 use http::StatusCode;
 
 use proxmox_router::HttpError;
+use proxmox_sys::task_log;
 
-use pbs_api_types::{Authid, GroupListItem, Remote, SnapshotListItem};
-use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_api_types::{
+    Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote,
+    SnapshotListItem,
+};
+
+use pbs_datastore::{BackupDir, BackupGroup, BackupInfo, DataStore, StoreProgress};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -23,7 +28,6 @@ use pbs_datastore::manifest::{
     CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
 };
 use pbs_tools::sha::sha256;
-use pbs_tools::task_log;
 use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
 use proxmox_rest_server::WorkerTask;
 
@@ -39,6 +43,8 @@ pub struct PullParameters {
     store: Arc<DataStore>,
     owner: Authid,
     remove_vanished: bool,
+    group_filter: Option<Vec<GroupFilter>>,
+    limit: RateLimitConfig,
 }
 
 impl PullParameters {
@@ -48,13 +54,15 @@ impl PullParameters {
         remote_store: &str,
         owner: Authid,
         remove_vanished: Option<bool>,
+        group_filter: Option<Vec<GroupFilter>>,
+        limit: RateLimitConfig,
     ) -> Result<Self, Error> {
         let store = DataStore::lookup_datastore(store)?;
 
         let (remote_config, _digest) = pbs_config::remote::config()?;
         let remote: Remote = remote_config.lookup("remote", remote)?;
 
-        let remove_vanished = remove_vanished.unwrap_or(true);
+        let remove_vanished = remove_vanished.unwrap_or(false);
 
         let source = BackupRepository::new(
             Some(remote.config.auth_id.clone()),
@@ -63,11 +71,11 @@ impl PullParameters {
             remote_store.to_string(),
         );
 
-        Ok(Self { remote, source, store, owner, remove_vanished })
+        Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
     }
 
     pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote).await
+        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
     }
 }
 
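The client() change above forwards the pull job's RateLimitConfig into the HttpClient built by remote_client(). A minimal sketch of the assumed builder flow, using hypothetical stand-in types rather than the real pbs_client/pbs_api_types API:

    // Hypothetical stand-ins for pbs_api_types::RateLimitConfig and
    // pbs_client::HttpClientOptions; only the shape of the change is shown.
    #[derive(Clone, Debug, Default)]
    struct RateLimit {
        rate_in: Option<u64>,  // bytes per second, None = unlimited
        rate_out: Option<u64>, // bytes per second, None = unlimited
    }

    #[derive(Default)]
    struct ClientOptions {
        limit: Option<RateLimit>,
    }

    impl ClientOptions {
        // builder-style setter, mirroring how an optional limit is passed through
        fn rate_limit(mut self, limit: Option<RateLimit>) -> Self {
            self.limit = limit;
            self
        }
    }

    fn main() {
        let limit = RateLimit { rate_in: Some(10 * 1024 * 1024), rate_out: None };
        // corresponds to remote_client(&self.remote, Some(self.limit.clone()))
        let options = ClientOptions::default().rate_limit(Some(limit));
        println!("{:?}", options.limit);
    }
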
@@ -121,7 +129,7 @@ async fn pull_index_chunks<I: IndexFile>(
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
-                let chunk_exists = pbs_runtime::block_in_place(|| {
+                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                     target.cond_touch_chunk(&info.digest, false)
                 })?;
                 if chunk_exists {
@@ -133,7 +141,7 @@ async fn pull_index_chunks<I: IndexFile>(
                 let raw_size = chunk.raw_size() as usize;
 
                // decode, verify and write in separate threads to maximize throughput
-                pbs_runtime::block_in_place(|| {
+                proxmox_async::runtime::block_in_place(|| {
                     verify_and_write_channel.send((chunk, info.digest, info.size()))
                 })?;
 
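The two hunks above only move block_in_place from pbs_runtime to proxmox_async::runtime; the semantics are assumed to stay those of tokio's block_in_place, which runs blocking work on the current worker thread without stalling the rest of the runtime. A self-contained illustration using tokio directly (needs the rt-multi-thread and macros features):

    use std::time::Duration;

    #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
    async fn main() {
        // stand-in for the blocking cond_touch_chunk / channel send above
        let chunk_exists = tokio::task::block_in_place(|| {
            std::thread::sleep(Duration::from_millis(10)); // pretend disk access
            false
        });
        println!("chunk exists: {chunk_exists}");
    }
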
@@ -678,8 +686,7 @@ pub async fn pull_store(
 
     let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
 
-    task_log!(worker, "found {} groups to sync", list.len());
-
+    let total_count = list.len();
     list.sort_unstable_by(|a, b| {
         let type_order = a.backup_type.cmp(&b.backup_type);
         if type_order == std::cmp::Ordering::Equal {
@@ -689,11 +696,32 @@ pub async fn pull_store(
         }
     });
 
+    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
+        filters
+            .iter()
+            .any(|filter| group.matches(filter))
+    };
+
     let list: Vec<BackupGroup> = list
         .into_iter()
         .map(|item| BackupGroup::new(item.backup_type, item.backup_id))
         .collect();
 
+    let list = if let Some(group_filter) = &params.group_filter {
+        let unfiltered_count = list.len();
+        let list: Vec<BackupGroup> = list
+            .into_iter()
+            .filter(|group| {
+                apply_filters(group, group_filter)
+            })
+            .collect();
+        task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
+        list
+    } else {
+        task_log!(worker, "found {} groups to sync", total_count);
+        list
+    };
+
     let mut errors = false;
 
     let mut new_groups = std::collections::HashSet::new();
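The filter semantics added above: a group is synced if any of the configured filters matches it, and when no filter is set everything is synced as before. A self-contained sketch of that rule, with hypothetical stand-ins for BackupGroup and GroupFilter (the real matching lives in pbs_api_types):

    // Hypothetical stand-ins; the real types are pbs_datastore::BackupGroup
    // and pbs_api_types::GroupFilter.
    struct Group {
        backup_type: &'static str, // "vm", "ct" or "host"
        backup_id: &'static str,
    }

    enum Filter {
        Type(&'static str),                // e.g. everything of type "vm"
        Exact(&'static str, &'static str), // e.g. exactly "vm/100"
    }

    impl Group {
        fn matches(&self, filter: &Filter) -> bool {
            match filter {
                Filter::Type(ty) => self.backup_type == *ty,
                Filter::Exact(ty, id) => self.backup_type == *ty && self.backup_id == *id,
            }
        }
    }

    fn main() {
        let list = vec![
            Group { backup_type: "vm", backup_id: "100" },
            Group { backup_type: "ct", backup_id: "200" },
        ];
        let filters = [Filter::Type("vm")];
        let unfiltered_count = list.len();

        // mirrors the apply_filters closure: keep a group if ANY filter matches
        let list: Vec<Group> = list
            .into_iter()
            .filter(|group| filters.iter().any(|f| group.matches(f)))
            .collect();

        println!("found {} groups to sync (out of {} total)", list.len(), unfiltered_count);
    }
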
@@ -755,6 +783,11 @@ pub async fn pull_store(
                 if new_groups.contains(&local_group) {
                     continue;
                 }
+                if let Some(group_filter) = &params.group_filter {
+                    if !apply_filters(&local_group, group_filter) {
+                        continue;
+                    }
+                }
                 task_log!(
                     worker,
                     "delete vanished group '{}/{}'",