1 //! Traffic control implementation
3 use std
::collections
::HashMap
;
4 use std
::net
::{IpAddr, Ipv4Addr, SocketAddr}
;
5 use std
::sync
::{Arc, Mutex}
;
6 use std
::time
::Instant
;
11 use proxmox_http
::{RateLimiter, ShareableRateLimit}
;
12 use proxmox_section_config
::SectionConfigData
;
14 use proxmox_time
::{parse_daily_duration, DailyDuration, TmEditor}
;
16 use pbs_api_types
::TrafficControlRule
;
18 use pbs_config
::ConfigVersionCache
;
20 use crate::tools
::SharedRateLimiter
;
/// Reference-counted handle to a [`ShareableRateLimit`] trait object.
///
/// The per-rule limiter map stores one optional handle for the read
/// direction and one for the write direction.
22 pub type SharedRateLimit
= Arc
<dyn ShareableRateLimit
>;
24 lazy_static
::lazy_static
! {
25 /// Shared traffic control cache singleton.
26 pub static ref TRAFFIC_CONTROL_CACHE
: Arc
<Mutex
<TrafficControlCache
>> =
27 Arc
::new(Mutex
::new(TrafficControlCache
::new()));
31 config
: TrafficControlRule
, // original rule config
32 networks
: Vec
<IpInet
>, // parsed networks
33 timeframe
: Vec
<DailyDuration
>, // parsed timeframe
36 /// Traffic control statistics
37 pub struct TrafficStat
{
38 /// Total incoming traffic (bytes)
40 /// Incoming data rate (bytes/second)
42 /// Total outgoing traffic (bytes)
44 /// Outgoing data rate (bytes/second)
48 /// Cache rules from `/etc/proxmox-backup/traffic-control.cfg`
49 /// together with corresponding rate limiter implementation.
50 pub struct TrafficControlCache
{
51 // use shared memory to make it work with daemon restarts
52 use_shared_memory
: bool
,
53 last_rate_compute
: Instant
,
54 current_rate_map
: HashMap
<String
, TrafficStat
>,
56 last_traffic_control_generation
: usize,
57 rules
: Vec
<ParsedTcRule
>,
58 limiter_map
: HashMap
<String
, (Option
<SharedRateLimit
>, Option
<SharedRateLimit
>)>,
59 use_utc
: bool
, // currently only used for testing
62 fn timeframe_match(duration_list
: &[DailyDuration
], now
: &TmEditor
) -> bool
{
63 if duration_list
.is_empty() {
67 for duration
in duration_list
.iter() {
68 if duration
.time_match_with_tm_editor(now
) {
76 fn network_match_len(networks
: &[IpInet
], ip
: &IpAddr
) -> Option
<u8> {
77 let mut match_len
= None
;
79 for cidr
in networks
.iter() {
80 if cidr
.contains(ip
) {
81 let network_length
= cidr
.network_length();
84 if network_length
> len
{
85 match_len
= Some(network_length
);
88 None
=> match_len
= Some(network_length
),
/// Converts an IPv4-mapped IPv6 address (`::ffff:a.b.c.d`) into the plain
/// IPv4 address `a.b.c.d`; every other address is returned unchanged.
///
/// This lets IPv4 rules match peers that connected through a dual-stack
/// socket and are therefore reported as IPv4-mapped IPv6 addresses.
fn cannonical_ip(ip: IpAddr) -> IpAddr {
    // TODO: use std::net::IpAddr::to_canonical once the minimum supported
    // toolchain provides it
    match ip {
        IpAddr::V4(addr) => IpAddr::V4(addr),
        IpAddr::V6(addr) => match addr.octets() {
            // ::ffff:0:0/96 is the IPv4-mapped prefix
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
                IpAddr::V4(Ipv4Addr::new(a, b, c, d))
            }
            _ => IpAddr::V6(addr),
        },
    }
}
109 use_shared_memory
: bool
,
113 ) -> Result
<SharedRateLimit
, Error
> {
114 if use_shared_memory
{
115 let limiter
= SharedRateLimiter
::mmap_shmem(name
, rate
, burst
)?
;
116 Ok(Arc
::new(limiter
))
118 Ok(Arc
::new(Mutex
::new(RateLimiter
::new(rate
, burst
))))
122 impl TrafficControlCache
{
125 use_shared_memory
: true,
127 limiter_map
: HashMap
::new(),
128 last_traffic_control_generation
: 0,
131 last_rate_compute
: Instant
::now(),
132 current_rate_map
: HashMap
::new(),
136 /// Reload rules from configuration file
138 /// Only reload if configuration file was updated
139 /// ([ConfigVersionCache]) or last update is older that 60
141 pub fn reload(&mut self, now
: i64) {
142 let version_cache
= match ConfigVersionCache
::new() {
146 "TrafficControlCache::reload failed in ConfigVersionCache::new: {}",
153 let traffic_control_generation
= version_cache
.traffic_control_generation();
155 if (self.last_update
!= 0)
156 && (traffic_control_generation
== self.last_traffic_control_generation
)
157 && ((now
- self.last_update
) < 60)
162 log
::debug
!("reload traffic control rules");
164 self.last_traffic_control_generation
= traffic_control_generation
;
165 self.last_update
= now
;
167 if let Err(err
) = self.reload_impl() {
168 log
::error
!("TrafficControlCache::reload failed -> {err}");
172 fn reload_impl(&mut self) -> Result
<(), Error
> {
173 let (config
, _
) = pbs_config
::traffic_control
::config()?
;
175 self.update_config(&config
)
178 /// Compute current data rates.
180 /// This should be called every second (from `proxmox-backup-proxy`).
181 pub fn compute_current_rates(&mut self) {
182 let elapsed
= self.last_rate_compute
.elapsed().as_micros();
183 if elapsed
< 200_000 {
187 let mut new_rate_map
= HashMap
::new();
189 for (rule
, (read_limit
, write_limit
)) in self.limiter_map
.iter() {
190 let traffic_in
= read_limit
.as_ref().map(|l
| l
.traffic()).unwrap_or(0);
191 let traffic_out
= write_limit
.as_ref().map(|l
| l
.traffic()).unwrap_or(0);
194 let traffic_diff_out
;
196 if let Some(stat
) = self.current_rate_map
.get(rule
) {
197 traffic_diff_in
= traffic_in
.saturating_sub(stat
.traffic_in
);
198 traffic_diff_out
= traffic_out
.saturating_sub(stat
.traffic_out
);
201 traffic_diff_out
= 0;
204 let rate_in
= ((traffic_diff_in
as u128
) * 1_000_000) / elapsed
;
205 let rate_out
= ((traffic_diff_out
as u128
) * 1_000_000) / elapsed
;
207 let stat
= TrafficStat
{
210 rate_in
: rate_in
.try_into().unwrap_or(u64::MAX
),
211 rate_out
: rate_out
.try_into().unwrap_or(u64::MAX
),
213 new_rate_map
.insert(rule
.clone(), stat
);
216 self.current_rate_map
= new_rate_map
;
218 self.last_rate_compute
= Instant
::now()
221 /// Returns current [TrafficStat] for each configured rule.
222 pub fn current_rate_map(&self) -> &HashMap
<String
, TrafficStat
> {
223 &self.current_rate_map
226 fn update_config(&mut self, config
: &SectionConfigData
) -> Result
<(), Error
> {
228 .retain(|key
, _value
| config
.sections
.contains_key(key
));
230 let rules
: Vec
<TrafficControlRule
> = config
.convert_to_typed_array("rule")?
;
232 let mut active_rules
= Vec
::new();
237 .entry(rule
.name
.clone())
238 .or_insert((None
, None
));
239 let limit
= &rule
.limit
;
242 Some(ref read_limiter
) => match limit
.rate_in
{
244 read_limiter
.update_rate(
246 limit
.burst_in
.unwrap_or(rate_in
).as_u64(),
249 None
=> entry
.0 = None
,
252 if let Some(rate_in
) = limit
.rate_in
{
253 let name
= format
!("{}.in", rule
.name
);
254 let limiter
= create_limiter(
255 self.use_shared_memory
,
258 limit
.burst_in
.unwrap_or(rate_in
).as_u64(),
260 entry
.0 = Some(limiter
);
266 Some(ref write_limiter
) => match limit
.rate_out
{
268 write_limiter
.update_rate(
270 limit
.burst_out
.unwrap_or(rate_out
).as_u64(),
273 None
=> entry
.1 = None
,
276 if let Some(rate_out
) = limit
.rate_out
{
277 let name
= format
!("{}.out", rule
.name
);
278 let limiter
= create_limiter(
279 self.use_shared_memory
,
282 limit
.burst_out
.unwrap_or(rate_out
).as_u64(),
284 entry
.1 = Some(limiter
);
289 let mut timeframe
= Vec
::new();
291 if let Some(ref timefram_list
) = rule
.timeframe
{
292 for duration_str
in timefram_list
{
293 let duration
= parse_daily_duration(duration_str
)?
;
294 timeframe
.push(duration
);
298 let mut networks
= Vec
::new();
300 for network
in rule
.network
.iter() {
301 let cidr
= match network
.parse() {
304 log
::error
!("unable to parse network '{}' - {}", network
, err
);
311 active_rules
.push(ParsedTcRule
{
318 self.rules
= active_rules
;
323 /// Returns the rate limiter (if any) for the specified peer address.
325 /// - Rules where timeframe does not match are skipped.
326 /// - Rules with smaller network size have higher priority.
328 /// Behavior is undefined if more than one rule matches after
330 pub fn lookup_rate_limiter(
334 ) -> (&str, Option
<SharedRateLimit
>, Option
<SharedRateLimit
>) {
335 let peer_ip
= cannonical_ip(peer
.ip());
337 log
::debug
!("lookup_rate_limiter: {:?}", peer_ip
);
339 let now
= match TmEditor
::with_epoch(now
, self.use_utc
) {
342 log
::error
!("lookup_rate_limiter: TmEditor::with_epoch failed - {}", err
);
343 return ("", None
, None
);
347 let mut last_rule_match
= None
;
349 for rule
in self.rules
.iter() {
350 if !timeframe_match(&rule
.timeframe
, &now
) {
354 if let Some(match_len
) = network_match_len(&rule
.networks
, &peer_ip
) {
355 match last_rule_match
{
356 None
=> last_rule_match
= Some((rule
, match_len
)),
357 Some((_
, last_len
)) => {
358 if match_len
> last_len
{
359 last_rule_match
= Some((rule
, match_len
));
366 match last_rule_match
{
368 match self.limiter_map
.get(&rule
.config
.name
) {
369 Some((read_limiter
, write_limiter
)) => (
371 read_limiter
.clone(),
372 write_limiter
.clone(),
374 None
=> ("", None
, None
), // should never happen
377 None
=> ("", None
, None
),
/// Builds a test timestamp in seconds from a day offset, an hour and a minute.
///
/// Day 0 is the first day after the epoch, which the tests treat as a
/// Thursday. Operands are widened to `i64` before multiplying so that
/// large `mday` values cannot overflow `i32`.
const fn make_test_time(mday: i32, hour: i32, min: i32) -> i64 {
    (mday as i64) * 3600 * 24 + (hour as i64) * 3600 + (min as i64) * 60
}
/// Checks that `network_match_len` reports the prefix length of the
/// containing network, `None` for addresses outside all networks, and
/// `Some(0)` for the catch-all network.
#[test]
fn testnetwork_match() -> Result<(), Error> {
    let networks = ["192.168.2.1/24", "127.0.0.0/8"];
    let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();

    assert_eq!(
        network_match_len(&networks, &"192.168.2.1".parse()?),
        Some(24)
    );
    assert_eq!(
        network_match_len(&networks, &"192.168.2.254".parse()?),
        Some(24)
    );
    assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None);
    assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8));
    assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None);

    let networks = ["0.0.0.0/0"];
    let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();
    assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(0));
    assert_eq!(
        network_match_len(&networks, &"192.168.2.1".parse()?),
        Some(0)
    );

    Ok(())
}
419 fn test_rule_match() -> Result
<(), Error
> {
423 network 192.168.2.0/24
430 network 192.168.2.35/32
441 let config
= pbs_config
::traffic_control
::CONFIG
.parse("testconfig", config_data
)?
;
443 let mut cache
= TrafficControlCache
::new();
444 cache
.use_utc
= true;
445 cache
.use_shared_memory
= false; // avoid permission problems in test environment
447 cache
.update_config(&config
)?
;
449 const THURSDAY_80_00
: i64 = make_test_time(0, 8, 0);
450 const THURSDAY_15_00
: i64 = make_test_time(0, 15, 0);
451 const THURSDAY_19_00
: i64 = make_test_time(0, 19, 0);
453 let local
= SocketAddr
::new(IpAddr
::V4(Ipv4Addr
::new(127, 0, 0, 1)), 1234);
454 let gateway
= SocketAddr
::new(IpAddr
::V4(Ipv4Addr
::new(192, 168, 2, 1)), 1234);
455 let private
= SocketAddr
::new(IpAddr
::V4(Ipv4Addr
::new(192, 168, 2, 35)), 1234);
456 let somewhere
= SocketAddr
::new(IpAddr
::V4(Ipv4Addr
::new(1, 2, 3, 4)), 1234);
458 let (rule
, read_limiter
, write_limiter
) =
459 cache
.lookup_rate_limiter(somewhere
, THURSDAY_80_00
);
460 assert_eq
!(rule
, "somewhere");
461 assert
!(read_limiter
.is_some());
462 assert
!(write_limiter
.is_some());
464 let (rule
, read_limiter
, write_limiter
) = cache
.lookup_rate_limiter(local
, THURSDAY_19_00
);
465 assert_eq
!(rule
, "rule2");
466 assert
!(read_limiter
.is_some());
467 assert
!(write_limiter
.is_some());
469 let (rule
, read_limiter
, write_limiter
) =
470 cache
.lookup_rate_limiter(gateway
, THURSDAY_15_00
);
471 assert_eq
!(rule
, "rule1");
472 assert
!(read_limiter
.is_some());
473 assert
!(write_limiter
.is_some());
475 let (rule
, read_limiter
, write_limiter
) =
476 cache
.lookup_rate_limiter(gateway
, THURSDAY_19_00
);
477 assert_eq
!(rule
, "somewhere");
478 assert
!(read_limiter
.is_some());
479 assert
!(write_limiter
.is_some());
481 let (rule
, read_limiter
, write_limiter
) =
482 cache
.lookup_rate_limiter(private
, THURSDAY_19_00
);
483 assert_eq
!(rule
, "rule2");
484 assert
!(read_limiter
.is_some());
485 assert
!(write_limiter
.is_some());