]> git.proxmox.com Git - proxmox-backup.git/blame - src/traffic_control_cache.rs
api: use if-let pattern for error-only handling
[proxmox-backup.git] / src / traffic_control_cache.rs
CommitLineData
e705b305
DM
1//! Traffic control implementation
2
610150a4
DM
3use std::collections::HashMap;
4use std::net::{IpAddr, Ipv4Addr, SocketAddr};
9531d2c5 5use std::sync::{Arc, Mutex};
a0172d76 6use std::time::Instant;
610150a4
DM
7
8use anyhow::Error;
9use cidr::IpInet;
10
5aeeb44a 11use proxmox_http::{RateLimiter, ShareableRateLimit};
610150a4
DM
12use proxmox_section_config::SectionConfigData;
13
15cc41b6 14use proxmox_time::{parse_daily_duration, DailyDuration, TmEditor};
610150a4
DM
15
16use pbs_api_types::TrafficControlRule;
17
cb80ffc1 18use pbs_config::ConfigVersionCache;
610150a4 19
d6644e29 20use crate::tools::SharedRateLimiter;
de21d4ef 21
8e70d421
WB
/// Reference-counted handle to a type-erased [`ShareableRateLimit`] implementation.
///
/// This is the limiter type stored per rule direction in [`TrafficControlCache`] and handed
/// out by [`TrafficControlCache::lookup_rate_limiter`].
pub type SharedRateLimit = Arc<dyn ShareableRateLimit>;
23
lazy_static::lazy_static! {
    /// Shared traffic control cache singleton.
    ///
    /// The cache is built with `TrafficControlCache::new()` the first time the static is
    /// dereferenced. Lock the [`Mutex`] to reload rules, compute rates or look up limiters.
    pub static ref TRAFFIC_CONTROL_CACHE: Arc<Mutex<TrafficControlCache>> =
        Arc::new(Mutex::new(TrafficControlCache::new()));
}
29
610150a4 30struct ParsedTcRule {
9531d2c5
TL
31 config: TrafficControlRule, // original rule config
32 networks: Vec<IpInet>, // parsed networks
610150a4
DM
33 timeframe: Vec<DailyDuration>, // parsed timeframe
34}
35
/// Traffic control statistics
///
/// Plain counter snapshot for one rule. All fields are integers, so the type derives the
/// common value-type traits: it can be logged, copied out of the cache while the lock is
/// held, compared, and default-initialised to all zeroes.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TrafficStat {
    /// Total incoming traffic (bytes)
    pub traffic_in: u64,
    /// Incoming data rate (bytes/second)
    pub rate_in: u64,
    /// Total outgoing traffic (bytes)
    pub traffic_out: u64,
    /// Outgoing data rate (bytes/second)
    pub rate_out: u64,
}
47
e705b305
DM
48/// Cache rules from `/etc/proxmox-backup/traffic-control.cfg`
49/// together with corresponding rate limiter implementation.
610150a4 50pub struct TrafficControlCache {
e705b305 51 // use shared memory to make it work with daemon restarts
de21d4ef 52 use_shared_memory: bool,
a0172d76
DM
53 last_rate_compute: Instant,
54 current_rate_map: HashMap<String, TrafficStat>,
610150a4
DM
55 last_update: i64,
56 last_traffic_control_generation: usize,
57 rules: Vec<ParsedTcRule>,
8e70d421 58 limiter_map: HashMap<String, (Option<SharedRateLimit>, Option<SharedRateLimit>)>,
610150a4
DM
59 use_utc: bool, // currently only used for testing
60}
61
9531d2c5
TL
62fn timeframe_match(duration_list: &[DailyDuration], now: &TmEditor) -> bool {
63 if duration_list.is_empty() {
64 return true;
65 }
610150a4
DM
66
67 for duration in duration_list.iter() {
68 if duration.time_match_with_tm_editor(now) {
69 return true;
70 }
71 }
72
73 false
74}
75
9531d2c5 76fn network_match_len(networks: &[IpInet], ip: &IpAddr) -> Option<u8> {
610150a4
DM
77 let mut match_len = None;
78
79 for cidr in networks.iter() {
80 if cidr.contains(ip) {
81 let network_length = cidr.network_length();
82 match match_len {
83 Some(len) => {
84 if network_length > len {
85 match_len = Some(network_length);
86 }
87 }
88 None => match_len = Some(network_length),
89 }
90 }
91 }
92 match_len
93}
94
/// Converts an IPv4-mapped IPv6 address (`::ffff:a.b.c.d`) into the plain IPv4 address.
///
/// Every other address, IPv4 or IPv6, is returned unchanged.
fn cannonical_ip(ip: IpAddr) -> IpAddr {
    // TODO: switch to `std::net::IpAddr::to_canonical` once the minimum supported
    // toolchain provides it.
    match ip {
        IpAddr::V6(addr) => {
            // `Some` only for addresses inside `::ffff:0:0/96`.
            let mapped: Option<Ipv4Addr> = addr.to_ipv4_mapped();
            mapped.map_or(ip, IpAddr::V4)
        }
        IpAddr::V4(_) => ip,
    }
}
107
d5f58006 108fn create_limiter(
de21d4ef
DM
109 use_shared_memory: bool,
110 name: &str,
d5f58006
DM
111 rate: u64,
112 burst: u64,
8e70d421 113) -> Result<SharedRateLimit, Error> {
de21d4ef
DM
114 if use_shared_memory {
115 let limiter = SharedRateLimiter::mmap_shmem(name, rate, burst)?;
116 Ok(Arc::new(limiter))
117 } else {
118 Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst))))
119 }
d5f58006
DM
120}
121
610150a4 122impl TrafficControlCache {
e705b305 123 fn new() -> Self {
610150a4 124 Self {
de21d4ef 125 use_shared_memory: true,
610150a4
DM
126 rules: Vec::new(),
127 limiter_map: HashMap::new(),
128 last_traffic_control_generation: 0,
129 last_update: 0,
130 use_utc: false,
a0172d76
DM
131 last_rate_compute: Instant::now(),
132 current_rate_map: HashMap::new(),
610150a4
DM
133 }
134 }
135
e705b305
DM
136 /// Reload rules from configuration file
137 ///
138 /// Only reload if configuration file was updated
139 /// ([ConfigVersionCache]) or last update is older that 60
140 /// seconds.
610150a4 141 pub fn reload(&mut self, now: i64) {
cb80ffc1
DM
142 let version_cache = match ConfigVersionCache::new() {
143 Ok(cache) => cache,
610150a4 144 Err(err) => {
9531d2c5
TL
145 log::error!(
146 "TrafficControlCache::reload failed in ConfigVersionCache::new: {}",
147 err
148 );
610150a4
DM
149 return;
150 }
151 };
152
cb80ffc1 153 let traffic_control_generation = version_cache.traffic_control_generation();
610150a4 154
9531d2c5
TL
155 if (self.last_update != 0)
156 && (traffic_control_generation == self.last_traffic_control_generation)
157 && ((now - self.last_update) < 60)
158 {
159 return;
160 }
610150a4
DM
161
162 log::debug!("reload traffic control rules");
163
164 self.last_traffic_control_generation = traffic_control_generation;
165 self.last_update = now;
166
6aff2de5
MS
167 if let Err(err) = self.reload_impl() {
168 log::error!("TrafficControlCache::reload failed -> {err}");
610150a4
DM
169 }
170 }
171
172 fn reload_impl(&mut self) -> Result<(), Error> {
173 let (config, _) = pbs_config::traffic_control::config()?;
174
175 self.update_config(&config)
176 }
177
e705b305
DM
178 /// Compute current data rates.
179 ///
180 /// This should be called every second (from `proxmox-backup-proxy`).
a0172d76 181 pub fn compute_current_rates(&mut self) {
a0172d76 182 let elapsed = self.last_rate_compute.elapsed().as_micros();
9531d2c5
TL
183 if elapsed < 200_000 {
184 return;
185 } // not enough data
a0172d76
DM
186
187 let mut new_rate_map = HashMap::new();
188
189 for (rule, (read_limit, write_limit)) in self.limiter_map.iter() {
190 let traffic_in = read_limit.as_ref().map(|l| l.traffic()).unwrap_or(0);
191 let traffic_out = write_limit.as_ref().map(|l| l.traffic()).unwrap_or(0);
192
193 let traffic_diff_in;
194 let traffic_diff_out;
195
196 if let Some(stat) = self.current_rate_map.get(rule) {
197 traffic_diff_in = traffic_in.saturating_sub(stat.traffic_in);
198 traffic_diff_out = traffic_out.saturating_sub(stat.traffic_out);
199 } else {
200 traffic_diff_in = 0;
201 traffic_diff_out = 0;
202 }
203
204 let rate_in = ((traffic_diff_in as u128) * 1_000_000) / elapsed;
205 let rate_out = ((traffic_diff_out as u128) * 1_000_000) / elapsed;
206
207 let stat = TrafficStat {
208 traffic_in,
209 traffic_out,
210 rate_in: rate_in.try_into().unwrap_or(u64::MAX),
211 rate_out: rate_out.try_into().unwrap_or(u64::MAX),
212 };
213 new_rate_map.insert(rule.clone(), stat);
214 }
215
216 self.current_rate_map = new_rate_map;
217
218 self.last_rate_compute = Instant::now()
219 }
220
e705b305 221 /// Returns current [TrafficStat] for each configured rule.
a0172d76
DM
222 pub fn current_rate_map(&self) -> &HashMap<String, TrafficStat> {
223 &self.current_rate_map
224 }
d5f58006 225
610150a4 226 fn update_config(&mut self, config: &SectionConfigData) -> Result<(), Error> {
9531d2c5
TL
227 self.limiter_map
228 .retain(|key, _value| config.sections.contains_key(key));
610150a4 229
9531d2c5 230 let rules: Vec<TrafficControlRule> = config.convert_to_typed_array("rule")?;
610150a4
DM
231
232 let mut active_rules = Vec::new();
233
234 for rule in rules {
9531d2c5
TL
235 let entry = self
236 .limiter_map
237 .entry(rule.name.clone())
238 .or_insert((None, None));
56472190 239 let limit = &rule.limit;
610150a4
DM
240
241 match entry.0 {
9531d2c5
TL
242 Some(ref read_limiter) => match limit.rate_in {
243 Some(rate_in) => {
244 read_limiter.update_rate(
245 rate_in.as_u64(),
246 limit.burst_in.unwrap_or(rate_in).as_u64(),
247 );
610150a4 248 }
9531d2c5
TL
249 None => entry.0 = None,
250 },
610150a4 251 None => {
56472190 252 if let Some(rate_in) = limit.rate_in {
de21d4ef
DM
253 let name = format!("{}.in", rule.name);
254 let limiter = create_limiter(
255 self.use_shared_memory,
256 &name,
118515db 257 rate_in.as_u64(),
56472190 258 limit.burst_in.unwrap_or(rate_in).as_u64(),
de21d4ef 259 )?;
d5f58006 260 entry.0 = Some(limiter);
610150a4
DM
261 }
262 }
263 }
264
265 match entry.1 {
9531d2c5
TL
266 Some(ref write_limiter) => match limit.rate_out {
267 Some(rate_out) => {
268 write_limiter.update_rate(
269 rate_out.as_u64(),
270 limit.burst_out.unwrap_or(rate_out).as_u64(),
271 );
610150a4 272 }
9531d2c5
TL
273 None => entry.1 = None,
274 },
610150a4 275 None => {
56472190 276 if let Some(rate_out) = limit.rate_out {
de21d4ef
DM
277 let name = format!("{}.out", rule.name);
278 let limiter = create_limiter(
279 self.use_shared_memory,
280 &name,
118515db 281 rate_out.as_u64(),
56472190 282 limit.burst_out.unwrap_or(rate_out).as_u64(),
de21d4ef 283 )?;
d5f58006 284 entry.1 = Some(limiter);
610150a4
DM
285 }
286 }
287 }
288
289 let mut timeframe = Vec::new();
290
291 if let Some(ref timefram_list) = rule.timeframe {
292 for duration_str in timefram_list {
293 let duration = parse_daily_duration(duration_str)?;
294 timeframe.push(duration);
295 }
296 }
297
298 let mut networks = Vec::new();
299
300 for network in rule.network.iter() {
301 let cidr = match network.parse() {
302 Ok(cidr) => cidr,
303 Err(err) => {
304 log::error!("unable to parse network '{}' - {}", network, err);
305 continue;
306 }
307 };
308 networks.push(cidr);
309 }
310
9531d2c5
TL
311 active_rules.push(ParsedTcRule {
312 config: rule,
313 networks,
314 timeframe,
315 });
610150a4
DM
316 }
317
318 self.rules = active_rules;
319
320 Ok(())
321 }
322
e705b305
DM
323 /// Returns the rate limiter (if any) for the specified peer address.
324 ///
325 /// - Rules where timeframe does not match are skipped.
326 /// - Rules with smaller network size have higher priority.
327 ///
328 /// Behavior is undefined if more than one rule matches after
329 /// above selection.
610150a4
DM
330 pub fn lookup_rate_limiter(
331 &self,
1993d986 332 peer: SocketAddr,
610150a4 333 now: i64,
8e70d421 334 ) -> (&str, Option<SharedRateLimit>, Option<SharedRateLimit>) {
610150a4
DM
335 let peer_ip = cannonical_ip(peer.ip());
336
337 log::debug!("lookup_rate_limiter: {:?}", peer_ip);
338
339 let now = match TmEditor::with_epoch(now, self.use_utc) {
340 Ok(now) => now,
341 Err(err) => {
342 log::error!("lookup_rate_limiter: TmEditor::with_epoch failed - {}", err);
343 return ("", None, None);
344 }
345 };
346
347 let mut last_rule_match = None;
348
349 for rule in self.rules.iter() {
9531d2c5
TL
350 if !timeframe_match(&rule.timeframe, &now) {
351 continue;
352 }
610150a4
DM
353
354 if let Some(match_len) = network_match_len(&rule.networks, &peer_ip) {
355 match last_rule_match {
356 None => last_rule_match = Some((rule, match_len)),
357 Some((_, last_len)) => {
358 if match_len > last_len {
359 last_rule_match = Some((rule, match_len));
360 }
361 }
362 }
363 }
364 }
365
366 match last_rule_match {
367 Some((rule, _)) => {
368 match self.limiter_map.get(&rule.config.name) {
9531d2c5
TL
369 Some((read_limiter, write_limiter)) => (
370 &rule.config.name,
371 read_limiter.clone(),
372 write_limiter.clone(),
373 ),
610150a4
DM
374 None => ("", None, None), // should never happen
375 }
376 }
377 None => ("", None, None),
378 }
379 }
380}
381
610150a4
DM
#[cfg(test)]
mod test {
    use super::*;

