From 2d5287fbbc5a3b77e3a5bd47aff10100e456148e Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Mon, 22 Nov 2021 06:26:55 +0100 Subject: [PATCH] use RateLimitConfig for HttpClient and pull Signed-off-by: Dietmar Maurer --- pbs-client/src/http_client.rs | 33 ++++++++++++++++--------------- pbs-client/src/tools/mod.rs | 16 +++++++-------- proxmox-backup-client/src/main.rs | 33 +++++++++++++++++++++++-------- src/api2/config/remote.rs | 17 +++++++++++----- src/api2/pull.rs | 9 ++++++++- src/server/pull.rs | 11 ++++++++--- 6 files changed, 77 insertions(+), 42 deletions(-) diff --git a/pbs-client/src/http_client.rs b/pbs-client/src/http_client.rs index 61f05f28..a91b5e00 100644 --- a/pbs-client/src/http_client.rs +++ b/pbs-client/src/http_client.rs @@ -24,7 +24,7 @@ use proxmox_http::client::{HttpsConnector, RateLimiter}; use proxmox_http::uri::build_authority; use proxmox_async::broadcast_future::BroadcastFuture; -use pbs_api_types::{Authid, Userid}; +use pbs_api_types::{Authid, Userid, RateLimitConfig}; use pbs_tools::json::json_object_to_query; use pbs_tools::ticket; use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET; @@ -51,8 +51,7 @@ pub struct HttpClientOptions { ticket_cache: bool, fingerprint_cache: bool, verify_cert: bool, - rate_limit: Option, - bucket_size: Option, + limit: RateLimitConfig, } impl HttpClientOptions { @@ -112,13 +111,8 @@ impl HttpClientOptions { self } - pub fn rate_limit(mut self, rate_limit: Option) -> Self { - self.rate_limit = rate_limit; - self - } - - pub fn bucket_size(mut self, bucket_size: Option) -> Self { - self.bucket_size = bucket_size; + pub fn rate_limit(mut self, rate_limit: RateLimitConfig) -> Self { + self.limit = rate_limit; self } } @@ -133,8 +127,7 @@ impl Default for HttpClientOptions { ticket_cache: false, fingerprint_cache: false, verify_cert: true, - rate_limit: None, - bucket_size: None, + limit: RateLimitConfig::default(), // unlimited } } } @@ -359,10 +352,18 @@ impl HttpClient { httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0))); let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); - if let Some(rate_limit) = options.rate_limit { - let bucket_size = options.bucket_size.unwrap_or_else(|| rate_limit*3); - https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size))))); - https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size))))); + if let Some(rate_in) = options.limit.rate_in { + let burst_in = options.limit.burst_in.unwrap_or_else(|| rate_in).as_u64(); + https.set_read_limiter(Some(Arc::new(Mutex::new( + RateLimiter::new(rate_in.as_u64(), burst_in) + )))); + } + + if let Some(rate_out) = options.limit.rate_out { + let burst_out = options.limit.burst_out.unwrap_or_else(|| rate_out).as_u64(); + https.set_write_limiter(Some(Arc::new(Mutex::new( + RateLimiter::new(rate_out.as_u64(), burst_out) + )))); } let client = Client::builder() diff --git a/pbs-client/src/tools/mod.rs b/pbs-client/src/tools/mod.rs index a38b1c87..f7a253df 100644 --- a/pbs-client/src/tools/mod.rs +++ b/pbs-client/src/tools/mod.rs @@ -14,7 +14,7 @@ use proxmox_schema::*; use proxmox_router::cli::{complete_file_name, shellword_split}; use proxmox::tools::fs::file_get_json; -use pbs_api_types::{BACKUP_REPO_URL, Authid, UserWithTokens}; +use pbs_api_types::{BACKUP_REPO_URL, Authid, RateLimitConfig, UserWithTokens}; use pbs_datastore::BackupDir; use pbs_tools::json::json_object_to_query; @@ -135,16 +135,16 @@ pub fn extract_repository_from_map(param: &HashMap) -> Option Result { - connect_do(repo.host(), repo.port(), repo.auth_id(), None, None) + let rate_limit = RateLimitConfig::default(); // unlimited + connect_do(repo.host(), repo.port(), repo.auth_id(), rate_limit) .map_err(|err| format_err!("error building client for repository {} - {}", repo, err)) } pub fn connect_rate_limited( repo: &BackupRepository, - rate: Option, - bucket_size: Option, + rate_limit: RateLimitConfig, ) -> Result { - connect_do(repo.host(), repo.port(), repo.auth_id(), rate, bucket_size) + connect_do(repo.host(), repo.port(), repo.auth_id(), rate_limit) .map_err(|err| format_err!("error building client for repository {} - {}", repo, err)) } @@ -152,15 +152,13 @@ fn connect_do( server: &str, port: u16, auth_id: &Authid, - rate_limit: Option, - bucket_size: Option, + rate_limit: RateLimitConfig, ) -> Result { let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok(); let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?; let options = HttpClientOptions::new_interactive(password, fingerprint) - .rate_limit(rate_limit) - .bucket_size(bucket_size); + .rate_limit(rate_limit); HttpClient::new(server, port, auth_id, options) } diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index b4ad166a..199ab582 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -23,8 +23,9 @@ use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation}; use pbs_api_types::{ BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA, - Authid, CryptMode, Fingerprint, GroupListItem, PruneListItem, PruneOptions, - SnapshotListItem, StorageStatus, + Authid, CryptMode, Fingerprint, GroupListItem, HumanByte, + PruneListItem, PruneOptions, RateLimitConfig, SnapshotListItem, + StorageStatus, }; use pbs_client::{ BACKUP_SOURCE_SCHEMA, @@ -640,8 +641,16 @@ async fn create_backup( verify_chunk_size(size)?; } - let rate_limit = param["rate"].as_u64(); - let bucket_size = param["burst"].as_u64(); + let rate = match param["rate"].as_str() { + Some(s) => Some(s.parse::()?), + None => None, + }; + let burst = match param["burst"].as_str() { + Some(s) => Some(s.parse::()?), + None => None, + }; + + let rate_limit = RateLimitConfig::with_same_inout(rate, burst); let crypto = crypto_parameters(¶m)?; @@ -737,7 +746,7 @@ async fn create_backup( let backup_time = backup_time_opt.unwrap_or_else(epoch_i64); - let client = connect_rate_limited(&repo, rate_limit, bucket_size)?; + let client = connect_rate_limited(&repo, rate_limit)?; record_repository(&repo); println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?); @@ -1092,10 +1101,18 @@ async fn restore(param: Value) -> Result { let archive_name = json::required_string_param(¶m, "archive-name")?; - let rate_limit = param["rate"].as_u64(); - let bucket_size = param["burst"].as_u64(); + let rate = match param["rate"].as_str() { + Some(s) => Some(s.parse::()?), + None => None, + }; + let burst = match param["burst"].as_str() { + Some(s) => Some(s.parse::()?), + None => None, + }; + + let rate_limit = RateLimitConfig::with_same_inout(rate, burst); - let client = connect_rate_limited(&repo, rate_limit, bucket_size)?; + let client = connect_rate_limited(&repo, rate_limit)?; record_repository(&repo); let path = json::required_string_param(¶m, "snapshot")?; diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs index 0e2413a9..c43b30c4 100644 --- a/src/api2/config/remote.rs +++ b/src/api2/config/remote.rs @@ -12,7 +12,7 @@ use pbs_client::{HttpClient, HttpClientOptions}; use pbs_api_types::{ REMOTE_ID_SCHEMA, REMOTE_PASSWORD_SCHEMA, Remote, RemoteConfig, RemoteConfigUpdater, Authid, PROXMOX_CONFIG_DIGEST_SCHEMA, DATASTORE_SCHEMA, GroupListItem, - DataStoreListItem, SyncJobConfig, PRIV_REMOTE_AUDIT, PRIV_REMOTE_MODIFY, + DataStoreListItem, RateLimitConfig, SyncJobConfig, PRIV_REMOTE_AUDIT, PRIV_REMOTE_MODIFY, }; use pbs_config::sync; @@ -280,8 +280,15 @@ pub fn delete_remote(name: String, digest: Option) -> Result<(), Error> } /// Helper to get client for remote.cfg entry -pub async fn remote_client(remote: &Remote) -> Result { - let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone()); +pub async fn remote_client( + remote: &Remote, + limit: Option, +) -> Result { + let mut options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone()); + + if let Some(limit) = limit { + options = options.rate_limit(limit); + } let client = HttpClient::new( &remote.config.host, @@ -325,7 +332,7 @@ pub async fn scan_remote_datastores(name: String) -> Result Result for PullParameters { sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(), sync_job.remove_vanished, sync_job.group_filter.clone(), + sync_job.limit.clone(), ) } } @@ -156,6 +157,10 @@ pub fn do_sync_job( schema: GROUP_FILTER_LIST_SCHEMA, optional: true, }, + limit: { + type: RateLimitConfig, + flatten: true, + } }, }, access: { @@ -174,6 +179,7 @@ async fn pull ( remote_store: String, remove_vanished: Option, group_filter: Option>, + limit: RateLimitConfig, _info: &ApiMethod, rpcenv: &mut dyn RpcEnvironment, ) -> Result { @@ -190,6 +196,7 @@ async fn pull ( auth_id.clone(), remove_vanished, group_filter, + limit, )?; let client = pull_params.client().await?; diff --git a/src/server/pull.rs b/src/server/pull.rs index 97eee1e6..acf2c265 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -14,7 +14,10 @@ use http::StatusCode; use proxmox_router::HttpError; use proxmox_sys::task_log; -use pbs_api_types::{Authid, GroupFilter, GroupListItem, Remote, SnapshotListItem}; +use pbs_api_types::{ + Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote, + SnapshotListItem, +}; use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress}; use pbs_datastore::data_blob::DataBlob; @@ -41,6 +44,7 @@ pub struct PullParameters { owner: Authid, remove_vanished: bool, group_filter: Option>, + limit: RateLimitConfig, } impl PullParameters { @@ -51,6 +55,7 @@ impl PullParameters { owner: Authid, remove_vanished: Option, group_filter: Option>, + limit: RateLimitConfig, ) -> Result { let store = DataStore::lookup_datastore(store)?; @@ -66,11 +71,11 @@ impl PullParameters { remote_store.to_string(), ); - Ok(Self { remote, source, store, owner, remove_vanished, group_filter }) + Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit }) } pub async fn client(&self) -> Result { - crate::api2::config::remote::remote_client(&self.remote).await + crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await } } -- 2.39.2