]>
Commit | Line | Data |
---|---|---|
e705b305 DM |
1 | //! Traffic control implementation |
2 | ||
610150a4 DM |
3 | use std::collections::HashMap; |
4 | use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | |
9531d2c5 | 5 | use std::sync::{Arc, Mutex}; |
a0172d76 | 6 | use std::time::Instant; |
610150a4 DM |
7 | |
8 | use anyhow::Error; | |
9 | use cidr::IpInet; | |
10 | ||
5aeeb44a | 11 | use proxmox_http::{RateLimiter, ShareableRateLimit}; |
610150a4 DM |
12 | use proxmox_section_config::SectionConfigData; |
13 | ||
15cc41b6 | 14 | use proxmox_time::{parse_daily_duration, DailyDuration, TmEditor}; |
610150a4 DM |
15 | |
16 | use pbs_api_types::TrafficControlRule; | |
17 | ||
cb80ffc1 | 18 | use pbs_config::ConfigVersionCache; |
610150a4 | 19 | |
d6644e29 | 20 | use crate::tools::SharedRateLimiter; |
de21d4ef | 21 | |
8e70d421 WB |
22 | pub type SharedRateLimit = Arc<dyn ShareableRateLimit>; |
23 | ||
9531d2c5 | 24 | lazy_static::lazy_static! { |
e705b305 | 25 | /// Shared traffic control cache singleton. |
a0172d76 DM |
26 | pub static ref TRAFFIC_CONTROL_CACHE: Arc<Mutex<TrafficControlCache>> = |
27 | Arc::new(Mutex::new(TrafficControlCache::new())); | |
28 | } | |
29 | ||
610150a4 | 30 | struct ParsedTcRule { |
9531d2c5 TL |
31 | config: TrafficControlRule, // original rule config |
32 | networks: Vec<IpInet>, // parsed networks | |
610150a4 DM |
33 | timeframe: Vec<DailyDuration>, // parsed timeframe |
34 | } | |
35 | ||
e705b305 | 36 | /// Traffic control statistics |
a0172d76 | 37 | pub struct TrafficStat { |
d20137e5 | 38 | /// Total incoming traffic (bytes) |
a0172d76 | 39 | pub traffic_in: u64, |
e705b305 | 40 | /// Incoming data rate (bytes/second) |
a0172d76 | 41 | pub rate_in: u64, |
e705b305 | 42 | /// Total outgoing traffic (bytes) |
a0172d76 | 43 | pub traffic_out: u64, |
e705b305 | 44 | /// Outgoing data rate (bytes/second) |
a0172d76 DM |
45 | pub rate_out: u64, |
46 | } | |
47 | ||
e705b305 DM |
48 | /// Cache rules from `/etc/proxmox-backup/traffic-control.cfg` |
49 | /// together with corresponding rate limiter implementation. | |
610150a4 | 50 | pub struct TrafficControlCache { |
e705b305 | 51 | // use shared memory to make it work with daemon restarts |
de21d4ef | 52 | use_shared_memory: bool, |
a0172d76 DM |
53 | last_rate_compute: Instant, |
54 | current_rate_map: HashMap<String, TrafficStat>, | |
610150a4 DM |
55 | last_update: i64, |
56 | last_traffic_control_generation: usize, | |
57 | rules: Vec<ParsedTcRule>, | |
8e70d421 | 58 | limiter_map: HashMap<String, (Option<SharedRateLimit>, Option<SharedRateLimit>)>, |
610150a4 DM |
59 | use_utc: bool, // currently only used for testing |
60 | } | |
61 | ||
9531d2c5 TL |
62 | fn timeframe_match(duration_list: &[DailyDuration], now: &TmEditor) -> bool { |
63 | if duration_list.is_empty() { | |
64 | return true; | |
65 | } | |
610150a4 DM |
66 | |
67 | for duration in duration_list.iter() { | |
68 | if duration.time_match_with_tm_editor(now) { | |
69 | return true; | |
70 | } | |
71 | } | |
72 | ||
73 | false | |
74 | } | |
75 | ||
9531d2c5 | 76 | fn network_match_len(networks: &[IpInet], ip: &IpAddr) -> Option<u8> { |
610150a4 DM |
77 | let mut match_len = None; |
78 | ||
79 | for cidr in networks.iter() { | |
80 | if cidr.contains(ip) { | |
81 | let network_length = cidr.network_length(); | |
82 | match match_len { | |
83 | Some(len) => { | |
84 | if network_length > len { | |
85 | match_len = Some(network_length); | |
86 | } | |
87 | } | |
88 | None => match_len = Some(network_length), | |
89 | } | |
90 | } | |
91 | } | |
92 | match_len | |
93 | } | |
94 | ||
95 | fn cannonical_ip(ip: IpAddr) -> IpAddr { | |
96 | // TODO: use std::net::IpAddr::to_cananical once stable | |
97 | match ip { | |
98 | IpAddr::V4(addr) => IpAddr::V4(addr), | |
9531d2c5 TL |
99 | IpAddr::V6(addr) => match addr.octets() { |
100 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { | |
101 | IpAddr::V4(Ipv4Addr::new(a, b, c, d)) | |
610150a4 | 102 | } |
9531d2c5 TL |
103 | _ => IpAddr::V6(addr), |
104 | }, | |
610150a4 DM |
105 | } |
106 | } | |
107 | ||
d5f58006 | 108 | fn create_limiter( |
de21d4ef DM |
109 | use_shared_memory: bool, |
110 | name: &str, | |
d5f58006 DM |
111 | rate: u64, |
112 | burst: u64, | |
8e70d421 | 113 | ) -> Result<SharedRateLimit, Error> { |
de21d4ef DM |
114 | if use_shared_memory { |
115 | let limiter = SharedRateLimiter::mmap_shmem(name, rate, burst)?; | |
116 | Ok(Arc::new(limiter)) | |
117 | } else { | |
118 | Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst)))) | |
119 | } | |
d5f58006 DM |
120 | } |
121 | ||
610150a4 | 122 | impl TrafficControlCache { |
e705b305 | 123 | fn new() -> Self { |
610150a4 | 124 | Self { |
de21d4ef | 125 | use_shared_memory: true, |
610150a4 DM |
126 | rules: Vec::new(), |
127 | limiter_map: HashMap::new(), | |
128 | last_traffic_control_generation: 0, | |
129 | last_update: 0, | |
130 | use_utc: false, | |
a0172d76 DM |
131 | last_rate_compute: Instant::now(), |
132 | current_rate_map: HashMap::new(), | |
610150a4 DM |
133 | } |
134 | } | |
135 | ||
e705b305 DM |
136 | /// Reload rules from configuration file |
137 | /// | |
138 | /// Only reload if configuration file was updated | |
139 | /// ([ConfigVersionCache]) or last update is older that 60 | |
140 | /// seconds. | |
610150a4 | 141 | pub fn reload(&mut self, now: i64) { |
cb80ffc1 DM |
142 | let version_cache = match ConfigVersionCache::new() { |
143 | Ok(cache) => cache, | |
610150a4 | 144 | Err(err) => { |
9531d2c5 TL |
145 | log::error!( |
146 | "TrafficControlCache::reload failed in ConfigVersionCache::new: {}", | |
147 | err | |
148 | ); | |
610150a4 DM |
149 | return; |
150 | } | |
151 | }; | |
152 | ||
cb80ffc1 | 153 | let traffic_control_generation = version_cache.traffic_control_generation(); |
610150a4 | 154 | |
9531d2c5 TL |
155 | if (self.last_update != 0) |
156 | && (traffic_control_generation == self.last_traffic_control_generation) | |
157 | && ((now - self.last_update) < 60) | |
158 | { | |
159 | return; | |
160 | } | |
610150a4 DM |
161 | |
162 | log::debug!("reload traffic control rules"); | |
163 | ||
164 | self.last_traffic_control_generation = traffic_control_generation; | |
165 | self.last_update = now; | |
166 | ||
6aff2de5 MS |
167 | if let Err(err) = self.reload_impl() { |
168 | log::error!("TrafficControlCache::reload failed -> {err}"); | |
610150a4 DM |
169 | } |
170 | } | |
171 | ||
172 | fn reload_impl(&mut self) -> Result<(), Error> { | |
173 | let (config, _) = pbs_config::traffic_control::config()?; | |
174 | ||
175 | self.update_config(&config) | |
176 | } | |
177 | ||
e705b305 DM |
178 | /// Compute current data rates. |
179 | /// | |
180 | /// This should be called every second (from `proxmox-backup-proxy`). | |
a0172d76 | 181 | pub fn compute_current_rates(&mut self) { |
a0172d76 | 182 | let elapsed = self.last_rate_compute.elapsed().as_micros(); |
9531d2c5 TL |
183 | if elapsed < 200_000 { |
184 | return; | |
185 | } // not enough data | |
a0172d76 DM |
186 | |
187 | let mut new_rate_map = HashMap::new(); | |
188 | ||
189 | for (rule, (read_limit, write_limit)) in self.limiter_map.iter() { | |
190 | let traffic_in = read_limit.as_ref().map(|l| l.traffic()).unwrap_or(0); | |
191 | let traffic_out = write_limit.as_ref().map(|l| l.traffic()).unwrap_or(0); | |
192 | ||
193 | let traffic_diff_in; | |
194 | let traffic_diff_out; | |
195 | ||
196 | if let Some(stat) = self.current_rate_map.get(rule) { | |
197 | traffic_diff_in = traffic_in.saturating_sub(stat.traffic_in); | |
198 | traffic_diff_out = traffic_out.saturating_sub(stat.traffic_out); | |
199 | } else { | |
200 | traffic_diff_in = 0; | |
201 | traffic_diff_out = 0; | |
202 | } | |
203 | ||
204 | let rate_in = ((traffic_diff_in as u128) * 1_000_000) / elapsed; | |
205 | let rate_out = ((traffic_diff_out as u128) * 1_000_000) / elapsed; | |
206 | ||
207 | let stat = TrafficStat { | |
208 | traffic_in, | |
209 | traffic_out, | |
210 | rate_in: rate_in.try_into().unwrap_or(u64::MAX), | |
211 | rate_out: rate_out.try_into().unwrap_or(u64::MAX), | |
212 | }; | |
213 | new_rate_map.insert(rule.clone(), stat); | |
214 | } | |
215 | ||
216 | self.current_rate_map = new_rate_map; | |
217 | ||
218 | self.last_rate_compute = Instant::now() | |
219 | } | |
220 | ||
e705b305 | 221 | /// Returns current [TrafficStat] for each configured rule. |
a0172d76 DM |
222 | pub fn current_rate_map(&self) -> &HashMap<String, TrafficStat> { |
223 | &self.current_rate_map | |
224 | } | |
d5f58006 | 225 | |
610150a4 | 226 | fn update_config(&mut self, config: &SectionConfigData) -> Result<(), Error> { |
9531d2c5 TL |
227 | self.limiter_map |
228 | .retain(|key, _value| config.sections.contains_key(key)); | |
610150a4 | 229 | |
9531d2c5 | 230 | let rules: Vec<TrafficControlRule> = config.convert_to_typed_array("rule")?; |
610150a4 DM |
231 | |
232 | let mut active_rules = Vec::new(); | |
233 | ||
234 | for rule in rules { | |
9531d2c5 TL |
235 | let entry = self |
236 | .limiter_map | |
237 | .entry(rule.name.clone()) | |
238 | .or_insert((None, None)); | |
56472190 | 239 | let limit = &rule.limit; |
610150a4 DM |
240 | |
241 | match entry.0 { | |
9531d2c5 TL |
242 | Some(ref read_limiter) => match limit.rate_in { |
243 | Some(rate_in) => { | |
244 | read_limiter.update_rate( | |
245 | rate_in.as_u64(), | |
246 | limit.burst_in.unwrap_or(rate_in).as_u64(), | |
247 | ); | |
610150a4 | 248 | } |
9531d2c5 TL |
249 | None => entry.0 = None, |
250 | }, | |
610150a4 | 251 | None => { |
56472190 | 252 | if let Some(rate_in) = limit.rate_in { |
de21d4ef DM |
253 | let name = format!("{}.in", rule.name); |
254 | let limiter = create_limiter( | |
255 | self.use_shared_memory, | |
256 | &name, | |
118515db | 257 | rate_in.as_u64(), |
56472190 | 258 | limit.burst_in.unwrap_or(rate_in).as_u64(), |
de21d4ef | 259 | )?; |
d5f58006 | 260 | entry.0 = Some(limiter); |
610150a4 DM |
261 | } |
262 | } | |
263 | } | |
264 | ||
265 | match entry.1 { | |
9531d2c5 TL |
266 | Some(ref write_limiter) => match limit.rate_out { |
267 | Some(rate_out) => { | |
268 | write_limiter.update_rate( | |
269 | rate_out.as_u64(), | |
270 | limit.burst_out.unwrap_or(rate_out).as_u64(), | |
271 | ); | |
610150a4 | 272 | } |
9531d2c5 TL |
273 | None => entry.1 = None, |
274 | }, | |
610150a4 | 275 | None => { |
56472190 | 276 | if let Some(rate_out) = limit.rate_out { |
de21d4ef DM |
277 | let name = format!("{}.out", rule.name); |
278 | let limiter = create_limiter( | |
279 | self.use_shared_memory, | |
280 | &name, | |
118515db | 281 | rate_out.as_u64(), |
56472190 | 282 | limit.burst_out.unwrap_or(rate_out).as_u64(), |
de21d4ef | 283 | )?; |
d5f58006 | 284 | entry.1 = Some(limiter); |
610150a4 DM |
285 | } |
286 | } | |
287 | } | |
288 | ||
289 | let mut timeframe = Vec::new(); | |
290 | ||
291 | if let Some(ref timefram_list) = rule.timeframe { | |
292 | for duration_str in timefram_list { | |
293 | let duration = parse_daily_duration(duration_str)?; | |
294 | timeframe.push(duration); | |
295 | } | |
296 | } | |
297 | ||
298 | let mut networks = Vec::new(); | |
299 | ||
300 | for network in rule.network.iter() { | |
301 | let cidr = match network.parse() { | |
302 | Ok(cidr) => cidr, | |
303 | Err(err) => { | |
304 | log::error!("unable to parse network '{}' - {}", network, err); | |
305 | continue; | |
306 | } | |
307 | }; | |
308 | networks.push(cidr); | |
309 | } | |
310 | ||
9531d2c5 TL |
311 | active_rules.push(ParsedTcRule { |
312 | config: rule, | |
313 | networks, | |
314 | timeframe, | |
315 | }); | |
610150a4 DM |
316 | } |
317 | ||
318 | self.rules = active_rules; | |
319 | ||
320 | Ok(()) | |
321 | } | |
322 | ||
e705b305 DM |
323 | /// Returns the rate limiter (if any) for the specified peer address. |
324 | /// | |
325 | /// - Rules where timeframe does not match are skipped. | |
326 | /// - Rules with smaller network size have higher priority. | |
327 | /// | |
328 | /// Behavior is undefined if more than one rule matches after | |
329 | /// above selection. | |
610150a4 DM |
330 | pub fn lookup_rate_limiter( |
331 | &self, | |
1993d986 | 332 | peer: SocketAddr, |
610150a4 | 333 | now: i64, |
8e70d421 | 334 | ) -> (&str, Option<SharedRateLimit>, Option<SharedRateLimit>) { |
610150a4 DM |
335 | let peer_ip = cannonical_ip(peer.ip()); |
336 | ||
337 | log::debug!("lookup_rate_limiter: {:?}", peer_ip); | |
338 | ||
339 | let now = match TmEditor::with_epoch(now, self.use_utc) { | |
340 | Ok(now) => now, | |
341 | Err(err) => { | |
342 | log::error!("lookup_rate_limiter: TmEditor::with_epoch failed - {}", err); | |
343 | return ("", None, None); | |
344 | } | |
345 | }; | |
346 | ||
347 | let mut last_rule_match = None; | |
348 | ||
349 | for rule in self.rules.iter() { | |
9531d2c5 TL |
350 | if !timeframe_match(&rule.timeframe, &now) { |
351 | continue; | |
352 | } | |
610150a4 DM |
353 | |
354 | if let Some(match_len) = network_match_len(&rule.networks, &peer_ip) { | |
355 | match last_rule_match { | |
356 | None => last_rule_match = Some((rule, match_len)), | |
357 | Some((_, last_len)) => { | |
358 | if match_len > last_len { | |
359 | last_rule_match = Some((rule, match_len)); | |
360 | } | |
361 | } | |
362 | } | |
363 | } | |
364 | } | |
365 | ||
366 | match last_rule_match { | |
367 | Some((rule, _)) => { | |
368 | match self.limiter_map.get(&rule.config.name) { | |
9531d2c5 TL |
369 | Some((read_limiter, write_limiter)) => ( |
370 | &rule.config.name, | |
371 | read_limiter.clone(), | |
372 | write_limiter.clone(), | |
373 | ), | |
610150a4 DM |
374 | None => ("", None, None), // should never happen |
375 | } | |
376 | } | |
377 | None => ("", None, None), | |
378 | } | |
379 | } | |
380 | } | |
381 | ||
610150a4 DM |
382 | #[cfg(test)] |
383 | mod test { | |
384 | use super::*; | |
385 | ||
386 | const fn make_test_time(mday: i32, hour: i32, min: i32) -> i64 { | |
9531d2c5 | 387 | (mday * 3600 * 24 + hour * 3600 + min * 60) as i64 |
610150a4 DM |
388 | } |
389 | ||
390 | #[test] | |
391 | fn testnetwork_match() -> Result<(), Error> { | |
610150a4 DM |
392 | let networks = ["192.168.2.1/24", "127.0.0.0/8"]; |
393 | let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect(); | |
394 | ||
9531d2c5 TL |
395 | assert_eq!( |
396 | network_match_len(&networks, &"192.168.2.1".parse()?), | |
397 | Some(24) | |
398 | ); | |
399 | assert_eq!( | |
400 | network_match_len(&networks, &"192.168.2.254".parse()?), | |
401 | Some(24) | |
402 | ); | |
610150a4 DM |
403 | assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None); |
404 | assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8)); | |
405 | assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None); | |
406 | ||
407 | let networks = ["0.0.0.0/0"]; | |
408 | let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect(); | |
409 | assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(0)); | |
9531d2c5 TL |
410 | assert_eq!( |
411 | network_match_len(&networks, &"192.168.2.1".parse()?), | |
412 | Some(0) | |
413 | ); | |
610150a4 DM |
414 | |
415 | Ok(()) | |
416 | } | |
417 | ||
418 | #[test] | |
9531d2c5 | 419 | fn test_rule_match() -> Result<(), Error> { |
610150a4 DM |
420 | let config_data = " |
421 | rule: rule1 | |
422 | comment my test rule | |
423 | network 192.168.2.0/24 | |
424 | rate-in 50000000 | |
425 | rate-out 50000000 | |
426 | timeframe 8-12 | |
427 | timeframe 14-16 | |
428 | ||
429 | rule: rule2 | |
430 | network 192.168.2.35/32 | |
431 | network 127.0.0.1/8 | |
432 | rate-in 150000000 | |
433 | rate-out 150000000 | |
434 | timeframe 18-20 | |
435 | ||
436 | rule: somewhere | |
437 | network 0.0.0.0/0 | |
438 | rate-in 100000000 | |
439 | rate-out 100000000 | |
440 | "; | |
441 | let config = pbs_config::traffic_control::CONFIG.parse("testconfig", config_data)?; | |
442 | ||
443 | let mut cache = TrafficControlCache::new(); | |
444 | cache.use_utc = true; | |
e3eb062c | 445 | cache.use_shared_memory = false; // avoid permission problems in test environment |
610150a4 DM |
446 | |
447 | cache.update_config(&config)?; | |
448 | ||
449 | const THURSDAY_80_00: i64 = make_test_time(0, 8, 0); | |
450 | const THURSDAY_15_00: i64 = make_test_time(0, 15, 0); | |
451 | const THURSDAY_19_00: i64 = make_test_time(0, 19, 0); | |
452 | ||
453 | let local = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234); | |
454 | let gateway = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), 1234); | |
455 | let private = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 35)), 1234); | |
456 | let somewhere = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 1234); | |
457 | ||
9531d2c5 TL |
458 | let (rule, read_limiter, write_limiter) = |
459 | cache.lookup_rate_limiter(somewhere, THURSDAY_80_00); | |
610150a4 DM |
460 | assert_eq!(rule, "somewhere"); |
461 | assert!(read_limiter.is_some()); | |
462 | assert!(write_limiter.is_some()); | |
463 | ||
9531d2c5 | 464 | let (rule, read_limiter, write_limiter) = cache.lookup_rate_limiter(local, THURSDAY_19_00); |
610150a4 DM |
465 | assert_eq!(rule, "rule2"); |
466 | assert!(read_limiter.is_some()); | |
467 | assert!(write_limiter.is_some()); | |
468 | ||
9531d2c5 TL |
469 | let (rule, read_limiter, write_limiter) = |
470 | cache.lookup_rate_limiter(gateway, THURSDAY_15_00); | |
610150a4 DM |
471 | assert_eq!(rule, "rule1"); |
472 | assert!(read_limiter.is_some()); | |
473 | assert!(write_limiter.is_some()); | |
474 | ||
9531d2c5 TL |
475 | let (rule, read_limiter, write_limiter) = |
476 | cache.lookup_rate_limiter(gateway, THURSDAY_19_00); | |
610150a4 DM |
477 | assert_eq!(rule, "somewhere"); |
478 | assert!(read_limiter.is_some()); | |
479 | assert!(write_limiter.is_some()); | |
480 | ||
9531d2c5 TL |
481 | let (rule, read_limiter, write_limiter) = |
482 | cache.lookup_rate_limiter(private, THURSDAY_19_00); | |
610150a4 DM |
483 | assert_eq!(rule, "rule2"); |
484 | assert!(read_limiter.is_some()); | |
485 | assert!(write_limiter.is_some()); | |
486 | ||
487 | Ok(()) | |
488 | } | |
610150a4 | 489 | } |