]> git.proxmox.com Git - proxmox-backup.git/commitdiff
cached_traffic_control: use ShareableRateLimit trait object
authorDietmar Maurer <dietmar@proxmox.com>
Sat, 13 Nov 2021 14:43:56 +0000 (15:43 +0100)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 13 Nov 2021 16:49:38 +0000 (17:49 +0100)
Cargo.toml
pbs-client/Cargo.toml
src/bin/proxmox-backup-proxy.rs
src/cached_traffic_control.rs

index 8cebbd29df943b53ba0a550f74089375dc9a9076..7d9644acdae244f347cf4ace1a3e7077ec832b3e 100644 (file)
@@ -98,7 +98,7 @@ pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 proxmox = { version = "0.15.0", features = [ "sortable-macro" ] }
-proxmox-http = { version = "0.5.2", features = [ "client", "http-helpers", "websocket" ] }
+proxmox-http = { version = "0.5.3", features = [ "client", "http-helpers", "websocket" ] }
 proxmox-io = "1"
 proxmox-lang = "1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
index c7b4b1c8d9f3bdd8b494dd640d1fa62245783439..4f6614adbbd776abbbdd3ff42b2ca3ec98d33803 100644 (file)
@@ -30,7 +30,7 @@ xdg = "2.2"
 pathpatterns = "0.1.2"
 proxmox = "0.15.0"
 proxmox-fuse = "0.1.1"
-proxmox-http = { version = "0.5.2", features = [ "client", "http-helpers", "websocket" ] }
+proxmox-http = { version = "0.5.3", features = [ "client", "http-helpers", "websocket" ] }
 proxmox-io = { version = "1", features = [ "tokio" ] }
 proxmox-lang = "1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
index a3bd4cfa611765ab2fc7a0a90a0af321474c4fac..0fc61ed51d850775570ff2ae69d296f0af169325 100644 (file)
@@ -21,7 +21,7 @@ use proxmox::sys::linux::socket::set_tcp_keepalive;
 use proxmox::tools::fs::CreateOptions;
 use proxmox_lang::try_block;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
-use proxmox_http::client::{RateLimiter, RateLimitedStream};
+use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
 
 use pbs_tools::{task_log, task_warn};
 use pbs_datastore::DataStore;
@@ -1093,7 +1093,7 @@ lazy_static::lazy_static!{
 
 fn lookup_rate_limiter(
     peer: Option<std::net::SocketAddr>,
-) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+) -> (Option<Arc<dyn ShareableRateLimit>>, Option<Arc<dyn ShareableRateLimit>>) {
     let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();
 
     let now = proxmox_time::epoch_i64();
index 9c7387d6114fb4c5ce5c6c26e7e63822eddecdda..deb6e2343d314e88200d0078fdbf242b95bd054f 100644 (file)
@@ -6,7 +6,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use anyhow::Error;
 use cidr::IpInet;
 
-use proxmox_http::client::RateLimiter;
+use proxmox_http::client::{ShareableRateLimit, RateLimiter};
 use proxmox_section_config::SectionConfigData;
 
 use proxmox_systemd::daily_duration::{parse_daily_duration, DailyDuration};
@@ -26,7 +26,7 @@ pub struct TrafficControlCache {
     last_update: i64,
     last_traffic_control_generation: usize,
     rules: Vec<ParsedTcRule>,
-    limiter_map: HashMap<String, (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>)>,
+    limiter_map: HashMap<String, (Option<Arc<dyn ShareableRateLimit>>, Option<Arc<dyn ShareableRateLimit>>)>,
     use_utc: bool, // currently only used for testing
 }
 
@@ -84,6 +84,14 @@ fn cannonical_ip(ip: IpAddr) -> IpAddr {
     }
 }
 
+fn create_limiter(
+    rate: u64,
+    burst: u64,
+    _direction: bool, // false => in, true => out
+) -> Result<Arc<dyn ShareableRateLimit>, Error> {
+    Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst))))
+}
+
 impl TrafficControlCache {
 
     pub fn new() -> Self {
@@ -130,6 +138,7 @@ impl TrafficControlCache {
         self.update_config(&config)
     }
 
+
     fn update_config(&mut self, config: &SectionConfigData) -> Result<(), Error> {
         self.limiter_map.retain(|key, _value| config.sections.contains_key(key));
 
@@ -146,16 +155,15 @@ impl TrafficControlCache {
                 Some(ref read_limiter) => {
                     match rule.rate_in {
                         Some(rate_in) => {
-                            read_limiter.lock().unwrap().
-                                update_rate(rate_in, rule.burst_in.unwrap_or(rate_in));
+                            read_limiter.update_rate(rate_in, rule.burst_in.unwrap_or(rate_in));
                         }
                         None => entry.0 = None,
                     }
                 }
                 None => {
                     if let Some(rate_in) = rule.rate_in {
-                        let limiter = RateLimiter::new(rate_in, rule.burst_in.unwrap_or(rate_in));
-                        entry.0 = Some(Arc::new(Mutex::new(limiter)));
+                        let limiter = create_limiter(rate_in, rule.burst_in.unwrap_or(rate_in), false)?;
+                        entry.0 = Some(limiter);
                     }
                 }
             }
@@ -164,16 +172,15 @@ impl TrafficControlCache {
                 Some(ref write_limiter) => {
                     match rule.rate_out {
                         Some(rate_out) => {
-                            write_limiter.lock().unwrap().
-                                update_rate(rate_out, rule.burst_out.unwrap_or(rate_out));
+                            write_limiter.update_rate(rate_out, rule.burst_out.unwrap_or(rate_out));
                         }
                         None => entry.1 = None,
                     }
                 }
                 None => {
                     if let Some(rate_out) = rule.rate_out {
-                        let limiter = RateLimiter::new(rate_out, rule.burst_out.unwrap_or(rate_out));
-                        entry.1 = Some(Arc::new(Mutex::new(limiter)));
+                        let limiter = create_limiter(rate_out, rule.burst_out.unwrap_or(rate_out), true)?;
+                        entry.1 = Some(limiter);
                     }
                 }
             }
@@ -212,7 +219,7 @@ impl TrafficControlCache {
         &self,
         peer: Option<SocketAddr>,
         now: i64,
-    ) -> (&str, Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+    ) -> (&str, Option<Arc<dyn ShareableRateLimit>>, Option<Arc<dyn ShareableRateLimit>>) {
 
         let peer = match peer {
             None => return ("", None, None),