    /// Builds an epoch timestamp (seconds) from a day offset, an hour and a minute.
    ///
    /// Day `0` is 1970-01-01, a Thursday, which is what the constants below rely on.
    const fn make_test_time(mday: i32, hour: i32, min: i32) -> i64 {
        // Widen before multiplying so that large `mday` values cannot overflow `i32`.
        (mday as i64) * 3600 * 24 + (hour as i64) * 3600 + (min as i64) * 60
    }

    /// `network_match_len` must report the longest matching prefix, or `None`.
    #[test]
    fn testnetwork_match() -> Result<(), Error> {
        let networks = ["192.168.2.1/24", "127.0.0.0/8"];
        let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();

        assert_eq!(
            network_match_len(&networks, &"192.168.2.1".parse()?),
            Some(24)
        );
        assert_eq!(
            network_match_len(&networks, &"192.168.2.254".parse()?),
            Some(24)
        );
        assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None);
        assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8));
        assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None);

        let networks = ["0.0.0.0/0"];
        let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();
        assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(0));
        assert_eq!(
            network_match_len(&networks, &"192.168.2.1".parse()?),
            Some(0)
        );

        Ok(())
    }

    /// Rule selection must honour both the timeframe and the most specific network.
    #[test]
    fn test_rule_match() -> Result<(), Error> {
        let config_data = "
rule: rule1
	comment my test rule
	network 192.168.2.0/24
	rate-in 50000000
	rate-out 50000000
	timeframe 8-12
	timeframe 14-16

rule: rule2
	network 192.168.2.35/32
	network 127.0.0.1/8
	rate-in 150000000
	rate-out 150000000
	timeframe 18-20

rule: somewhere
	network 0.0.0.0/0
	rate-in 100000000
	rate-out 100000000
";
        let config = pbs_config::traffic_control::CONFIG.parse("testconfig", config_data)?;

        let mut cache = TrafficControlCache::new();
        cache.use_utc = true;
        cache.use_shared_memory = false; // avoid permission problems in test environment

        cache.update_config(&config)?;

        const THURSDAY_08_00: i64 = make_test_time(0, 8, 0);
        const THURSDAY_15_00: i64 = make_test_time(0, 15, 0);
        const THURSDAY_19_00: i64 = make_test_time(0, 19, 0);

        let local = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
        let gateway = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), 1234);
        let private = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 35)), 1234);
        let somewhere = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 1234);

        // Only the catch-all rule covers this address.
        let (rule, read_limiter, write_limiter) =
            cache.lookup_rate_limiter(somewhere, THURSDAY_08_00);
        assert_eq!(rule, "somewhere");
        assert!(read_limiter.is_some());
        assert!(write_limiter.is_some());

        // 127.0.0.1 is inside rule2's /8 and 19:00 is inside its timeframe.
        let (rule, read_limiter, write_limiter) = cache.lookup_rate_limiter(local, THURSDAY_19_00);
        assert_eq!(rule, "rule2");
        assert!(read_limiter.is_some());
        assert!(write_limiter.is_some());

        // 15:00 falls into rule1's second timeframe.
        let (rule, read_limiter, write_limiter) =
            cache.lookup_rate_limiter(gateway, THURSDAY_15_00);
        assert_eq!(rule, "rule1");
        assert!(read_limiter.is_some());
        assert!(write_limiter.is_some());

        // Outside rule1's timeframes the gateway falls back to the catch-all rule.
        let (rule, read_limiter, write_limiter) =
            cache.lookup_rate_limiter(gateway, THURSDAY_19_00);
        assert_eq!(rule, "somewhere");
        assert!(read_limiter.is_some());
        assert!(write_limiter.is_some());

        // The /32 entry of rule2 is the most specific match for this host.
        let (rule, read_limiter, write_limiter) =
            cache.lookup_rate_limiter(private, THURSDAY_19_00);
        assert_eq!(rule, "rule2");
        assert!(read_limiter.is_some());
        assert!(write_limiter.is_some());

        Ok(())
    }
}