use proxmox::tools::fs::CreateOptions;
use proxmox_lang::try_block;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
-use proxmox_http::client::{RateLimiter, RateLimitedStream};
+use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
use pbs_tools::{task_log, task_warn};
use pbs_datastore::DataStore;
fn lookup_rate_limiter(
peer: Option<std::net::SocketAddr>,
-) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+) -> (Option<Arc<dyn ShareableRateLimit>>, Option<Arc<dyn ShareableRateLimit>>) {
let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();
let now = proxmox_time::epoch_i64();
use anyhow::Error;
use cidr::IpInet;
-use proxmox_http::client::RateLimiter;
+use proxmox_http::client::{ShareableRateLimit, RateLimiter};
use proxmox_section_config::SectionConfigData;
use proxmox_systemd::daily_duration::{parse_daily_duration, DailyDuration};
last_update: i64,
last_traffic_control_generation: usize,
rules: Vec<ParsedTcRule>,
- limiter_map: HashMap<String, (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>)>,
+ limiter_map: HashMap<String, (Option<Arc<dyn ShareableRateLimit>>, Option<Arc<dyn ShareableRateLimit>>)>,
use_utc: bool, // currently only used for testing
}
}
}
+fn create_limiter(
+ rate: u64,
+ burst: u64,
+ _direction: bool, // false => in, true => out
+) -> Result<Arc<dyn ShareableRateLimit>, Error> {
+ Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst))))
+}
+
impl TrafficControlCache {
pub fn new() -> Self {
self.update_config(&config)
}
+
fn update_config(&mut self, config: &SectionConfigData) -> Result<(), Error> {
self.limiter_map.retain(|key, _value| config.sections.contains_key(key));
Some(ref read_limiter) => {
match rule.rate_in {
Some(rate_in) => {
- read_limiter.lock().unwrap().
- update_rate(rate_in, rule.burst_in.unwrap_or(rate_in));
+ read_limiter.update_rate(rate_in, rule.burst_in.unwrap_or(rate_in));
}
None => entry.0 = None,
}
}
None => {
if let Some(rate_in) = rule.rate_in {
- let limiter = RateLimiter::new(rate_in, rule.burst_in.unwrap_or(rate_in));
- entry.0 = Some(Arc::new(Mutex::new(limiter)));
+ let limiter = create_limiter(rate_in, rule.burst_in.unwrap_or(rate_in), false)?;
+ entry.0 = Some(limiter);
}
}
}
Some(ref write_limiter) => {
match rule.rate_out {
Some(rate_out) => {
- write_limiter.lock().unwrap().
- update_rate(rate_out, rule.burst_out.unwrap_or(rate_out));
+ write_limiter.update_rate(rate_out, rule.burst_out.unwrap_or(rate_out));
}
None => entry.1 = None,
}
}
None => {
if let Some(rate_out) = rule.rate_out {
- let limiter = RateLimiter::new(rate_out, rule.burst_out.unwrap_or(rate_out));
- entry.1 = Some(Arc::new(Mutex::new(limiter)));
+ let limiter = create_limiter(rate_out, rule.burst_out.unwrap_or(rate_out), true)?;
+ entry.1 = Some(limiter);
}
}
}
&self,
peer: Option<SocketAddr>,
now: i64,
- ) -> (&str, Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+ ) -> (&str, Option<Arc<dyn ShareableRateLimit>>, Option<Arc<dyn ShareableRateLimit>>) {
let peer = match peer {
None => return ("", None, None),