]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/api2/pull.rs
use RateLimitConfig for HttpClient and pull
[proxmox-backup.git] / src / api2 / pull.rs
index 4280d922aacba8e02dff15eb41ecd10e8cf7d01e..aaeed4dea0c98c5c4f877817816b93e626decc81 100644 (file)
@@ -1,24 +1,24 @@
 //! Sync datastore from remote server
-use std::sync::{Arc};
+use std::convert::TryFrom;
 
 use anyhow::{format_err, Error};
 use futures::{select, future::FutureExt};
 
 use proxmox_schema::api;
 use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
+use proxmox_sys::task_log;
 
-use pbs_client::{HttpClient, BackupRepository};
 use pbs_api_types::{
-    Remote, Authid, SyncJobConfig,
+    Authid, SyncJobConfig, GroupFilter, RateLimitConfig, GROUP_FILTER_LIST_SCHEMA,
     DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
 };
-use pbs_tools::task_log;
 use proxmox_rest_server::WorkerTask;
 use pbs_config::CachedUserInfo;
-use pbs_datastore::DataStore;
 
-use crate::server::{jobstate::Job, pull::pull_store};
+use crate::server::pull::{PullParameters, pull_store};
+use crate::server::jobstate::Job;
+
 
 pub fn check_pull_privs(
     auth_id: &Authid,
@@ -40,27 +40,20 @@ pub fn check_pull_privs(
     Ok(())
 }
 
-pub async fn get_pull_parameters(
-    store: &str,
-    remote: &str,
-    remote_store: &str,
-) -> Result<(HttpClient, BackupRepository, Arc<DataStore>), Error> {
-
-    let tgt_store = DataStore::lookup_datastore(store)?;
-
-    let (remote_config, _digest) = pbs_config::remote::config()?;
-    let remote: Remote = remote_config.lookup("remote", remote)?;
-
-    let src_repo = BackupRepository::new(
-        Some(remote.config.auth_id.clone()),
-        Some(remote.config.host.clone()),
-        remote.config.port,
-        remote_store.to_string(),
-    );
-
-    let client = crate::api2::config::remote::remote_client(remote).await?;
-
-    Ok((client, src_repo, tgt_store))
+impl TryFrom<&SyncJobConfig> for PullParameters {
+    type Error = Error;
+
+    fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
+        PullParameters::new(
+            &sync_job.store,
+            &sync_job.remote,
+            &sync_job.remote_store,
+            sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
+            sync_job.remove_vanished,
+            sync_job.group_filter.clone(),
+            sync_job.limit.clone(),
+        )
+    }
 }
 
 pub fn do_sync_job(
@@ -94,9 +87,8 @@ pub fn do_sync_job(
 
             let worker_future = async move {
 
-                let delete = sync_job.remove_vanished.unwrap_or(true);
-                let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone());
-                let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
+                let pull_params = PullParameters::try_from(&sync_job)?;
+                let client = pull_params.client().await?;
 
                 task_log!(worker, "Starting datastore sync job '{}'", job_id);
                 if let Some(event_str) = schedule {
@@ -110,7 +102,7 @@ pub fn do_sync_job(
                     sync_job.remote_store,
                 );
 
-                pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?;
+                pull_store(&worker, &client, &pull_params).await?;
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -161,6 +153,14 @@ pub fn do_sync_job(
                 schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
                 optional: true,
             },
+            "group-filter": {
+                schema: GROUP_FILTER_LIST_SCHEMA,
+                optional: true,
+            },
+            limit: {
+                type: RateLimitConfig,
+                flatten: true,
+            }
         },
     },
     access: {
@@ -178,23 +178,34 @@ async fn pull (
     remote: String,
     remote_store: String,
     remove_vanished: Option<bool>,
+    group_filter: Option<Vec<GroupFilter>>,
+    limit: RateLimitConfig,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
 
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let delete = remove_vanished.unwrap_or(true);
+    let delete = remove_vanished.unwrap_or(false);
 
     check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?;
 
-    let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
+    let pull_params = PullParameters::new(
+        &store,
+        &remote,
+        &remote_store,
+        auth_id.clone(),
+        remove_vanished,
+        group_filter,
+        limit,
+    )?;
+    let client = pull_params.client().await?;
 
     // fixme: set to_stdout to false?
     let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
 
         task_log!(worker, "sync datastore '{}' start", store);
 
-        let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id);
+        let pull_future = pull_store(&worker, &client, &pull_params);
         let future = select!{
             success = pull_future.fuse() => success,
             abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,