use anyhow::{bail, format_err, Error};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::task::{Context, Poll};
use futures::*;
use crate::tls::MaybeTlsStream;
use crate::uri::build_authority;
-use super::{RateLimiter, RateLimitedStream};
+use super::{RateLimitedStream, ShareableRateLimit};
+
+type SharedRateLimit = Arc<dyn ShareableRateLimit>;
#[derive(Clone)]
pub struct HttpsConnector {
ssl_connector: Arc<SslConnector>,
proxy: Option<ProxyConfig>,
tcp_keepalive: u32,
- read_limiter: Option<Arc<Mutex<RateLimiter>>>,
- write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ read_limiter: Option<SharedRateLimit>,
+ write_limiter: Option<SharedRateLimit>,
}
impl HttpsConnector {
self.proxy = Some(proxy);
}
- pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ pub fn set_read_limiter(&mut self, limiter: Option<SharedRateLimit>) {
self.read_limiter = limiter;
}
- pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+ pub fn set_write_limiter(&mut self, limiter: Option<SharedRateLimit>) {
self.write_limiter = limiter;
}
use std::task::{Context, Poll};
-use super::{RateLimit, RateLimiter};
+use super::{ShareableRateLimit, RateLimiter};
+
+type SharedRateLimit = Arc<dyn ShareableRateLimit>;
/// A rate limited stream using [RateLimiter]
pub struct RateLimitedStream<S> {
- read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ read_limiter: Option<SharedRateLimit>,
read_delay: Option<Pin<Box<Sleep>>>,
- write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<SharedRateLimit>,
write_delay: Option<Pin<Box<Sleep>>>,
- update_limiter_cb: Option<Box<dyn Fn() -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) + Send>>,
+ update_limiter_cb: Option<Box<dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send>>,
last_limiter_update: Instant,
stream: S,
}
/// Creates a new instance with reads and writes limited to the same `rate`.
pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
let now = Instant::now();
- let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
- let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+ let read_limiter = RateLimiter::with_start_time(rate, bucket_size, now);
+ let read_limiter: SharedRateLimit = Arc::new(Mutex::new(read_limiter));
+ let write_limiter = RateLimiter::with_start_time(rate, bucket_size, now);
+ let write_limiter: SharedRateLimit = Arc::new(Mutex::new(write_limiter));
Self::with_limiter(stream, Some(read_limiter), Some(write_limiter))
}
/// Creates a new instance with specified [RateLimiters] for reads and writes.
pub fn with_limiter(
stream: S,
- read_limiter: Option<Arc<Mutex<RateLimiter>>>,
- write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ read_limiter: Option<SharedRateLimit>,
+ write_limiter: Option<SharedRateLimit>,
) -> Self {
- Self {
+ Self {
read_limiter,
read_delay: None,
write_limiter,
///
/// Note: This function is called within an async context, so it
/// should be fast and must not block.
- pub fn with_limiter_update_cb<F: Fn() -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) + Send + 'static>(
+ pub fn with_limiter_update_cb<F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static>(
stream: S,
update_limiter_cb: F,
) -> Self {
}
fn register_traffic(
- limiter: &Mutex<RateLimiter>,
+ limiter: &(dyn ShareableRateLimit),
count: usize,
) -> Option<Pin<Box<Sleep>>>{
const MIN_DELAY: Duration = Duration::from_millis(10);
let now = Instant::now();
- let delay = limiter.lock().unwrap()
- .register_traffic(now, count as u64);
+ let delay = limiter.register_traffic(now, count as u64);
if delay >= MIN_DELAY {
let sleep = tokio::time::sleep(delay);
Some(Box::pin(sleep))
let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
- if let Some(ref limiter) = this.write_limiter {
+ if let Some(ref mut limiter) = this.write_limiter {
if let Poll::Ready(Ok(count)) = result {
- this.write_delay = register_traffic(limiter, count);
+ this.write_delay = register_traffic(limiter.as_ref(), count);
}
}
if let Some(ref limiter) = this.write_limiter {
if let Poll::Ready(Ok(count)) = result {
- this.write_delay = register_traffic(limiter, count);
+ this.write_delay = register_traffic(limiter.as_ref(), count);
}
}
if let Some(ref read_limiter) = this.read_limiter {
if let Poll::Ready(Ok(())) = &result {
let count = buf.filled().len() - filled_len;
- this.read_delay = register_traffic(read_limiter, count);
+ this.read_delay = register_traffic(read_limiter.as_ref(), count);
}
}
fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration;
}
+/// Like [RateLimit], but does not require self to be mutable.
+///
+/// This is useful for types providing internal mutability (Mutex).
+pub trait ShareableRateLimit: Send + Sync {
+ fn update_rate(&self, rate: u64, bucket_size: u64);
+ fn average_rate(&self, current_time: Instant) -> f64;
+ fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration;
+}
+
/// Token bucket based rate limiter
pub struct RateLimiter {
rate: u64, // tokens/second
Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate)
}
}
+
+impl <R: RateLimit + Send> ShareableRateLimit for std::sync::Mutex<R> {
+
+ fn update_rate(&self, rate: u64, bucket_size: u64) {
+ self.lock().unwrap().update_rate(rate, bucket_size);
+ }
+
+ fn average_rate(&self, current_time: Instant) -> f64 {
+ self.lock().unwrap().average_rate(current_time)
+ }
+
+ fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration {
+ self.lock().unwrap().register_traffic(current_time, data_len)
+ }
+}